ActiveMQ学习笔记-03——队列Queue案例
参考:【尚硅谷ActiveMQ教程(MQ消息中间件快速入门)
消息生产者编码
public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1 创建连接工厂 按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2 通过连接工厂,获得连接connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3 创建会话session //两个参数(事务/签收) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 创建目的地(具体是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //5 创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); //6 通过使用messageProducer生产3条消息发送到MQ的队列里来 for (int i = 1; i <= 3; i++) { //7 创建消息 TextMessage textMessage = session.createTextMessage("msg---" + i); //8 通过messageProducer发送给MQ messageProducer.send(textMessage); } //9 关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("----消息发布完成----"); } }
运行
说明
英文 翻译 说明 Number Of Pending Message 等待消息的数量 这个是未出队列的数量。公式=总接受量-总出队列数 Number of Consumers 消费者数量 消费者端端消费者数量 Messages Enqueued 进队消息数 进入队列的总数量,包括出队列的。这个数量只增不减 Messages Dequeued 出队消息数 可以理解为是消费者消费掉的数量
消息消费者编码
通过同步阻塞方法消费消息
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1 创建连接工厂 按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2 通过连接工厂,获得连接connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3 创建会话session //两个参数(事务/签收) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 创建目的地(具体是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //5 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); /** * 同步阻塞方法(receive()) * 订阅者或接收者调用messageConsumer的receive()方法来接受消息, * receive()方法能在接收到消息之前(或超时之前)将一直阻塞。 */ while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (null != textMessage) { System.out.println("----消费者接收到消息" + textMessage.getText()); } else { break; } } messageConsumer.close(); session.close(); connection.close(); } }
通过监听的方式消费消息
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue01"; public static Logger logger = LoggerFactory.getLogger(JmsConsumer.class); public static void main(String[] args) throws JMSException, IOException { //1 创建连接工厂 按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2 通过连接工厂,获得连接connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3 创建会话session //两个参数(事务/签收) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 创建目的地(具体是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //5 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); //通过监听的方式消费消息 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("----消费者接收到消息" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
运行
说明
先生产,只启动1号消费者。问题:1号消费者能消费消息吗?
Yes!
先生产,先启动1号消费者再启动1号消费者。问题:2号消费者能消费消息吗?
No!
先启动2个消费者,再生产6条消息,问题:消费情况如何
一人一半!
版权声明:本文为qq_24191395原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。