Android 局域网推送:通过 RabbitMQ 实现消息推送功能,可同时实现局域网推送和广域网推送

服务端

pom文件

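<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>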

配置文件

server:
  port: 8022

spring:
  #给项目来个名字
  application:
    name: rabbitmq-consumer
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: snakey
    password: 123
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /

RabbitMQ配置文件

package com.xiaoge.mqdemo.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**
 * Author: guoyzh
 * Date: 2019/10/13 13:22
 *
 * <p>RabbitMQ test configuration. Declares one queue, a topic exchange and a
 * fanout exchange, and binds the queue to both exchanges.
 */
@Configuration
public class RabbitMQConfig {

    // Names of the exchanges in use.
    public static final String EXCHANGE_TOP = "test";
    public static final String EXCHANGE_FANOUT = "amq.fanout";

    // Queue name / routing-key constants.
    public static final String QUEUE_MSG = "topic.msg";
    public static final String QUEUE_MSG_ALL = "topic.msg.all";
    public static final String QUEUE_ALL = "topic.#";

    /** Declares the non-durable queue named {@link #QUEUE_MSG}. */
    @Bean
    public Queue pushMsgQueue() {
        final boolean durable = false;
        return new Queue(QUEUE_MSG, durable);
    }

    /** Declares the topic exchange named {@link #EXCHANGE_TOP}. */
    @Bean
    TopicExchange topExchange() {
        return new TopicExchange(EXCHANGE_TOP);
    }

    /** Declares the fanout exchange named {@link #EXCHANGE_FANOUT}. */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_FANOUT);
    }

    /**
     * Binds the push queue to the topic exchange.
     *
     * <p>The routing key passed to {@code with(...)} matters: it decides which
     * routing keys this queue will receive messages for.
     */
    @Bean
    Binding bindingTopExchangeMessage() {
        Queue queue = pushMsgQueue();
        TopicExchange exchange = topExchange();
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_MSG);
    }

    /** Binds the push queue to the fanout exchange (no routing key is supplied). */
    @Bean
    Binding bindingFanoutExchangeMessage() {
        Queue queue = pushMsgQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}

接口

package com.xiaoge.mqdemo.web;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.xiaoge.mqdemo.config.RabbitMQConfig;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import java.time.LocalDateTime;

import java.time.format.DateTimeFormatter;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/**
 * Author: guoyzh
 * Date: 2019/10/13 13:24
 *
 * <p>Test endpoint that publishes a JSON push message to the topic exchange.
 */
@Controller
public class TestController {

    /** Timestamp layout used for the {@code createTime} field; the formatter is immutable and reusable. */
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Shared JSON serializer, created once instead of on every request. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    @Autowired
    RabbitTemplate mRabbitTemplate;

    /**
     * Builds a message with a random id, a fixed text and the current time,
     * serializes it to JSON and publishes it to {@link RabbitMQConfig#EXCHANGE_TOP}
     * with routing key {@link RabbitMQConfig#QUEUE_MSG_ALL}.
     *
     * @return the literal {@code "ok"} once the message has been handed to the template
     * @throws IllegalStateException if the message cannot be serialized to JSON
     */
    @GetMapping("/send_msg")
    @ResponseBody
    public String sendTopicMessage() {
        // Assemble the message.
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: 分诊室1号发来请求";
        String createTime = LocalDateTime.now().format(CREATE_TIME_FORMAT);

        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);

        // Convert the message to JSON. Fail loudly with the original cause
        // instead of going on to publish a null payload.
        String payload;
        try {
            payload = JSON_MAPPER.writeValueAsString(manMap);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize push message to JSON", e);
        }

        // Publish the message.
        mRabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOP, RabbitMQConfig.QUEUE_MSG_ALL, payload);
        return "ok";
    }
}

Android

gradle

implementation 'com.rabbitmq:amqp-client:4.4.1'

gradle指定jdk版本 放在android{…}中

compileOptions {

sourceCompatibility JavaVersion.VERSION_1_8

targetCompatibility JavaVersion.VERSION_1_8

}

界面功能

package com.hencego.myapplication;

import android.os.Bundle;

import android.os.Handler;

import android.os.Message;

import android.util.Log;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import java.io.IOException;

import java.util.UUID;

import java.util.concurrent.TimeoutException;

import androidx.appcompat.app.AppCompatActivity;

/**

* 作者:guoyzh

* 时间:2019/10/13 15:41

* 功能:

*/

public class MainActivity extends AppCompatActivity {

ConnectionFactory factory;

@Override

protected void onCreate(Bundle savedInstanceState) {

super.onCreate(savedInstanceState);

setContentView(R.layout.activity_main);

//连接设置

setupConnectionFactory();

//用于从线程中获取数据,更新ui

final Handler incomingMessageHandler = new Handler() {

@Override

public void handleMessage(Message msg) {

String message = msg.getData().getString("msg");

Log.i("test", "msg:" + message);

}

};

//开启消费者线程

//subscribe(incomingMessageHandler);

new Thread(new Runnable() {

@Override

public void run() {

basicConsume(incomingMessageHandler);

}

}).start();

}

/**

* 连接设置

*/

private void setupConnectionFactory() {

factory = new ConnectionFactory();

factory.setHost("192.168.1.222");

factory.setPort(5672);

factory.setUsername("snakey");

factory.setPassword("123");

factory.setVirtualHost("/");

}

/**

* 收消息(从发布者那边订阅消息)

*/

private void basicConsume(final Handler handler) {

try {

// 连接

Connection connection = factory.newConnection();

// 通道

final Channel channel = connection.createChannel();

String uuid = String.valueOf(UUID.randomUUID());

String QUEUE_NAME = "topic." + uuid;

// 创建非永久队列

channel.queueDeclare(QUEUE_NAME, false, false, true, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, "test", "topic.#");

// 接收消息 实现Consumer的最简单方法是将便捷类DefaultConsumer子类化。可以在basicConsume 调用上传递此子类的对象以设置订阅:

channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

super.handleDelivery(consumerTag, envelope, properties, body);

String msg = new String(body);

long deliveryTag = envelope.getDeliveryTag();

channel.basicAck(deliveryTag, false);

//从message池中获取msg对象更高效

Message uimsg = handler.obtainMessage();

Bundle bundle = new Bundle();

bundle.putString("msg", msg);

uimsg.setData(bundle);

handler.sendMessage(uimsg);

}

});

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

}

}