Activemq-In-action(三)

第三部分 Activemq应用

创建Java应用

集成broker

嵌入broker

Broker Service

可以通过类org.apache.activemq.broker.BrokerService直接启动一个broker,并通过它设置配置和控制周期。

public static void main(String[] args) throws Exception {
  BrokerService broker = new BrokerService(); //A
  broker.setBrokerName("localhost");//A
  broker.setDataDirectory("data/"); //A
  
  SimpleAuthenticationPlugin authentication =
  new SimpleAuthenticationPlugin();
  List<AuthenticationUser> users =
  new ArrayList<AuthenticationUser>();
  users.add(new AuthenticationUser("admin",
  "password",
  "admins,publishers,consumers"));
  users.add(new AuthenticationUser("publisher",
  "password",
  "publishers,consumers"));
  users.add(new AuthenticationUser("consumer",
  "password",
  "consumers"));
  users.add(new AuthenticationUser("guest",
  "password",
  "guests"));
  authentication.setUsers(users);
  
  broker.setPlugins(new BrokerPlugin[]{authentication}); //B
  
  broker.addConnector("tcp://localhost:61616"); //C
  broker.start(); //D
}
//A Instantiate and configure Broker Service
//B Add plugins
//C Add connectors
//D Start broker

Broker Factory

BrokerService适用于简单场景,如果想要使用自定义配置,则org.apache.activemq.broker.BrokerFactory是个合适的选择,可以通过xml配置文件启动一个broker。

public class Factory {
  public static void main(String[] args) throws Exception {
      System.setProperty("activemq.base", System.getProperty("user.dir"));
      BrokerService broker = BrokerFactory.createBroker(new URI( #A
      "xbean:src/main/resources/org/apache/activemq/book/ch5/activemq-simple.xml" #A
      )); //A
      broker.start();
    }
}
//A Creating broker from XML
//URI:
//file:/etc/activemq/activemq.xml
//broker:(tcp://localhost:61616,network:static:tcp://remotehost:61616)?persistent=false&useJmx

使用spring集成

集成broker
package org.apache.activemq.book.ch6.spring;
import org.apache.activemq.book.ch5.Publisher;
import org.apache.xbean.spring.context.FileSystemXmlApplicationContext;
public class SpringBroker {
  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      System.err.println("Please define a configuration file!");
      return;
    }
    String config = args[0]; //A
    System.out.println(
    "Starting broker with the following configuration: " + config
    );
    System.setProperty("activemq.base", System.getProperty("user.dir")); //B
    
    FileSystemXmlApplicationContext //C
    context = new FileSystemXmlApplicationContext(config); //C
    Publisher publisher = new Publisher(); //D
    
    for (int i = 0; i < 100; i++) { //D
   	 publisher.sendMessage(new String[]{"JAVA", "IONA"}); //D
    } //D
    }
}
//A Define configuration file
//B Set base property
//C Initialize application context
//D Send messages
集成clients

Spring集成client工作:

  • 定义connection factory:
  • 定义destination:
  • 定义consumer:
  • 定义producer:
定义connection factory

使用:org.apache.activemq.ActiveMQConnectionFactoryorg.apache.activemq.pool.PooledConnectionFactory

定义destination

使用:org.apache.activemq.command.ActiveMQTopicorg.apache.activemq.command.ActiveMQQueue

定义consumer

DefaultMessageListenerContainer

定义producer

org.springframework.jms.core.JmsTemplate

嵌入Activemq到其他Java容器

Activemq不仅能嵌入Java应用,还可以嵌入:

  • 旧的Java应用
  • Apache Tomcat
  • Apache Geronimo
  • JBoss
  • Jetty

其他语法访问Activemq

Stomp (Streaming Text Orientated Messaging Protocol),用于脚本语言。

第四部分 Activemq高级特性

Broker拓扑结构

高可用

3种方式配置master/slave 。

Shared Nothing Master/Slave

Master 和 Slave各自都单独存储持久化的消息,它们不共享数据。

所有的state需要同步到slave。Master收到持久化消息时,需要先**同步(sync)**给Slave之后,才向Producer发送ACK确认。

slave不启动任何transport和network,因此不能接收连接。只有Master负责Client的请求,Slave不接收Client请求。Slave连接到Master,负责备份消息。

Master出现故障,Slave有两种处理方式:1、自己成为Master;2、关闭(停服务)—根据具体配置而定。

客户端配置failover:

failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

Slave 只能同步它连接到Master之后的消息。在Slave连接到Master之前Producer向Master发送的消息将不会同步给Slave,这可以通过配置(waitForSlave)参数,只有当Slave也启动之后,Master才开始初始化TransportConnector接受Client的请求(Producer的请求),否则有可能丢失消息。

如果Master 或者 Slave其中之一宕机,它们之间不同步的消息 无法 自动进行同步,此时只能手动恢复不同步的消息了。也就是说:“ActiveMQ没有提供任何有效的手段,能够让master与slave在故障恢复期间,自动进行数据同步”

对于非持久化消息,并不会同步给Slave。因此,Master宕机,非持久化消息会丢失。

已经运行的broker,如果做master/slave,需要把master的数据文件copy到slave上,然后重启maser和slave。

Master 与 Slave之间可能会出现“Split Brain”现象。比如:Master本身是正常的,但是Master与Slave之间的网络出现故障,网络故障导致Slave认为Master已经宕机,因为它自己会成为Master(根据配置:shutdownOnMasterFailure)。此时,对Client而言,就会存在两个Master。

配置

配置一个broker为slave。

<services>
	<masterConnector remoteURI= "tcp://localhost:62001" userName="Rob" password="Davies"/>
</services>
属性默认值描述
shutdownOnmasterFailurefalsemaster 宕机之后slave是否shutdown
属性默认值描述
waitForslavefalsemaster是否等slave启动之后才能接收连接
shutdownOnslaveFailurefalseslave 宕机之后master是否shutdown
场景

share database和share file 不能满足的情况。

broker已运行

Shared Database Master/Slave

这是很常用的一种架构。“共享存储”,意味着Master与Slave之间的数据是共享的。

那如何避免冲突呢?通过争夺数据库表的排他锁,只有Master有锁,未获得锁的自动成为Slave。

image-20210521172000021

对于“共享存储”而言,只会“共享”持久化消息。对于非持久化消息,它们是在内存中保存的。可以通过配置(forcePersistencyModeBrokerPlugin persistenceFlag)属性强制所有的消息都持久化。

