安装rabbitMQ
windows安装包,安装时通常需要先安装erlang开发环境
常用命令需在安装目录的sbin/目录下执行:
rabbitmq-service.bat install —— 安装服务
rabbitmq-service.bat start —— 启动服务
rabbitmq-service.bat stop —— 停止服务
rabbitmq-plugins enable rabbitmq_management —— 安装web管理界面插件
rabbitmq-plugins enable rabbitmq_stomp rabbitmq_web_stomp —— 安装websocket连接插件
首次进入管理界面只能通过 http://localhost:15672 登录,默认用户名/密码为guest/guest
Producer生产者 连接工厂
服务端通过spring-rabbit建立
- 配置信息,
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the RabbitMQ connection factory and the
 * listener container factory used by {@code @RabbitListener} endpoints.
 *
 * <p>The class is guarded by {@link RabbitMqEnableCondition}; connection settings
 * are injected from the {@code rabbitmq.*} properties.
 */
@Conditional(RabbitMqEnableCondition.class)
@Configuration
@EnableRabbit
public class RabbitConfiguration {

    /** Broker host name, read from {@code rabbitmq.host}. */
    @Value("${rabbitmq.host}")
    private String mqHost;

    /** Broker port, read from {@code rabbitmq.port}. */
    @Value("${rabbitmq.port}")
    private Integer mqPort;

    /** Virtual host, read from {@code rabbitmq.vhost}. */
    @Value("${rabbitmq.vhost}")
    private String mqVhost;

    /** Login user name, read from {@code rabbitmq.username}. */
    @Value("${rabbitmq.username}")
    private String mqUsername;

    /** Login password, read from {@code rabbitmq.password}. */
    @Value("${rabbitmq.password}")
    private String mqPd;

    /**
     * Builds the listener container factory on top of {@link #rabbitConnectionFactory()}.
     *
     * @return a factory configured with 3 concurrent consumers and a maximum of 10
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory listenerFactory = new SimpleRabbitListenerContainerFactory();
        listenerFactory.setConcurrentConsumers(3);
        listenerFactory.setMaxConcurrentConsumers(10);
        listenerFactory.setConnectionFactory(rabbitConnectionFactory());
        return listenerFactory;
    }

    /**
     * Builds the connection factory from the injected host, port, virtual host and credentials.
     *
     * @return a {@link CachingConnectionFactory} pointing at the configured broker
     */
    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory(mqHost, mqPort);
        cachingFactory.setVirtualHost(mqVhost);
        cachingFactory.setUsername(mqUsername);
        cachingFactory.setPassword(mqPd);
        return cachingFactory;
    }
}
- 注解@Conditional(RabbitMqEnableCondition.class)
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
 * Condition that matches only when the {@code rabbitmq.enabled} environment
 * property parses as {@code true}; a missing property yields {@code false}.
 */
public class RabbitMqEnableCondition implements Condition {

    /**
     * Reads {@code rabbitmq.enabled} from the environment and parses it as a boolean.
     *
     * @param conditionContext      context giving access to the environment
     * @param annotatedTypeMetadata metadata of the annotated element (not used)
     * @return {@code true} only if the property value equals "true", ignoring case
     */
    @Override
    public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
        String enabledFlag = conditionContext.getEnvironment().getProperty("rabbitmq.enabled");
        return Boolean.parseBoolean(enabledFlag);
    }
}
- 生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Component;
/**
 * Component that exposes a {@link RabbitTemplate} bean for publishing messages.
 *
 * <p>Guarded by {@link RabbitMqEnableCondition}.
 */
@Conditional(RabbitMqEnableCondition.class)
@Component
public class RabbitProducer {

    Logger logger = LoggerFactory.getLogger(RabbitProducer.class);

    /** Configuration bean that supplies the connection factory. */
    @Autowired
    private RabbitConfiguration rabbitConfiguration;

