rabbitmq.properties
# RabbitMQ connection settings. The keys below are referenced as ${rabbitmq.*}
# placeholders by the connection factory in baseup-rabbitmq.xml.
# Exactly one of the two sections should be active (uncommented) at a time.
# NOTE(review): the broker password is stored here in plain text - consider
# supplying it from the environment or a secrets store instead; confirm with ops.

# ---- Local environment settings: begin ----
#rabbitmq.host=127.0.0.1
#rabbitmq.port=5672
#rabbitmq.username=baseup
#rabbitmq.password=wukong@123
#rabbitmq.vhost=/baseup
# ---- Local environment settings: end ----

# ---- Production environment settings: begin ----
rabbitmq.host=139.159.138.142
rabbitmq.port=5672
rabbitmq.username=baseup
rabbitmq.password=whkj@2019
rabbitmq.vhost=/baseup
# ---- Production environment settings: end ----
baseup-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  RabbitMQ wiring: one connection factory, one admin, two direct exchanges,
  four queues with their bindings, two templates for publishing and four
  listener beans for consuming.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

    <!-- Connection factory. The ${rabbitmq.*} placeholders are expected to be
         resolved from rabbitmq.properties; the property placeholder
         configuration itself is not part of this file. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.vhost}" />

    <!-- Broker administration: declares the queues, exchanges and bindings below. -->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

    <!-- Publishing templates, one per exchange (used as the default exchange
         when sending through the template). -->
    <rabbit:template id="rabbitTemplate01" connection-factory="connectionFactory"
                     exchange="baseup_exchange_topic01" />
    <rabbit:template id="rabbitTemplate02" connection-factory="connectionFactory"
                     exchange="baseup_exchange_topic02" />

    <!-- Queues, declared automatically by rabbitAdmin. -->
    <rabbit:queue name="topic01_queue01" auto-declare="true" declared-by="rabbitAdmin" />
    <rabbit:queue name="topic01_queue02" auto-declare="true" declared-by="rabbitAdmin" />
    <rabbit:queue name="topic02_queue01" auto-declare="true" declared-by="rabbitAdmin" />
    <rabbit:queue name="topic02_queue02" auto-declare="true" declared-by="rabbitAdmin" />

    <!-- Exchanges and their queue bindings, declared automatically by rabbitAdmin.
         Each exchange is defined exactly once, together with its bindings.
         Despite the "topic" in their names these are DIRECT exchanges, so the
         routing key of a message has to equal the binding key exactly;
         wildcard patterns are not applied. -->
    <rabbit:direct-exchange name="baseup_exchange_topic01" auto-declare="true"
                            declared-by="rabbitAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="topic01_queue01" key="wh.public" />
            <rabbit:binding queue="topic01_queue02" key="wh.public.info" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- NOTE(review): both queues of this exchange use the same binding key
         "k2_queue01", so a message published with that key is routed to both
         queues. Confirm this fan-out is intended and not a copy/paste slip. -->
    <rabbit:direct-exchange name="baseup_exchange_topic02" auto-declare="true"
                            declared-by="rabbitAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="topic02_queue01" key="k2_queue01" />
            <rabbit:binding queue="topic02_queue02" key="k2_queue01" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener containers: attach each consumer bean to its queue. -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="user01" queues="topic01_queue01" />
        <rabbit:listener ref="user02" queues="topic01_queue02" />
    </rabbit:listener-container>

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="customer01" queues="topic02_queue01" />
        <rabbit:listener ref="customer02" queues="topic02_queue02" />
    </rabbit:listener-container>

    <!-- Consumer beans referenced by the listeners above. -->
    <bean id="user01" class="com.tyh.baseup.web.server.mq.consumer.User01" />
    <bean id="user02" class="com.tyh.baseup.web.server.mq.consumer.User02" />
    <bean id="customer01" class="com.tyh.baseup.web.server.mq.consumer.Customer01" />
    <bean id="customer02" class="com.tyh.baseup.web.server.mq.consumer.Customer02" />