当Master宕机后,Slave可自动接管服务成为Master。由于数据是共享的,因此Master和Slave之间不需要进行数据的复制与同步。Slave之间通过竞争锁来决定谁是Master。

Shared File system Master/Slave

类似 data share。

可能是最好的解决方案。

Broker之间的网络

Activemq网络使用store和forward。

Store and Forward(存储转发)

ActiveMQ的存储和转发概念意味着,消息在通过network转发到其他broker之前,总是被存储在本地broker中,也就是说,如果一条消息由于连接原因没有被交付,比如说,正在重连,broker将能够通过网络连接将未交付的消息发送到远程broker。默认情况下,network仅以单向方式操作

当然,这并不是说network只能单向操作,如果想要双向操作,同样可以在远程broker中配置一个network connector指向本地的broker,或者直接指定创建的network connector为双向duplex。

当本地broker和远程broker之间建立好一条network后,远程broker会将其所有持久和处于活动的消费者的目的地信息传递给本地broker,本地broker使用这些信息去判断远程broker对哪种消息感兴趣,并转发该类型消息给它。

假如我们有多个超市需要连接到一个后台办公订购系统,这将很难灵活扩展新的超市,后台办公订购系统不好掌控所有新加入的超市即远程broker。注意到这里,超市broker和back office之间的network是双向的,超市broker的配置:

<networkConnectors>
    <networkConnector uri="static://(tcp://backoffice:61617)" 
        name="bridge" 
        duplex="true"
        conduitSubscriptions="true"
        decreaseNetworkConsumerPriority="false">
    </networkConnector>
</networkConnectors>

这里关于配置,主要注意一点是,配置的顺序是很重要的,关于networks,persistence,transports的顺序如下:

  1. Networks——必须在消息存储之前创建
  2. Message store——必须在传输配置好之前配置完
  3. Transports——必须在broker配置的最后
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.apache.org/schema/core">
    <broker brokerName="receiver" persistent="true" useJmx="true">
        <networkConnectors>
            <networkConnector uri="static:(tcp://backoffice:61617)"/>
        </networkConnectors>
        <persistenceAdapter>
            <kahaDB directory = "activemq-data"/>       
        </persistenceAdapter>
        <transportConnectors>
            <transportConnector uri="tcp://localhost:62002"/>
        </transportConnectors>
    </broker>
</beans>

在大型开发场景下的高可用性和network配置结合

image-20210521173809786

Network发现机制

ActiveMQ提供两种发现机制:

  1. Dynamic——使用组播和会合方式搜索brokers
  2. Static——通过一个URI列表配置brokers

使用组播发现的方式去创建network连接是最简单粗暴的,当你启动一个broker时,它会通过组播IP去搜索其他的broker并创建network连接。配置方式如下:

<networkConnectors>
    <networkConnector uri="multicast://default"/>
</networkConnectors>

更多见前面章节。

Network配置

对于远程broker现存在的目的地,可能没有任何活动持久的订阅者或消费者,因此,当network初始化连接到远程broker时,远程broker会读取它现存目的地的消息,并传递给本地broker,然后,本地broker也可以转发那些目的地的消息。
重要的是要注意,一个network将使用broker的名称来代表远程broker创建唯一的持久预订代理。 因此,如果在稍后的时间点更改broker的名称,很可能会通过network丢失持久主题订阅者的消息。 为避免这种情况,请确保为元素上的brokerName属性使用唯一的名称。 有关简要示例,请参阅以下内容:

<broker xmlns="http://activemq.apache.org/schema/core/"
    brokerName="brokerA"
    dataDirectory="${activemq.base}/data">
...
    <networkConnectors>
        <networkConnector name="brokerA to brokerB" uri="tcp://remotehost:61616"/>
    </networkConnectors>
</broker>

应用扩展

垂直扩展

垂直扩展是一种用于增加单个ActiveMQ broker可以处理的连接数(因此增加负载)的技术。默认情况下,ActiveMQ broker设计为尽可能高效地移动消息,以确保低延迟和良好的性能。但是我们可以做一些配置调整,以确保ActiveMQ broker可以处理大量的并发连接和大量的队列。

默认情况下,ActiveMQ将使用阻塞I/O来处理传输连接。 这导致每个连接使用一个线程。 我们可以在ActiveMQ broker上使用非阻塞I/O(而客户端上仍然使用默认传输)来减少使用的线程数。broker的非阻塞I/O配置如下:


<broker>
    <transportConnectors>
        <transportConnector name="nio" uri="nio://localhost:61616"/>
    </transportConnectors>
</broker>

除了每个连接使用一个线程来阻塞I/O外,ActiveMQ broker可以使用线程为每个客户端连接分派消息。可以通过将名为org.apache.activemq.UseDedicatedTaskRunner的系统属性设置为false,让ActiveMQ使用线程池。

ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false"

确保ActiveMQ broker具有足够的内存来处理大量并发连接有两步过程:

  • 首先,需要确保启动ActiveMQ broker的JVM配置了足够的内存。
ACTIVEMQ_OPTS="-Xmx1024M \-Dorg.apache.activemq.UseDedicatedTaskRunner=false"
  • 第二,确保专门为ActiveMQ broker在JVM配置适当的内存量。此调整通过< system-Usage >元素的limit属性进行。(最好从512MB开始,如果测试不够再往上加),配置示例:
<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage limit="512 mb"/>
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="10 gb" name="foo"/>
        </storeUsage>
        <tempUsage>
            <tempUsage limit="1 gb"/>
        </tempUsage>
    </systemUsage>
</systemUsage>

还应该降低每一个连接的CPU负载,如果使用的OpenWire连接方式,禁用紧密编码,否则会使得CPU过度紧张。

String uri = "failover://(tcp://localhost:61616?" + " wireFormat.tightEncodingEnabled=false)";
ConnectionFactory cf = new ActiveMQConnectionFactory(uri);

前面研究的是broker怎么调整去处理数千个连接,下面开始研究的是怎么调整broker去处理数千个队列。

默认队列配置使用单独的线程来将消息从消息存储区分页到队列中,以便分发给感兴趣的消息消费者。 对于大量队列,建议通过为所有队列启用optimize-Dispatch属性来禁用此功能,

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">" optimizedDispatch="true"/>
        </policyEntries>
    </policyMap>
</destinationPolicy>

