ActiveMQ的启动方式 有独立启动和嵌入式启动,这里采用嵌入式启动的方式。
public static void testImmediately(){
BrokerService broker = new BrokerService();
// configure the broker
try {
broker.addConnector("tcp://localhost:61616");
broker.start();
} catch (Exception e) {
e.printStackTrace();
}
}ActiveMQ的服务端启动,采用了阻塞式I/O模型,看序列图

上面的过程是将一个socket层层封装,最终封装为TransportConnector的过程(很明显的使用到工厂模式)
方法2 主要功能就是根据uri创建对应的工厂,然后通过工厂创建连接
public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
TransportFactory tf = TransportFactory.findTransportFactory(location); //根据uri查找工厂,比如TCPFactory
if( brokerService!=null && tf instanceof BrokerServiceAware) {
((BrokerServiceAware)tf).setBrokerService(brokerService);
}
try {
if( brokerService!=null ) {
SslContext.setCurrentSslContext(brokerService.getSslContext());
}
return tf.doBind(location);//创建socket连接,监听指定端口
} finally {
SslContext.setCurrentSslContext(null);
}
}方法4 创建服务器并设置监听socket
public void bind() throws IOException {
URI bind = getBindLocation();
String host = bind.getHost();
host = (host == null || host.length() == 0) ? "localhost" : host;
InetAddress addr = InetAddress.getByName(host);
try {
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); //创建服务端监听socket
configureServerSocket(this.serverSocket);
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
}
try {
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment()));
}
... //此处省略一些代码
}创建并初始化socket后,启动socket监听(方法12),更准确的说是启动TransportConnector的TransportServer(这里的Demo启动的是TcpTransportServer),具体的TcpTransportServer的start方法 (start调用doStart方法) :
protected void doStart() throws Exception {
if (useQueueForAccept) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
while (!isStopped() && !isStopping()) {
Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
if (sock != null) {
try {
handleSocket(sock);
} catch (Throwable thrown) {
if (!isStopping()) {
onAcceptError(new Exception(thrown));
} else if (!isStopped()) {
LOG.warn("Unexpected error thrown during accept handling: ", thrown);
onAcceptError(new Exception(thrown));
}
}
}
}
} catch (InterruptedException e) {
LOG.info("socketQueue interuppted - stopping");
if (!isStopping()) {
onAcceptError(e);
}
}
}
};
socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
socketHandlerThread.setDaemon(true);
socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
socketHandlerThread.start();
}
super.doStart();
}注意上面方法最后的super.doStart() 这个是启动TcpTransportServer(TcpTransportServer继承Runnable),到这里服务端socket就已经启动监听了,后面写一个生产者客户端,用来与这个socket进行通讯,看看这个socket的通讯是如何处理的,先上序列图:

上面每种颜色代表一个线程.
未完待续...
版权声明:本文为u011166277原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。