activemq源码解析 (1)之服务端如何启动,客户端和服务端如何通信

ActiveMQ的启动方式 有独立启动和嵌入式启动,这里采用嵌入式启动的方式。

    public static void testImmediately(){
        BrokerService broker = new BrokerService();
        // configure the broker
        try {
            broker.addConnector("tcp://localhost:61616");
            broker.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

ActiveMQ的服务端启动,采用了阻塞式I/O模型,看序列图

上面的过程是将一个socket层层封装,最终封装为TransportConnector的过程(很明显的使用到工厂模式)

方法2 主要功能就是根据uri创建对应的工厂,然后通过工厂创建连接

    public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
        TransportFactory tf = TransportFactory.findTransportFactory(location); //根据uri查找工厂,比如TCPFactory
        if( brokerService!=null && tf instanceof BrokerServiceAware) {
            ((BrokerServiceAware)tf).setBrokerService(brokerService);
        }
        try {
            if( brokerService!=null ) {
                SslContext.setCurrentSslContext(brokerService.getSslContext());
            }
            return tf.doBind(location);//创建socket连接,监听指定端口
        } finally {
            SslContext.setCurrentSslContext(null);
        }
    }

方法4 创建服务器并设置监听socket

public void bind() throws IOException {
        URI bind = getBindLocation();

        String host = bind.getHost();
        host = (host == null || host.length() == 0) ? "localhost" : host;
        InetAddress addr = InetAddress.getByName(host);

        try {
            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); //创建服务端监听socket
            configureServerSocket(this.serverSocket);
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
        }
        try {
            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
                bind.getQuery(), bind.getFragment()));
        }
        ... //此处省略一些代码
    }

创建并初始化socket后,启动socket监听(方法12),更准确的说是启动TransportConnector的TransportServer(这里的Demo启动的是TcpTransportServer),具体的TcpTransportServer的start方法 (start调用doStart方法) :

protected void doStart() throws Exception {
        if (useQueueForAccept) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!isStopped() && !isStopping()) {
                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
                            if (sock != null) {
                                try {
                                    handleSocket(sock);
                                } catch (Throwable thrown) {
                                    if (!isStopping()) {
                                        onAcceptError(new Exception(thrown));
                                    } else if (!isStopped()) {
                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
                                        onAcceptError(new Exception(thrown));
                                    }
                                }
                            }
                        }

                    } catch (InterruptedException e) {
                        LOG.info("socketQueue interuppted - stopping");
                        if (!isStopping()) {
                            onAcceptError(e);
                        }
                    }
                }
            };
            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
            socketHandlerThread.setDaemon(true);
            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
            socketHandlerThread.start();
        }
        super.doStart();
    }

注意上面方法最后的super.doStart() 这个是启动TcpTransportServer(TcpTransportServer继承Runnable),到这里服务端socket就已经启动监听了,后面写一个生产者客户端,用来与这个socket进行通讯,看看这个socket的通讯是如何处理的,先上序列图:

上面每种颜色代表一个线程.

未完待续...

 


版权声明:本文为u011166277原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。