为了确保不仅可以扩展到数千个连接,而且还可以扩展到数万个队列,使用JDBC消息存储库或更新和更快的KahaDB消息存储库。 KahaDB默认情况下在ActiveMQ中启用。

到目前为止,我们已经考虑了扩展连接,减少线程使用,并选择正确的消息存储。 调整用于扩展的ActiveMQ的示例配置如以下:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data">
    <persistenceAdapter>
        <kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/>
    </persistenceAdapter>
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue="&gt;" optimizedDispatch="true"/>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="512 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="10 gb" name="foo"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="1 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="openwire" uri="nio://localhost:61616"/>
    </transportConnectors>
</broker>

水平扩展

除了扩展单个broker之外,还可以使用networks来增加可用于应用程序的ActiveMQ broker的数量。 由于networks会自动将消息传递给具有感兴趣的消费者的连接broker,因此可以将客户端配置为连接到一个broker集群,随机选择一个来连接。

failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true

为了确保队列或持久主题订阅者的消息不会在broker上孤立,需要将network配置为使用dynamicOnly和低网络prefetchSize。

<networkConnector uri="static://(tcp://remotehost:61617)"
    name="bridge"
    dynamicOnly="true"
    prefetchSize="1">
</networkConnector>

使用network进行水平扩展会带来更多的延迟,因为潜在的消息必须在分发给消费者之前通过多个broker。

另一种替代部署提供了巨大的可扩展性和性能,但需要更多的应用规划。 这种混合解决方案称为流量分区。

流量分区

客户端流量分割是垂直和水平分割的混合。 通常不使用network,因为客户端应用程序决定什么流量应该到哪个broker上。 客户端应用程序必须维护多个JMS连接,并决定哪些JMS连接应用于哪些目标。

不直接使用network connection的优点是,减少在brokers之间转发消息的开销。 需要平衡这与导致典型应用程序的额外复杂性。

image-20210521175510230

Activemq Broker高级特性

Wildcards and Composite Destinations

订阅Wildcard Destinations

Activemq支持 层级destination,每个层级名字部分用 点号(.)分隔,适用于topic和queue。

destination名称保留3个特殊字符:

  • .:名称分隔符
  • *:匹配一个元素
  • >:匹配一个或多个后面的元素

仅支持消费者端。

发送消息到多个destination

Activemq支持 composite destination 特性,用于把消息发送到组合的destination(queue、topic)中。

在使用时,指定destination的name为逗号分隔的name列表:

store.order.backoffice,store.order.warehouse  #发送到2个queue。

composite destination支持topic和queue混合模式。需要对destination加上协议

queue://****  #queue默认可以不用加
topic://****
#混合模式
store.orders, topic://store.orders

advisory message

advisory message是系统消息,是broker产生的用于通知系统修改的消息。包括

  • 管理对象(Connection,Destination,Consumer,Producer)加入或离开broker
  • broker达到限制(limit)

advisory message产生在系统定义的topic上,每个advisory message都有一个JMSType:Advisory,以及预定义的字符串属性:

  • originBrokerId - 产生Advisory消息的broker的id。
  • originBrokerName -broker的名字
  • originBrokerURL - broker的url

org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY

默认一些advisories 是禁用的,

<destinationPolicy>
	<policyMap>
    <policyEntries>
		<policyEntry queue=">" advisoryForSlowConsumers="true" />
		</policyEntries>
  </policyMap>
</destinationPolicy>

Virtual Topics

如果想广播消息到多个consumer,可以使用topic。如果想一组consumer消息一个destination,则使用queue。但是没有合适的方式广播消息到topic,但是一组consumer 跟queue一样共同消费它(每条消息仅会投递到一个consumer)。

Virtual Topics 提供了一种简便的方式让topic具有queue相同的语义。

image-20210531130053639

使用Virtual topic需要遵循:

  • topic必须以 VirtualTopic. 开头。例如:VirtualTopic.orders

  • queue的名字格式为:Consumer.<consumer name>.VirtualTopic.<VirtualTopic Name>

    Consumer.A.VirtualTopic.orders,Consumer.B.VirtualTopic.orders
    
    Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
    MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
    Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
    MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
    
    //setup the sender
    Connection senderConnection = connectionFactory.createConnection();
    senderConnection.start();
    
    Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
    MessageProducer producer = senerSession.createProducer(ordersDestination);
    

Retroactive Consumers(可回溯消费者)

为了性能,采用消息非持久化时,会带来消息不能被消费者消费。为了在消费非持久化的模式下提供有限的回溯能力,Activemq提供了缓存一定size或number的消息的能力。为了实现此能力,需要:

  • 消费者需要告诉broker关注可回溯消息
  • broker配置destination 需要缓存多少消息。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();c
onnection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true"); //设置需要消费回溯消息
MessageConsumer consumer = session.createConsumer(topic);
Message result = consumer.receive();

broker端需要设置RecoverPolicy

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic=">" >
        <subscriptionRecoveryPolicy >
        	<fixedSizedSubscriptionRecoveryPolicy  maximumSize = "8mb"/>
        </subscriptionRecoveryPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

消息重投递和死信队列

当消息不能重投递或者消息过期,会被移到死信队列中,由管理员消费。

消息重投递的常用场景:

  • 事务回滚
  • 事务提交前close
  • A client is using CLIENT_ACKNOWLEDGE on a Session and calls recover() on that Session.

重新投递策略

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

死信队列

重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

配置个性化死信队列。

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">">
                <deadLetterStrategy>
                  <individualDeadLetterStrategy queuePrefix="DLQ."
                  useQueueForQueueMessages="true"
                  processExpired="false"
                  processNonPersistent="false"/>
                </deadLetterStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

高级客户端选项

Exclusive Consumer

当多个消费者消费一个queue时,不能保证消费的顺序。如果设置consumer为exclusive,则queue会选择一组consumer中的一个,把所有消息都发送给它,相对于仅有一个消费者的好处是,选中的consumer 不可用时,会再选中另一个继续消费。

当消费一个queue的consumer中既有exclusive的,又有非exclusive的,仍然会仅把消息投递给exclusive consumer。当所有非exclusive consumer都不可用,queue就会回到normal模式,queue会把消息投递到所有 非exclusive consumer。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

常见exclusive producer

public void start() throws JMSException {
this.connection = this.factory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true")
Message message = this.session.createMessage();
MessageProducer producer = this.session.createProducer(destination);
producer.send(message);
MessageConsumer consumer = this.session.createConsumer(destination);
consumer.setMessageListener(this);
}

