ActiveMQ拦截器使用和原理
在ActiveMQ中使用拦截器和过滤器的使用多采用插件的形式实现,继承BrokerFilter实现BrokerPlugin接口类。BrokerFilter实质一个实现Broker接口的类。
publicinterfaceBrokerPlugin {
/**
*Installsthepluginintotheinterceptorchainofthebroker,returningthenew
*interceptedbrokertouse.
*/
Broker installPlugin(Broker broker)throwsException;
}
1. 日志拦截器
日志拦截器是Broker的一个拦截器,默认的日志级别为INFO。你如你想改变日志的级别。这个日志拦截器支持Commons-log和Log4j两种日志。
Attribute | Description | Default Value |
logAll | Log all Events | false |
logMessageEvents | Events related with producing, consuming and dispatching messages | false |
logConnectionEvents | Events related to connections and sessions | true |
logTransactionEvents | Events related to transaction handling | false |
logConsumerEvents | Events related to consuming messages | false |
logProducerEvents | Events related to producing messages | false |
logInternalEvents | Events normally not of Interest for users like failover, querying internal objects etc | false |
默认连接时间日志是开启,其他均未开启。通过activemq开启相关的日志。
<plugins>
<!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
<loggingBrokerPluginlogAll="true"logConnectionEvents="false"/>
</plugins>
2. 统计拦截器
从ActiveMQ5.3开始,StatisticsPlugin插件被用作检测Broker中统计的插件。注意消息必须包含replyTo的消息头,如果是在JMS那么需要采用jmsReplyTo消息头,否则消息将被统计忽略。ReplyTo消息头包含了你想检查统计的消息。统计消息是一个MapMessage.
检查Broker的信息,仅仅需要一个名称为ActiveMQ.Statistics.Broker并且有一个replyTo的消息头的Destination。为了检测所有destination,你需要一个名称为ActiveMQ.Statistics.Destination.<destination-name>或者ActiveMQ.Statistics.Destination.<wildcard-expression>并且有一个replyTo的消息头。如果许多Destination匹配相关的模糊匹配表达式,那么统计的消息将被发送到replyTo的Destination.
<plugins>
<!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
<statisticsBrokerPlugin/>
</plugins>
Statistics插件发送消息到特殊的目标。
下面是一个统计Broker的消息
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
String queueName ="ActiveMQ.Statistics.Broker";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
for(Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.out.println(name +"="+ reply.getObject(name));
}
查询Destination的统计信息
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(null);
String queueName ="ActiveMQ.Statistics.Destination."+ testQueue.getQueueName()
Queue query = session.createQueue(queueName);
Message msg = session.createMessage();
producer.send(testQueue, msg)
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
for(Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name +"="+ reply.getObject(name));
}
ActiveMQ性能优化
1、目标策略
在节点destinationPolicy配置策略,可以对单个或者所有的主题和队列进行设置,使用流量监控,当消息达到memoryLimit的时候,ActiveMQ会减慢消息的产生甚至阻塞,destinationPolicy的配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | <destinationPolicy> <policyMap> <policyEntries> < policyEntry topic = ">" producerFlowControl = "true" memoryLimit = "1mb" > <pendingSubscriberPolicy> < vmCursor / > < / pendingSubscriberPolicy > < / policyEntry > < policyEntry queue = ">" producerFlowControl = "true" memoryLimit = "1mb" > < ! -- Use VM cursor for better latency For more information , see : http : //activemq.apache.org/message-cursors.html <pendingQueuePolicy> < vmQueueCursor / > < / pendingQueuePolicy > -- > < / policyEntry > < / policyEntries > < / policyMap > < / destinationPolicy > |
producerFlowControl表示是否监控流量,默认为true,如果设置为false,消息就会存在磁盘中以防止内存溢出;memoryLimit表示在producerFlowControl=”true”的情况下,消息存储在内存中最大量,当消息达到这个值时,ActiveMQ会减慢消息的产生甚至阻塞。policyEntry的属性参考:http://activemq.apache.org/per-destination-policies.html
当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。接下来,如果发现当前有活跃的consumer,如果这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的dispatch queue;如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。以下是两种Pending Message Cursors:
VM Cursor:在内存中保存消息的引用。
File Cursor:首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。
在缺省情况下,ActiveMQ 会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。
对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有
vmDurableCursor 和 fileDurableSubscriberCursor;对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。
Message Cursors的使用参考:http://activemq.apache.org/message-cursors.html
2、存储设置
设置消息在内存、磁盘中存储的大小,配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 | <systemUsage> <systemUsage> <memoryUsage> < memoryUsage limit = "20 mb" / > < / memoryUsage > <storeUsage> < storeUsage limit = "1 gb" / > < / storeUsage > <tempUsage> < tempUsage limit = "100 mb" / > < / tempUsage > < / systemUsage > < / systemUsage > |
memoryUsage表示ActiveMQ使用的内存,这个值要大于等于destinationPolicy中设置的所有队列的内存之和。
storeUsage表示持久化存储文件的大小。
tempUsage表示非持久化消息存储的临时内存大小。