</beans>
User01.java
package com.tyh.baseup.web.server.mq.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.UnsupportedEncodingException; /** * 消费者1 * 交换机为- 》direct: 代表 "路由模式" 《可选择性接收消息, 指定路由Key》 */ public class User01 implements MessageListener { private Logger logger = LoggerFactory.getLogger(User01.class); @Override public void onMessage(Message message) { logger.info("消费者User01->>开始######################################"); try { logger.info("消费者User01->>Body:" + message.getBody()); logger.info("消费者User01->>Body.转换:" + new String(message.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } logger.info("消费者User01->>toString:" + message.toString()); logger.info("消费者User01->>MessageProperties:" + message.getMessageProperties()); logger.info("消费者User01->>MessageProperties.AppId:" + message.getMessageProperties().getAppId()); logger.info("消费者User01->>MessageProperties.ClusterId:" + message.getMessageProperties().getClusterId()); logger.info("消费者User01->>MessageProperties.ConsumerQueue:" + message.getMessageProperties().getConsumerQueue()); logger.info("消费者User01->>MessageProperties.ConsumerTag:" + message.getMessageProperties().getConsumerTag()); logger.info("消费者User01->>MessageProperties.ContentEncoding:" + message.getMessageProperties().getContentEncoding()); logger.info("消费者User01->>MessageProperties.ContentType:" + message.getMessageProperties().getContentType()); logger.info("消费者User01->>MessageProperties.ContentLength:" + message.getMessageProperties().getContentLength()); logger.info("消费者User01->>MessageProperties.CorrelationIdString:" + message.getMessageProperties().getCorrelationIdString()); logger.info("消费者User01->>MessageProperties.Delay:" + message.getMessageProperties().getDelay()); logger.info("消费者User01->>MessageProperties.DeliveryMode:" + message.getMessageProperties().getDeliveryMode()); 
logger.info("消费者User01->>MessageProperties.DeliveryTag:" + message.getMessageProperties().getDeliveryTag()); logger.info("消费者User01->>MessageProperties.Expiration:" + message.getMessageProperties().getExpiration()); logger.info("消费者User01->>MessageProperties.Headers:" + message.getMessageProperties().getHeaders()); logger.info("消费者User01->>MessageProperties.InferredArgumentType:" + message.getMessageProperties().getInferredArgumentType()); logger.info("消费者User01->>MessageProperties.ReplyTo:" + message.getMessageProperties().getReplyTo()); logger.info("消费者User01->>MessageProperties.ReceivedExchange:" + message.getMessageProperties().getReceivedExchange()); logger.info("消费者User01->>MessageProperties.ReceivedRoutingKey:" + message.getMessageProperties().getReceivedRoutingKey()); logger.info("消费者User01->>MessageProperties.ReceivedUserId:" + message.getMessageProperties().getReceivedUserId()); logger.info("消费者User01->>MessageProperties.ReceivedDelay:" + message.getMessageProperties().getReceivedDelay()); logger.info("消费者User01->>MessageProperties.ReceivedDeliveryMode:" + message.getMessageProperties().getReceivedDeliveryMode()); logger.info("消费者User01->>MessageProperties.Redelivered:" + message.getMessageProperties().getRedelivered()); logger.info("消费者User01->>MessageProperties.ReplyToAddress:" + message.getMessageProperties().getReplyToAddress()); logger.info("消费者User01->>MessageProperties.Type:" + message.getMessageProperties().getType()); logger.info("消费者User01->>MessageProperties.TargetBean:" + message.getMessageProperties().getTargetBean()); logger.info("消费者User01->>MessageProperties.TargetMethod:" + message.getMessageProperties().getTargetMethod()); logger.info("消费者User01->>MessageProperties.Timestamp:" + message.getMessageProperties().getTimestamp()); logger.info("消费者User01->>MessageProperties.UserId:" + message.getMessageProperties().getUserId()); logger.info("消费者User01->>MessageProperties.MessageId:" + message.getMessageProperties().getMessageId()); 
logger.info("消费者User01->>结束######################################"); } }
User02.java
package com.tyh.baseup.web.server.mq.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.UnsupportedEncodingException; /** * 消费者2 * 交换机为- 》direct: 代表 "路由模式" 《可选择性接收消息, 指定路由Key》 */ public class User02 implements MessageListener { private Logger logger = LoggerFactory.getLogger(User02.class); @Override public void onMessage(Message message) { logger.info("消费者User02->>开始######################################"); try { logger.info("消费者User02->>Body:" + message.getBody()); logger.info("消费者User02->>Body.转换:" + new String(message.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } logger.info("消费者User02->>toString:" + message.toString()); logger.info("消费者User02->>MessageProperties:" + message.getMessageProperties()); logger.info("消费者User02->>MessageProperties.AppId:" + message.getMessageProperties().getAppId()); logger.info("消费者User02->>MessageProperties.ClusterId:" + message.getMessageProperties().getClusterId()); logger.info("消费者User02->>MessageProperties.ConsumerQueue:" + message.getMessageProperties().getConsumerQueue()); logger.info("消费者User02->>MessageProperties.ConsumerTag:" + message.getMessageProperties().getConsumerTag()); logger.info("消费者User02->>MessageProperties.ContentEncoding:" + message.getMessageProperties().getContentEncoding()); logger.info("消费者User02->>MessageProperties.ContentType:" + message.getMessageProperties().getContentType()); logger.info("消费者User02->>MessageProperties.ContentLength:" + message.getMessageProperties().getContentLength()); logger.info("消费者User02->>MessageProperties.CorrelationIdString:" + message.getMessageProperties().getCorrelationIdString()); logger.info("消费者User02->>MessageProperties.Delay:" + message.getMessageProperties().getDelay()); logger.info("消费者User02->>MessageProperties.DeliveryMode:" + message.getMessageProperties().getDeliveryMode()); 
logger.info("消费者User02->>MessageProperties.DeliveryTag:" + message.getMessageProperties().getDeliveryTag()); logger.info("消费者User02->>MessageProperties.Expiration:" + message.getMessageProperties().getExpiration()); logger.info("消费者User02->>MessageProperties.Headers:" + message.getMessageProperties().getHeaders()); logger.info("消费者User02->>MessageProperties.InferredArgumentType:" + message.getMessageProperties().getInferredArgumentType()); logger.info("消费者User02->>MessageProperties.ReplyTo:" + message.getMessageProperties().getReplyTo()); logger.info("消费者User02->>MessageProperties.ReceivedExchange:" + message.getMessageProperties().getReceivedExchange()); logger.info("消费者User02->>MessageProperties.ReceivedRoutingKey:" + message.getMessageProperties().getReceivedRoutingKey()); logger.info("消费者User02->>MessageProperties.ReceivedUserId:" + message.getMessageProperties().getReceivedUserId()); logger.info("消费者User02->>MessageProperties.ReceivedDelay:" + message.getMessageProperties().getReceivedDelay()); logger.info("消费者User02->>MessageProperties.ReceivedDeliveryMode:" + message.getMessageProperties().getReceivedDeliveryMode()); logger.info("消费者User02->>MessageProperties.Redelivered:" + message.getMessageProperties().getRedelivered()); logger.info("消费者User02->>MessageProperties.ReplyToAddress:" + message.getMessageProperties().getReplyToAddress()); logger.info("消费者User02->>MessageProperties.Type:" + message.getMessageProperties().getType()); logger.info("消费者User02->>MessageProperties.TargetBean:" + message.getMessageProperties().getTargetBean()); logger.info("消费者User02->>MessageProperties.TargetMethod:" + message.getMessageProperties().getTargetMethod()); logger.info("消费者User02->>MessageProperties.Timestamp:" + message.getMessageProperties().getTimestamp()); logger.info("消费者User02->>MessageProperties.UserId:" + message.getMessageProperties().getUserId()); logger.info("消费者User02->>MessageProperties.MessageId:" + message.getMessageProperties().getMessageId()); 
logger.info("消费者User02->>结束######################################"); } }
Customer01.java
package com.tyh.baseup.web.server.mq.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.UnsupportedEncodingException; /** * 消费者1 * 交换机为- 》topic: 代表 "通配符模式"《可选择性接收消息, 采用通配符模式 * 符号代表1个, # 符号代表一个或多个》 */ public class Customer01 implements MessageListener { private Logger logger = LoggerFactory.getLogger(Customer01.class); @Override public void onMessage(Message message) { logger.info("消费者Customer01->>开始######################################"); try { logger.info("消费者Customer01->>Body:" + message.getBody()); logger.info("消费者Customer01->>Body.转换:" + new String(message.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } logger.info("消费者Customer01->>toString:" + message.toString()); logger.info("消费者Customer01->>MessageProperties:" + message.getMessageProperties()); logger.info("消费者Customer01->>MessageProperties.AppId:" + message.getMessageProperties().getAppId()); logger.info("消费者Customer01->>MessageProperties.ClusterId:" + message.getMessageProperties().getClusterId()); logger.info("消费者Customer01->>MessageProperties.ConsumerQueue:" + message.getMessageProperties().getConsumerQueue()); logger.info("消费者Customer01->>MessageProperties.ConsumerTag:" + message.getMessageProperties().getConsumerTag()); logger.info("消费者Customer01->>MessageProperties.ContentEncoding:" + message.getMessageProperties().getContentEncoding()); logger.info("消费者Customer01->>MessageProperties.ContentType:" + message.getMessageProperties().getContentType()); logger.info("消费者Customer01->>MessageProperties.ContentLength:" + message.getMessageProperties().getContentLength()); logger.info("消费者Customer01->>MessageProperties.CorrelationIdString:" + message.getMessageProperties().getCorrelationIdString()); logger.info("消费者Customer01->>MessageProperties.Delay:" + message.getMessageProperties().getDelay()); 
logger.info("消费者Customer01->>MessageProperties.DeliveryMode:" + message.getMessageProperties().getDeliveryMode()); logger.info("消费者Customer01->>MessageProperties.DeliveryTag:" + message.getMessageProperties().getDeliveryTag()); logger.info("消费者Customer01->>MessageProperties.Expiration:" + message.getMessageProperties().getExpiration()); logger.info("消费者Customer01->>MessageProperties.Headers:" + message.getMessageProperties().getHeaders()); logger.info("消费者Customer01->>MessageProperties.InferredArgumentType:" + message.getMessageProperties().getInferredArgumentType()); logger.info("消费者Customer01->>MessageProperties.ReplyTo:" + message.getMessageProperties().getReplyTo()); logger.info("消费者Customer01->>MessageProperties.ReceivedExchange:" + message.getMessageProperties().getReceivedExchange()); logger.info("消费者Customer01->>MessageProperties.ReceivedRoutingKey:" + message.getMessageProperties().getReceivedRoutingKey()); logger.info("消费者Customer01->>MessageProperties.ReceivedUserId:" + message.getMessageProperties().getReceivedUserId()); logger.info("消费者Customer01->>MessageProperties.ReceivedDelay:" + message.getMessageProperties().getReceivedDelay()); logger.info("消费者Customer01->>MessageProperties.ReceivedDeliveryMode:" + message.getMessageProperties().getReceivedDeliveryMode()); logger.info("消费者Customer01->>MessageProperties.Redelivered:" + message.getMessageProperties().getRedelivered()); logger.info("消费者Customer01->>MessageProperties.ReplyToAddress:" + message.getMessageProperties().getReplyToAddress()); logger.info("消费者Customer01->>MessageProperties.Type:" + message.getMessageProperties().getType()); logger.info("消费者Customer01->>MessageProperties.TargetBean:" + message.getMessageProperties().getTargetBean()); logger.info("消费者Customer01->>MessageProperties.TargetMethod:" + message.getMessageProperties().getTargetMethod()); logger.info("消费者Customer01->>MessageProperties.Timestamp:" + message.getMessageProperties().getTimestamp()); 
logger.info("消费者Customer01->>MessageProperties.UserId:" + message.getMessageProperties().getUserId()); logger.info("消费者Customer01->>MessageProperties.MessageId:" + message.getMessageProperties().getMessageId()); logger.info("消费者Customer01->>结束######################################"); } }
Customer02.java
package com.tyh.baseup.web.server.mq.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.UnsupportedEncodingException; /** * 消费者2 * 交换机为- 》topic: 代表 "通配符模式"《可选择性接收消息, 采用通配符模式 * 符号代表1个, # 符号代表一个或多个》 */ public class Customer02 implements MessageListener { private Logger logger = LoggerFactory.getLogger(Customer02.class); @Override public void onMessage(Message message) { logger.info("消费者Customer02->>开始######################################"); try { logger.info("消费者Customer02->>Body:" + message.getBody()); logger.info("消费者Customer02->>Body.转换:" + new String(message.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } logger.info("消费者Customer02->>toString:" + message.toString()); logger.info("消费者Customer02->>MessageProperties:" + message.getMessageProperties()); logger.info("消费者Customer02->>MessageProperties.AppId:" + message.getMessageProperties().getAppId()); logger.info("消费者Customer02->>MessageProperties.ClusterId:" + message.getMessageProperties().getClusterId()); logger.info("消费者Customer02->>MessageProperties.ConsumerQueue:" + message.getMessageProperties().getConsumerQueue()); logger.info("消费者Customer02->>MessageProperties.ConsumerTag:" + message.getMessageProperties().getConsumerTag()); logger.info("消费者Customer02->>MessageProperties.ContentEncoding:" + message.getMessageProperties().getContentEncoding()); logger.info("消费者Customer02->>MessageProperties.ContentType:" + message.getMessageProperties().getContentType()); logger.info("消费者Customer02->>MessageProperties.ContentLength:" + message.getMessageProperties().getContentLength()); logger.info("消费者Customer02->>MessageProperties.CorrelationIdString:" + message.getMessageProperties().getCorrelationIdString()); logger.info("消费者Customer02->>MessageProperties.Delay:" + message.getMessageProperties().getDelay()); 
logger.info("消费者Customer02->>MessageProperties.DeliveryMode:" + message.getMessageProperties().getDeliveryMode()); logger.info("消费者Customer02->>MessageProperties.DeliveryTag:" + message.getMessageProperties().getDeliveryTag()); logger.info("消费者Customer02->>MessageProperties.Expiration:" + message.getMessageProperties().getExpiration()); logger.info("消费者Customer02->>MessageProperties.Headers:" + message.getMessageProperties().getHeaders()); logger.info("消费者Customer02->>MessageProperties.InferredArgumentType:" + message.getMessageProperties().getInferredArgumentType()); logger.info("消费者Customer02->>MessageProperties.ReplyTo:" + message.getMessageProperties().getReplyTo()); logger.info("消费者Customer02->>MessageProperties.ReceivedExchange:" + message.getMessageProperties().getReceivedExchange()); logger.info("消费者Customer02->>MessageProperties.ReceivedRoutingKey:" + message.getMessageProperties().getReceivedRoutingKey()); logger.info("消费者Customer02->>MessageProperties.ReceivedUserId:" + message.getMessageProperties().getReceivedUserId()); logger.info("消费者Customer02->>MessageProperties.ReceivedDelay:" + message.getMessageProperties().getReceivedDelay()); logger.info("消费者Customer02->>MessageProperties.ReceivedDeliveryMode:" + message.getMessageProperties().getReceivedDeliveryMode()); logger.info("消费者Customer02->>MessageProperties.Redelivered:" + message.getMessageProperties().getRedelivered()); logger.info("消费者Customer02->>MessageProperties.ReplyToAddress:" + message.getMessageProperties().getReplyToAddress()); logger.info("消费者Customer02->>MessageProperties.Type:" + message.getMessageProperties().getType()); logger.info("消费者Customer02->>MessageProperties.TargetBean:" + message.getMessageProperties().getTargetBean()); logger.info("消费者Customer02->>MessageProperties.TargetMethod:" + message.getMessageProperties().getTargetMethod()); logger.info("消费者Customer02->>MessageProperties.Timestamp:" + message.getMessageProperties().getTimestamp()); 
logger.info("消费者Customer02->>MessageProperties.UserId:" + message.getMessageProperties().getUserId()); logger.info("消费者Customer02->>MessageProperties.MessageId:" + message.getMessageProperties().getMessageId()); logger.info("消费者Customer02->>结束######################################"); } }
版权声明:本文为suenpeng原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。