Message Groups

所有message都由一个cousumer消费,通过Message header JMSXGroupID,具有相同JMSXGroupID的消息会被发送到同一个consumer,这种通用概念叫消息组。broker保证同一个消息组的消息都会发送到同一个consumer。当consumer不可连接时,消息会又都路由到另外一个consumer。

image-20210602130531493

创建消息组

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("
<foo>test</foo>
");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
producer.send(message);

Activemq会为组内的每一条消息都加一个header属性值:JMSXGroupSeq

如果想明确的关闭一个消息组,设置JMSXGroupSeq为-1。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
<foo></foo>
Message message = session.createTextMessage("
<foo>close</foo>
");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);

不能假设JMSXGroupSeq都是从1开始。当一个消息组consumer不可用时,所有路由到它的同组的消息都会路由到另一个consumer,JMSXGroupFirstForConsumer被设置为路由到consumer的是第一条信息,用于标识consumer正在消费另一个消息组或者一个新的消息组。

Session session = MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
String groupId = message.getStringProperty("JMSXGroupId");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
// do processing for new group
}

如果有多个consumer消费一个queue,很有可能把消息都发送给第一个consumer,如果为了聚恒分布式消费,可以设置broker有足够的consumer之后再分发消息。

等待消费者满足条件:

<destinationPolicy>
  <policyMap>
    <policyEntries>
    	<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

ActiveMQ Streams

ActiveMQ Streams是一个高级特性(需要谨慎使用),允许通过一个文件或socket发送数据到Activemq。

如果仅涉及一个consumer或一个queue(or exclusive consumer),工作很好,但是其他情况,消息的顺序就是个很大的问题。

image-20210602132223979

Activemq使用JMS Stream的好处是把消息分成chunks,可以传输大量数据。

通过ActiveMQ Stream发送数据:

//source of our large data
FileInputStream in = new FileInputStream("largetextfile.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
//创建stream
OutputStream out = connection.createOutputStream(destination);
//now write the file on to ActiveMQ
byte[] buffer = new byte[1024];
while(true){
  int bytesRead = in.read(buffer);
  if (bytesRead==-1){
 	 break;
  }
  out.write(buffer,0,bytesRead);
}
//close the stream so the receiving side knows the steam is finished
out.close();

ActiveMQ Stream接收数据:

//destination of our large data
FileOutputStream out = new FileOutputStream("copied.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//we want be be an exclusive consumer
String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true";
Queue destination = session.createQueue(exclusiveQueueName);
InputStream in = connection.createInputStream(destination);
//now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while(true){
  int bytesRead = in.read(buffer);
  if (bytesRead==-1){
  break;
  }
  out.write(buffer,0,bytesRead);
}
out.close();

使用exclusive consumer时,确保同一个时刻仅有一个consumer读取stream。Topic也可以使用stream,但是读取开始之前的数据是丢失的。

Blob Messages

Blob Message并不包含要发送的数据,仅是一个通知,告知一个Blob是可用的。Blob数据本身是在外部传输,例如ftp或http。Blob消息包含的数据的url,已经获取InputStream 得到真实数据的方法。

Sending a BlobMessage

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com"));
producer.send(message);

处理BlobMessage:

// destination of our Blob data
FileOutputStream out = new FileOutputStream("blob.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
BlobMessage blobMessage = (BlobMessage) consumer.receive();
//获取stream。
InputStream in = blobMessage.getInputStream();
// now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while (true) {
  int bytesRead = in.read(buffer);
  if (bytesRead == -1) {
  	break;
  }
  out.write(buffer, 0, bytesRead);
}
out.close();

性能调优

通用技术

非持久化(不关心消息丢失)

持久化和非持久化消息

持久化消息用于减少灾难性错误以及保证consumer可消费之前的数据。

非持久化消息,仅把消息投递给active consumer。

非持久化消息比持久化消息快的原因:

  • 异步发送(不需要等待结果)
  • 不存储

使用持久化消息用于防范消息丢失,activemq采用:

  • 重发消息
  • 过滤掉重复消息

设置投递模式

MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

事务

事务用于批量处理数据(不同于数据库的事务,不是原子性保证)

public void sendTransacted() throws JMSException {
//create a default connection - we'll assume a broker is running
//with its default configuration
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
connection.start();
//create a transacted session
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("Test.Transactions");
MessageProducer producer = session.createProducer(topic);
int count =0;
  for (int i =0; i < 1000; i++) {
      Message message = session.createTextMessage("message " + i);
      producer.send(message);
      //commit every 10 messages
      if (i!=0 && i%10==0){
      	session.commit();
      }
  }
}

public void sendNonTransacted() throws JMSException {
//create a default connection - we'll assume a broker is running
//with its default configuration
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
connection.start();
//create a default session (no transactions)
Session session = connection.createSession(false, Session.AUTO_ACKNOWELDGE);
Topic topic = session.createTopic("Test.Transactions");
MessageProducer producer = session.createProducer(topic);
int count =0;
  for (int i =0; i < 1000; i++) {
    Message message = session.createTextMessage("message " + i);
    producer.send(message);
  }
}

内嵌brokers

消除 序列化和网络传输,数据在同一个JVM内。

BrokerService broker = new BrokerService();
broker.setBrokerName("service");
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://service");
cf.setCopyMessageOnSend(false);
Connection connection = cf.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//we will need to respond to multiple destinations - so use null
//as the destination this producer is bound to
final MessageProducer producer = session.createProducer(null);
//create a Consumer to listen for requests to service
Queue queue = session.createQueue("service.queue");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
  public void onMessage(Message msg) {
    try {
      TextMessage textMsg = (TextMessage)msg;
      String payload = "REPLY: " + textMsg.getText();
      Destination replyTo;
      replyTo = msg.getJMSReplyTo();
      textMsg.clearBody();
      textMsg.setText(payload);
      producer.send(replyTo, textMsg);
    } catch (JMSException e) {
    e.printStackTrace();
    }
  }
});

Connecting a QueueRequestor,用于测试内嵌broker

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection connection = cf.createQueueConnection();
connection.start();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("service.queue");
QueueRequestor requestor = new QueueRequestor(session,queue);
for(int i =0; i < 10; i++) {
TextMessage msg = session.createTextMessage("test msg: " + i);
TextMessage result = (TextMessage)requestor.request(msg);
System.err.println("Result = " + result.getText());
}