    /**
     * Creates the template on top of the connection factory obtained from
     * {@link RabbitConfiguration#rabbitConnectionFactory()}.
     *
     * @return a new {@link RabbitTemplate}
     */
    @Bean
    public RabbitTemplate amqpTemplate() {
        return new RabbitTemplate(rabbitConfiguration.rabbitConnectionFactory());
    }
}
- 消费者
/**
 * Listener component that consumes messages from the {@code patrolTask} queue.
 *
 * <p>Guarded by {@link RabbitMqEnableCondition}.
 */
@Conditional(RabbitMqEnableCondition.class)
@Component
public class RabbitConsumer {

    Logger logger = LoggerFactory.getLogger(RabbitConsumer.class);

    /**
     * Handles patrol task result feedback.
     *
     * <p>The message body is parsed as a JSON array and converted to a list with
     * {@code JSONArray.toList}, using a {@code RobortTask} instance as the root object.
     * The converted list is written to the debug log. Any failure is logged at WARN
     * level and not rethrown, as in the original best-effort handling.
     *
     * @param message the raw AMQP message whose body carries the JSON array
     */
    @RabbitListener(queues = "patrolTask")
    public void processTasks(Message message) {
        try {
            // Decode explicitly instead of relying on the platform default charset.
            // NOTE(review): assumes publishers encode the JSON as UTF-8 — confirm for non-Spring producers.
            String jsonString = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            JSONArray jsonArray = JSONArray.fromObject(jsonString);
            // Keep the converted result; previously it was discarded and an empty list was logged instead.
            List<?> tasks = JSONArray.toList(jsonArray, new RobortTask(), new JsonConfig());
            if (logger.isDebugEnabled()) {
                logger.debug(tasks.toString());
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
    }
}
- 消息的发送
/**
 * Context tests: one looks up a bean through {@code AppContext}, the other
 * publishes a JSON message through the injected {@link AmqpTemplate}.
 */
public class ContextTest extends AbstractTest {

    /** Optional injection; the send test asserts that it is present. */
    @Autowired(required = false)
    private AmqpTemplate rabbitTemplate;

    /** Verifies that a {@code LocaleService} bean can be obtained from {@code AppContext}. */
    @Test
    public void getBean() {
        LocaleService localeService = AppContext.getBean(LocaleService.class);
        assertNotNull(localeService);
    }

    /**
     * Serializes an empty map to JSON and sends it to the {@code amq.topic} exchange
     * with routing key {@code AVS.test}; a serialization error fails the test.
     */
    @Test
    public void getAmqp() {
        try {
            assertNotNull(rabbitTemplate);
            final String routingKey = "AVS" + "." + "test";
            Map<String, String> payload = new HashMap<>();
            String payloadJson = new ObjectMapper().writeValueAsString(payload);
            rabbitTemplate.convertAndSend("amq.topic", routingKey, payloadJson);
        } catch (JsonProcessingException e) {
            Assert.fail(e.getMessage(), e);
        }
    }
}
- 客户端websocket手动连接
// Open a WebSocket to the Web STOMP endpoint.
// The URL is written in the full "ws://host:port/path" form; the original omitted the "//".
var ws = new WebSocket("ws://localhost:15674/ws");
// Wrap the socket in a STOMP client
var client = Stomp.over(ws);
// Callback invoked when the connection succeeds
var on_connect = function(x) {
    console.log("connect successfully");
    // Subscribe with the "AVS" prefix so the pattern matches the "AVS.test" routing key
    // used by the sender in this document; the original "VAS.#" could never match it.
    // NOTE(review): confirm "AVS" (not "VAS") is the intended prefix on both sides.
    client.subscribe("/topic/AVS.#", function(data) {
        // Parse the JSON payload carried in the STOMP frame body
        var result = JSON.parse(data.body);
    });
};
// Callback invoked when the connection fails
var on_error = function(error) {
    alert(error);
};
// Connect to the message server with login, passcode, callbacks and virtual host "/"
client.connect('guest', 'guest', on_connect, on_error, '/');
参考
https://www.sojson.com/blog/48.html
RabbitMQ Web STOMP Plugin
RabbitMQ Web STOMP examples
转载于:https://my.oschina.net/Oyiersan/blog/2999290