Apollo消息服务器:搭建Apache Apollo消息队列服务及简单访问

官网:http://activemq.apache.org/apollo/index.html

环境:ubuntu 16.04

1.安装好jdk

2.下载解压

wget http://mirrors.hust.edu.cn/apache/activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz

tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz

3.创建broker

(在当前用户工作目录创建即可)

apache-apollo-1.7.1/bin/apollo create apollo_broker

ae4d31a172fccb1a8e20f348f41990db.png

如果参照:http://activemq.apache.org/apollo/documentation/getting-started.html

在/var/lib目录创建broker,则创建和运行一般都需要root权限才可以;

4.修改配置,允许指定主机访问web管理控制台

参考:http://activemq.apache.org/apollo/documentation/user-manual.html#Web_Based_Administration

apollo_broker/etc/apollo.xml文件,不绑定回送地址127.0.0.1

7efed48612a0353ac0fd75b9cc030f87.png

apollo_broker/etc/login.config文件,添加白名单文件

f8920586a7b0cdf8a0f92ba64cf78f3a.png

允许以下主机访问web控制台

db44c4dd7fc63e9badfdbf16a7758e22.png

5.运行broker

apollo_broker/bin/apollo-broker run

IP地址:192.168.1.121

1ce4332547a8964e6f69030ac4c3c566.png

6.访问web控制台

http://192.168.1.121:61680/    (http地址,好像需要配置白名单)

或者:https://192.168.1.121:61681/    (https地址,不用配置白名单,就可以直接访问)

默认账户:admin                默认密码:password

69071a47374629643494b67757f1b999.png

cdbfb7700a48969fd5065a2dcd2a6719.png

7.创建Publish Demo

参考:apache-apollo-1.7.1/examples目录中,自带了demo

创建spring boot project

pom.xml,添加依赖组件

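<dependency>
    <groupId>org.fusesource.mqtt-client</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>1.0</version>
</dependency>

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>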

Publisher.java

package com.test;

import java.net.URISyntaxException;

import org.fusesource.hawtbuf.Buffer;

import org.fusesource.hawtbuf.UTF8Buffer;

import org.fusesource.mqtt.client.Future;

import org.fusesource.mqtt.client.FutureConnection;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.QoS;

/**
 * Demo MQTT publisher: connects to the broker and publishes random numbers
 * to a topic in an endless loop, waiting for each publish to complete before
 * sending the next one.
 */
class Publisher {

    /**
     * Connects to the broker with the hard-coded credentials and publishes
     * messages to {@code /topic/monitor} until an error occurs.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        System.out.println("publish main");

        String user = "admin";
        String password = "password";
        String host = "192.168.1.121";
        int port = 61613;
        final String destination = "/topic/monitor";

        MQTT mqtt = new MQTT();
        try {
            mqtt.setHost(host, port);
            mqtt.setUserName(user);
            mqtt.setPassword(password);

            // 1. Connect to the server and block until the connect completes.
            FutureConnection connection = mqtt.futureConnection();
            connection.connect().await();

            // 2. Publish messages.
            UTF8Buffer topic = new UTF8Buffer(destination);
            while (true) {
                Buffer msg = new UTF8Buffer(getMessage());
                // The generic type argument is required: publish() returns Future<Void>.
                Future<Void> future = connection.publish(topic, msg, QoS.AT_LEAST_ONCE, false);
                System.out.println("publish msg=" + msg + ", wait begin");

                // After sending, wait for the publish to be processed.
                future.await();

                // To throttle the publish rate, call delay(1000) here.
                System.out.println("publish wait end");
            }
        } catch (URISyntaxException e) {
            // Raised by setHost when host/port do not form a valid URI.
            e.printStackTrace();
        } catch (Exception e) {
            // Connect / publish / await failures end the demo.
            e.printStackTrace();
        }
    }

    /**
     * Builds the payload of the next message.
     *
     * @return the string form of a value from {@link Math#random()}
     */
    private static String getMessage() {
        return String.valueOf(Math.random());
    }

    /**
     * Sleeps the current thread for the given time.
     *
     * @param time sleep duration in milliseconds
     */
    private static void delay(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

8.创建Subscriber Demo

创建spring boot project

Listener.java

package com.test;

import org.fusesource.hawtbuf.*;

import org.fusesource.mqtt.client.*;

/**
 * Demo MQTT subscriber: connects to the broker through the callback API,
 * subscribes to a topic and prints every message it receives.
 */
class Listener {

    /**
     * Connects to the broker with the hard-coded credentials, subscribes to
     * {@code /topic/monitor} and then blocks the main thread forever while the
     * callbacks print the incoming messages.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if configuring the host fails or the final wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        System.out.println("listener main");

        String user = "admin";
        String password = "password";
        String host = "192.168.1.121";
        int port = 61613;
        final String destination = "/topic/monitor";

        MQTT mqtt = new MQTT();
        mqtt.setHost(host, port);
        mqtt.setUserName(user);
        mqtt.setPassword(password);

        final CallbackConnection connection = mqtt.callbackConnection();

        // 1. Register the connection listener.
        // The fully qualified name is needed because this class is also named Listener.
        connection.listener(new org.fusesource.mqtt.client.Listener() {

            // Number of messages received so far.
            long count = 0;

            @Override
            public void onConnected() {
                System.out.println("onConnected");
            }

            @Override
            public void onDisconnected() {
                System.out.println("onDisconnected");
            }

            @Override
            public void onFailure(Throwable value) {
                System.out.println("onFailure");
                value.printStackTrace();
                System.exit(-2);
            }

            @Override
            public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
                String message = msg.utf8().toString();
                System.out.println("onPublish body=" + message + " count=" + count);
                count++;
                // Acknowledge the message once it has been processed; without this
                // the AT_LEAST_ONCE delivery is never confirmed to the broker.
                ack.run();
            }
        });

        // 2. Connect to the server.
        connection.connect(new Callback<Void>() {

            @Override
            public void onSuccess(Void value) {
                System.out.println("onSuccess");

                // 3. Subscribe to the topic once the connection is established.
                Topic[] topics = { new Topic(destination, QoS.AT_LEAST_ONCE) };
                connection.subscribe(topics, new Callback<byte[]>() {

                    @Override
                    public void onSuccess(byte[] qoses) {
                        System.out.println("onSuccess");
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        System.out.println("subscribe onFailure");
                        e.printStackTrace();
                        System.exit(-2);
                    }
                });
            }

            @Override
            public void onFailure(Throwable e) {
                System.out.println("connect onFailure");
                e.printStackTrace();
                System.exit(-2);
            }
        });

        // Wait forever so the JVM stays alive while callbacks handle messages.
        synchronized (Listener.class) {
            while (true) {
                Listener.class.wait();
            }
        }
    }
}