Tuning the OpenWire protocol

OpenWire Protocol使用二进制格式传输命令到broker,命令包含消息、应答等信息。性能相关的格式化参数如下:

属性默认值描述
tcpNoDelayEnabledfalse是否启用tcpNoDelay。true,则在慢网络发送大量小size消息能够提高性能
cacheEnabledtrue如果启用,则缓存重复的值(比如producerId和消息目的地),允许传输与缓存值对应的简短的key.该设置减小了传输数据的大小,因而在网络性能不佳时可以提升性能.但是因为在缓存中查找数据会同时在客户端和代理所在的机器中引入CPU负载的额外开销,配置时请考虑中引入开销的影响.
cacheSize1024缓存条目的最大数量.该值不能超过Short.MAX_VALUE值的二分之一.当开启缓存时,该值设置的越大性能越好.但是因为每一个传输连接都独立使用一个缓存,所以需要在代理端考虑因缓存带来的额外开销,特别是当有大量的客户端连接到代理时.
tightEncodingEnabledtrue以CPU敏感的方式压缩消息.建议在broker使用所有可用CPU时将该选项设置为停用.

可以通过下面的方式,将上述参数附加到连接到代理的传输连接器的URI中:

//使用tightEncodingEnabled参数禁用紧凑编码(tight encoding)
String uri =
"failover://(tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false)";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
cf.setAlwaysSyncSend(true);
TCP Transport优化

ActiveMQ中使用最广泛的连接器是TCP Transport.有两个直接影响它的性能:

  • socketBufferSize:TCP Transport用于发送和接收数据的缓冲区大小.通常该参数设置的越大越好(尽管这个最大值收到操作系统限制,但是可以去测试).默认值为65536,单位是byte.
  • tcpNoDelay–默认值为false.通常TCP套接字缓存即将被发送的小尺寸数据包.当启用这个参数时,消息会被**尽快发送(**译注:不缓冲).同样可以测试这个配置,因为修改这个参数是否能提升性能还和操作系统有关.
String url = "failover://(tcp://localhost:61616?tcpNoDelay=true)";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
cf.setAlwaysSyncSend(true);

优化Producer

在Producer分发消息到消息消费者之前,Producer发送消息到broker的效率是影响整个应用程序性能的基本因素。

以下是几个影响消息吞吐量和消息发送延迟 的调优参数.

异步发送

ActiveMQ中非持久化消息分发是可靠的,因为消息会在网络故障和系统崩溃(只要消息生产者依然是活动的–此时消息生产者将消息缓存在失效转移连接器的缓存中)中幸存下来.但是,通过设置消息生产者的连接工厂的useAsyncSend属性, 仍然 可以在使用持久化消息时获得性能提升。

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setUseAsyncSend(true);

上面代码将设置一个属性,告知消息生产者不要尝试从broker获取一个刚刚发送的消息的回执.

如果程序要求消息确保被投递,则推荐使用系统默认的持久化消息分发模式,并且最好同时使用事务.

使用异步方式发送消息获以取性能提升的原因不难理解,同时以这种方式实现性能提升的方式也很简单–只要设置ActiveMQ连接工厂的一个属性即可.下一节将介绍ActiveMQ中一种通常不被理解的特性:消息生产者流控制.我们将看到大量的关于消息生产者效率下降或暂停问题,并且理解 如何在应用程序中使用流控制来减少这些问题的发生.

Producer Flow Control

生产者流控制允许消息broker在系统资源紧张时降低消息的通过量.这种情况发生在消息消费者处理 速度慢于生产者时,此时消息会缓存在broker的内存中等待被发送。消息生产者在收到broker通知有足够存储空间接收更多消息之前都会处于等待状态。生产者流控制对于阻止broker的内存和临时存储空间超过限制的值来说是必要的,尤其是对于广域网来说.

image-20210816213235474

对于持久化消息来说,生产者流控制默认时就是开启的,但是对于异步消息发布来说必须明确的指定为开启(针对持久化消息,或对于配置成总是异步发送消息的连接来说).可以通过设置连接工厂的producerWindowSize属性来开启消息生产者异步发送消息的流控制.

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();  
cf.setProducerWindowSize(1024000);  

producerWindowSize属性用于设置生产者在收到broker的存储空间未超过限制值回执之前可以用来缓存消息的字节大小,超过这个设置的值,生产者将等待代理回执(而不再生产和发送消息)。对于一个异步发送消息的生产者来说,如果这个属性未开启,则broker仍然会暂停消息流动,这种情况下,默认会阻塞消息生产者的Transport.阻塞Transport会阻塞所有使用该Transport连接的用户,如果消息消费者也使用同样的连接,则会导致死锁。生产者流控制运行仅阻塞消息生产者而不是整个消息生产者使用的Transport。

尽管保护broker不要运行在可用内存空间低的状态是一个不错的想法,但是这并不能改善 系统被最慢的消费者拖慢时而产生的性能问题。所以,让我们看看禁用生产者流控制 时会发生什么,如下面代码中的粗体字所示,你可以在broker的配置中配置消息destination的 策略来实现.

<destinationPolicy>  
		<policyMap>  
   		 <policyEntries>  
        	<policyEntry topic="FOO.>"  producerFlowControl="false" memoryLimit="10mb" />  
        </policyEntries>  
    </policyMap>  
</destinationPolicy>  

默认情况下,禁用了生产者流控制之后,发送给速度慢的消费者的消息会被存放到临时存储空间中,以便消息生产者和其他消息消费者可以尽可能快的运行。如图所示,broker可用的最大内存限制决定了在什么时候 消息会通过追加到消息游标的形式持久化到磁盘上。系统可用最大内存限制的作用范围是整个broker。这个限制应该要低于消息destination可用内存限制,以便在流控制之前起作用。(即,这个较小的值先起作用,则消息destination使用内存不会超过配置的限制值,因为这个值较大)

image-20210816215751308

尽管有一些因为存储消息而导致broker的性能损失,在禁用了生产者流控制之后,消息应用程序可以独立于最慢的消息消费者而运行。在理想情况下,消息消费者总是与消息生产者一样快速运行,这就给我们引入下一节中关 于消息消费者的优化.

默认情况下,当启用了生产者流控制后,当broker没有空间存放更多消息时,生产者发送消息的操作会被阻塞直到broker有足够空间存储消息。有两种方式调整该参数,使得broker获取更多存储消息空间之前,消息 生产者不会无限期实质性的挂起。

第一种调节消息生产者流控制的方式称为sendFailIfNoSpace

<systemUsage>  
    <systemUsage sendFailIfNoSpace="true">  
        <memoryUsage>  
            <memoryUsage limit="128 mb"/>  
        </memoryUsage>  
    </systemUsage>  
</systemUsage> 

sendFailIfNoSpace属性将控制权返还给消息生产者,在代理的消息存储已经不足而生产者仍然尝试发送操作时,通过在生产者客户端抛出异常来代替永久性的阻塞生产者的发送操作。这就允许生产者可以捕捉这个异常,然后等待 一段时间后,继续尝试发送操作。

第二个调节生产者流控制的属性开始于ActiveMQ的5.4.1版本.该属性名称为sendFailIfNoSpaceAfterTimeout:

<systemUsage>  
    <systemUsage sendFailIfNoSpaceAfterTimeout="5000">  
        <memoryUsage>  
            <memoryUsage limit="128 mb"/>  
        </memoryUsage>  
    </systemUsage>  
</systemUsage>  

sendFailIfNoSpaceAfterTimeout与前面那个属性稍有不同。配置了该属性后,在等待配置的时间后,如果broker端 依然没有足够的空间存储消息,则会在客户端客户端发送消息时抛出异常。

优化Consumer

为了最大限度的提升应用程序的性能,你必须关注所有影响性能的因素。到目前为止,消息消费者在整个ActiveMQ系统的性能表现中都扮演着举足轻重的角色。通常,消息消费者必须要尽量以2倍于消息生产者的速度运行,因为消费者还要通知broker消息已经被处理了。

​ 通常,ActiveMQ broker 会通过消费者连接尽可能快的发送消息。通常情况下,一旦消息通过ActiveMQ broker的Transport发送完成之后,消息就加入了与消费者关联的session队列中,并等待分发。在下一节中,我们将解释消息发送给消费者的速度为何可控以及如何控制,同时还将阐述如何调整这个消息发送速率以获取更好的吞吐量.

预获取限制(Prefetch limit)

ActiveMQ使用一种基于推送的模式来投递消息,将broker收到的消息投递到consumer。为了防止消费者耗尽内存,有一个参数(prefetch limit)可以限制broker在等待消费者应答消息已被应用程序处理之前可以发送给消费者的消息数量。在消费者内部,从Transport上接收的消息会被投递并放置于一个和消费者 session关联的内部队列中。

image-20210816223617976

消费者连接会在内部将分发过来的消息队列化。这个内部的消息队列的尺寸加上尚未发送回执 给broker的消息 ( 这些消息已经被消费者接收了但是还没有通知broker消息已被消费 ) 的数量之和受到消费者的prefetchlimit参数限制。通常,这个prefetchlimit参数设置的越大,消费者运行的越快。

但是对于消息队列来说,设置这个限制并非是最理想方案,因为你可能希望消息会被平均的分发给一个队列上的所有消费者。这种情况下,当prefetchlimit设置的很大时,处理速度较慢的消费者可能会累积待处理的消息。而这些消息却不能被更快的消费者处理。这种情况下,设置较低的prefetchlimit值可能会更适合。如果prefetchlimit值设置为0,消息消费者会主动从broker拉取消息并且 不允许broker推送任何消息到消费者.

对于不同种类的消费者而言有不同的prefetch limit默认值:

  • 队列消费者的prefetch limit默认值为1000
  • 队列浏览消费者的prefetch limit默认值为500
  • 持久化topic消费者的prefetch limit默认值为100
  • 非持久化topic的prefetch limit默认值为32766

prefetch limit值是消息消费者等待接收的消息的数量而不是内存值大小.可以通过设置 ActiveMQConnectionFactory的相关属性值值来设置prefetch limit,如下代码所示:

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();  
Properties props = new Properties();  
props.setProperty("prefetchPolicy.queuePrefetch", "1000");  
props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");  
props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");  
props.setProperty("prefetchPolicy.topicPrefetch", "32766");   
cf.setProperties(props);  

或者,在创建一个消息destination时,传递prefetch limit 参数作为消息destination的属性:

Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");   //问号分隔
MessageConsumer consumer = session.createConsumer(queue);  

使用Prefetch limit是一种简单的提升性能机制,但是需要谨慎使用。对于队列来说,应该考虑程序中是否有比较慢的消费者,而对于主题来说,你需要考虑在消息被分发之前队列 可以使用的客户端上的最大内存是多少.

控制消息分发给消费者的速率仅仅是消费者调优的一部分。一旦消息到达消费者的Transport之后,消息分发到消费者时使用的方法以及消费者用来将消息已被处理的确认发送给broker时使用的选项 就成为影响性能的重要组成部分。

消息的投递和确认

使用javax.jms.MessageListener.onMessage()来分发消息明显比使用 javax.jms.MessageConsumer.receive()要快。如果MessageConsumer没有设置MessageListener则该消费者的消息会分发到队列中然后等待调用receive()方法。不仅维护消费者内部队列的代价是昂贵的,而且应用程序线程不断的调用receive()切换应用程序上下文的代价也是高昂的.

因为ActiveMQ broker需要保存一个记录以表明当前有多少消息已被消费来维护消费者 内部的prefetch limit,MessageConsumer必须为每一个消费的消息 发送 消息确认。如果使用了事务, 当调用Session.commit()方法是会发送 消息确认 ,但是假如使用auto-acknowledgment模式 则每个消息处理完成后都会 单独 发送消息确认.

有一些优化选项专门用于发送消息确认给broker,当使用DUPS_OK_ACKNOWLEDGE session确认模式时, 这些优化选项可以显著的改善性能.另外,你可以设置ActiveMQ ConnectionFactory的optimizeAcknowledge属性,通过给消费者一个提示信息以便批量发送消息确认信息.

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();   
cf.setOptimizeAcknowledge(true);  

当在一个session中使用optimizeAcknowledgeDUPS_OK_ACKNOWLEDGE确认模式时,消费者 只发送一个消息告知ActiveMQ broker一批消息已经完成处理.这样消息消费者要做的工作就减少了,便于消费者尽可能快的处理消息.

下面的列出了ACK的不同选项以及使用这些选项后消费者发送ACK给ActiveMQ broker的频率.

ACK模式发送ACK说明
Session.SESSION_TRANSACTED使用 Session.commit()方法批量确认这是消息消费的一种可靠方式,并且性能很好,允许消息一次提交中处理多个消息.
Session.CLIENT_ACKNOWLEDGE当一个消息确认了 则 所有消息都确认在确认之前可以使消费者消费大量的消息
Session.AUTO_ACKNOWLEDG每个消息处理完成后自动发送默认的消息确认机制。这种方式会比较慢,消息确认到代理.
Session.DUPS_OK_ACKNOWLEDGE允许一个ACK 确认一批 消息被消费。当消费者收到的消息达到prefetch limit的**50%**时,即发送ACK给broker.这消息处理中最快的标准的消息确认方式.
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE每处理一个消息就发送一次确认最大限度的允许控制每个消息 单独 被确认,但是会很慢.
optimizeAcknowledge允许一个ACK 确认一批 消息被消费。与Session.AUTO_ACKNOWLEDGE一同起作用,在消费者处理完的消息占到prefetch limit65% 时发送ACK.使用这种模式可以以最快的方式处理消息.

单独确认每个消息的缺点是:不管消息消费者以任何理由失去了与ActiveMQ broker连接,那么 消息应用程序可能会收到重复的消息。但是,对于要求快速处理且不关心消息是否重复的 应用程序(比如实时的数据源)来说,推荐使用optimizeAcknowledge模式.

ActiveMQ的消息消费者包含重复消息侦测机制,可以最大限度的降低收到重复消息的风险.

异步分发

每个session都维护一个内部的队列,存储即将被分发到各自的消费者的消息。内部消息队列以及与之关联的用于发送消息到消息消费者的线程的使用,可能会给消息处理增加额外开销。

可以禁用ActiveMQ连接工厂的alwaysSessionAsync属性来停用上述消息队列和消息分发线程.这种设置允许消息直接从Transport发送到消息消费者。

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();  
cf.setAlwaysSessionAsync(false);  

停用asynchronous允许消息直接发送到session内部的队列并由session负责进一步分发

image-20210817104642397

实践

让我们通过一个例子来看看如何综合使用前面介绍的性能调优方法.我们将用程序模拟一个 实时的数据源,该程序中消息生产者和一个嵌入式broker部署在一起,同时使用消息消费者 监听远程的消息.

我们将阐如何述使用一个嵌入式broker来减少将消息发送到ActiveMQ代理的开销.我们还将调整 消息消费者的一些选项来降低消息的拷贝.嵌入式broker将被配制成禁用流控制并且使用内存 限制以允许broker快速处理消息流.

最后,消息消费者将会配置成 直接通过 分发方式,同时配置一个高prefetch limit值以及配置 优化过的消息确认模式.

image-20210817105119493

首先我们设置一个嵌入式broker,设置其可用内存限制为一个合理的值(64M),为每一个消息destination设置可用内存限制,并且停用消息生产者流控制。

如下代码所示,使用默认的PolicyEntry设置broker的消息destination策略。PolicyEntry保存了ActiveMQ broker的消息destination的相关配置信息。可以为每一个消息destination 单独 设置策略,也可以使用通配符将一个策略应用到多个配置通配符的消息destination(比如,名称为foo.>PolicyEntry将仅应用到名称以foo开头的消息destination).在我们的例子中,我们仅仅设置内存限制以及禁用生产者流控制.为了简单起见,我们仅仅配置了默认的PolicyEntry,该PolicyEntry将应用到所有消息destination.

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
...
//By default a broker always listens on vm://<broker name>
BrokerService broker = new BrokerService();
broker.setBrokerName("fast");
broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);
//Set the Destination policies
PolicyEntry policy = new PolicyEntry();
//set a memory limit of 4mb for each destination
policy.setMemoryLimit(4 * 1024 *1024);
//disable flow control
policy.setProducerFlowControl(false);
PolicyMap pMap = new PolicyMap();
//configure the policy
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.addConnector("tcp://localhost:61616");
broker.start();

上面代码创建的代理使用了一个唯一的名称fast,因此与broker同处于一个虚拟机内的数据源生产者可以 使用VM Transport 绑定到该broker.

除了使用了嵌入式broker,消息生产者也是直连的,除了将其配置成发送非持久化消息并且不使用消息拷贝。

//tell the connection factory to connect to an embedded broker named fast.
//if the embedded broker isn't already created, the connection factory will
//create a default embedded broker named "fast"
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast");
//disable message copying
cf.setCopyMessageOnSend(false);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
final MessageProducer producer = session.createProducer(topic);
//send non-persistent messages
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i =0; i < 1000000;i++) {
	TextMessage message = session.createTextMessage("Test:"+i);
	producer.send(message);
}

消息消费者被配置成直通方式(禁用了异步session分发)并使用了javax.jms.MessageListener.消息消费者使用的消息确认模式为optimizeAcknowledge,以便能尽可能快的处理消息

//set up the connection factory to connect the the producer's embedded broker
//using tcp://
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)");
//configure the factory to create connections
//with straight through processing of messages
//and optimized acknowledgement
cf.setAlwaysSessionAsync(false);
cf.setOptimizeAcknowledge(true);
Connection connection = cf.createConnection();
connection.start();
//use the default session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set the prefetch size for topics - by parsing a configuration parameter in
// the name of the topic
Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766");
MessageConsumer consumer = session.createConsumer(topic);
//setup a counter - so we don't print every message
final AtomicInteger count = new AtomicInteger();
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
    TextMessage textMessage = (TextMessage)message;
        try {
          //only print every 10,000th message
          if (count.incrementAndGet()%10000==0)
              System.err.println("Got = " + textMessage.getText());
        } catch (JMSException e) {
        e.printStackTrace();
        }
    }
    }
 );

管理和监控ActiveMQ

Activemq基于标准JMS API实现了不少功能用于管理和监控。即可以通过编程方式,也可以通过管理工具。

APIs

访问broker的最自然的方式是通过JMX APIs。

应用除了消费消息,也可以用于运行时监控,包括以下几个任务:

  • 获取broker的统计信息
  • 新增或删除connector。
  • 调整broker的配置。

JMX

从5.9开始移除了。

Activemq通过 com.sun.management.jmxremote 启用或禁用JMX。

linux:

if [ -z "$SUNJMX" ] ; then
	#SUNJMX="-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=-Dcom.sun.management.jmxremote.ssl=false"
	SUNJMX="-Dcom.sun.management.jmxremote"
fi

windows:

if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremote
	REM set SUNJMX=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=-Dcom.sun.management.jmxremote.ssl=false

Activemq配置启用或禁用远程JMX。通过 useJmx 属性

<broker xmlns="http://activemq.org/config/1.0" useJmx="true"
brokerName="localhost"
dataDirectory="${activemq.base}/data">
...
</broker>
暴露MBean

默认,MBean是暴露的,在配置文件中有几个属性用于附加功能。

<broker xmlns="http://activemq.org/config/1.0" useJmx="true"
brokerName="localhost"
dataDirectory="${activemq.base}/data">
    <managementContext>
    		<managementContext connectorPort="2011" jmxDomainName="my-broker" />
    </managementContext>
    <!-- The transport connectors ActiveMQ will listen to -->
    <transportConnectors>
    		<transportConnector name="openwire" uri="tcp://localhost:61616" />
    </transportConnectors>
</broker>

useJmx:启用或禁用JMX,默认为true。

默认Activemq启用一个connector,监听端口1099,用于远程管理,并且使用域名:org.apache.activemq。可以在managementContext元素更改配置。

使用JMX APIs
public class Stats {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url, null);
        connector.connect();
        //creates a connection to the MBean server
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        ObjectName name = new ObjectName("my-broker:BrokerName=localhost,Type=Broker");
      // queries for the broker MBean
        BrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);·
        System.out.println("Statistics for broker " + mbean.getBrokerId() + " - " + mbean.getBrokerName());
        System.out.println("\n-----------------\n");
        System.out.println("Total message count: " + mbean.getTotalMessageCount() + "\n");
        System.out.println("Total number of consumers: " + mbean.getTotalConsumerCount());
        //grabs some broker statistics from the MBean
        System.out.println("Total number of Queues: " + mbean.getQueues().length);
        for (ObjectName queueName : mbean.getQueues()) {
            QueueViewMBean queueMbean = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(connection, queueName,
            QueueViewMBean.class, true);
            System.out.println("\n-----------------\n");
            System.out.println("Statistics for queue " + queueMbean.getName());
            System.out.println("Size: " + queueMbean.getQueueSize());
          //grabs some queue statistics from the queue MBeans
            System.out.println("Number of consumers: " + queueMbean.getConsumerCount());
        }
    }
}

MBean通过 名称引用,格式为:<jmx domain name>:BrokerName=<name of the broker>,Type=Broker

ActiveMQ 的MBean的对象名称默认为:org.apache.activemq:BrokerName=localhost,Type=Broker

MBean的方法:

getTotalMessageCount():获取消息的总数量

getTotalConsumerCount():获取消费者的数量。

getQueues().length():队列的数量

getQueues():所有队列的名称,名字与broker的 名字格式类似,例如:my-broker:BrokerName=localhost,Type=Queue,Destination=JOBS.suspend

高级 JMX 配置

com.sun.management.jmxremote.port

com.sun.management.jmxremote.authentication

com.sun.management.jmxremote.ssl

java.rmi.server.hostname

配置JMX 用户名密码

$JAVA_HOME/jre/lib/management/目录下:

jmxremote.access:定义角色和权限

monitorRole readonly  //角色和权限
controlRole readwrite

jmxremote.passwordjmxremote.password.template:映射角色和密码,从template copy 一个文件,

monitorRole QED   //角色和密码
controlRole R&D

通知消息(Advisory Messages)

实现了ActiveMQ的broker上各种操作的记录跟踪和通知,实时的知道broker上:

  1. 创建或销毁了连接,
  2. 添加或删除了生存者或消费者,
  3. 添加或删除了主题或队列,
  4. 有消息发送和接收,
  5. 什么时候有慢消费者,
  6. 什么时候有快生产者
  7. 什么时候什么消息被丢弃
  8. 什么时候broker被添加到集群(主从或是网络连接)

这个机制是ActiveMQ对JMS协议的重要补充,也是基于JMS实现的ActiveMQ的可管理性的一部分。多个ActiveMQ的相互协调和互操作的基础设置。

通知消息投递到以 ActiveMQ.Advisory前缀命名的topic。

启用通知消息:

<broker xmlns="http://activemq.org/config/1.0" useJmx="true"
brokerName="localhost" dataDirectory="${activemq.base}/data"
advisorySupport="true">  <!-- 默认值:true,表示启用 -->
  <destinationPolicy>
      <policyMap>
          <policyEntries>
              <policyEntry topic=">"
              sendAdvisoryIfNoConsumers="true"/> <!-- destination没有消费者则发送通知消息,每条消息都有通知消息 -->
              </policyEntries>
      </policyMap>
  </destinationPolicy>
  <!-- The transport connectors ActiveMQ will listen to -->
  <transportConnectors>
  		<transportConnector name="openwire" uri="tcp://localhost:61616" />
  </transportConnectors>
</broker>

示例:

Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);

public void onMessage(Message msg){
    if(msg instanceof ActiveMQMessage) {
        try {
             ActiveMQMessage aMsg = (ActiveMQMessage)msg;
             ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
        } 
        catch(JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}

Tools

命令行工具

#start 
$ cd apache-activemq-5.3.0
$ ./bin/activemq

#start
$ ./bin/activemq-admin start

#stop
$ ./bin/activemq-admin stop
$ ./bin/activemq-admin stop --jmxurl service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi --jmxdomain my-broker

#获取所有 broker
./bin/activemq-admin list

#查询broker
./bin/activemq-admin query -QQueue=*

#browse destination
./bin/activemq-admin browse --amqurl tcp://localhost:61616 JOBS.delete

Command Agent

JConsole

Web Console

Logging

配置文件:conf/log4j.properties

log4j.rootLogger=INFO, stdout, out
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.xbean.spring=WARN

客户端配置:

log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.xbean.spring=WARN
log4j.logger.org.apache.activemq.transport.failover.FailoverTransport=DEBUG
log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG

Logging Interceptor:

<plugins>
<loggingBrokerPlugin/>
</plugins>

参考

https://activemq.apache.org/features

https://activemq.apache.org/jmx.html#JMX

prefetch:https://activemq.apache.org/what-is-the-prefetch-limit-for

通知消息:https://activemq.apache.org/advisory-message

https://activemq.apache.org/mdc-logging


版权声明:本文为demon7552003原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。