Spring Boot 监控队列:Spring Boot 对 IBM MQ 进行数据监听接收以及数据发送

一、需求介绍

后端使用 Spring Boot 2.0 框架,需要通过 JMS 实时监听并接收 IBM MQ 队列中的数据进行处理,处理完成后生成回执,再通过 MQ 队列发送出去。

二、引入依赖jar包

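<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>

<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
</dependency>

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.0.0</version>
</dependency>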

三、监听实现

代码中分为三大块:

1、MQ 通道连接:我这里使用用户名和密码进行连接;如果队列管理器不需要密码认证,可以不传这两个参数

2、mq的队列连接并实现监听

3、mq发送

/**
 * Spring configuration that wires the IBM MQ connection, the inbound listener
 * container and the outbound JMS template.
 *
 * <p>All connection settings are read from the injected {@code mqproperties} bean.
 * The beans fall into three groups: connection factories, inbound listening,
 * and outbound sending.
 */
@Configuration
public class mqtestconfig {

    @Autowired
    private mqproperties mqproperties;

    /* ======================= MQ connection factories ======================= */

    /**
     * Builds the IBM MQ client-mode connection factory from the configured host,
     * port, channel, CCSID and queue manager.
     *
     * @return the configured connection factory
     * @throws IllegalStateException if IBM MQ rejects one of the settings; failing
     *         here avoids handing a half-configured factory to the rest of the context
     */
    @Bean(name="mqqueueconnectionfactory")
    public MQQueueConnectionFactory mqqueueconnectionfactory() {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName(mqproperties.gethostname());
        try {
            factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            factory.setCCSID(mqproperties.getccsid());
            factory.setChannel(mqproperties.getchannel());
            factory.setPort(mqproperties.getport());
            factory.setQueueManager(mqproperties.getqueuemanager());
        } catch (JMSException e) {
            throw new IllegalStateException("Failed to configure the IBM MQ connection factory", e);
        }
        return factory;
    }

    /**
     * Wraps the raw MQ connection factory so that every connection is opened with
     * the configured user name and password.
     *
     * @param mqqueueconnectionfactory the raw IBM MQ connection factory
     * @return the credential-applying adapter
     */
    @Bean(name="usercredentialsconnectionfactoryadapter")
    public UserCredentialsConnectionFactoryAdapter usercredentialsconnectionfactoryadapter(MQQueueConnectionFactory mqqueueconnectionfactory) {
        UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
        adapter.setUsername(mqproperties.getusername());
        adapter.setPassword(mqproperties.getpassword());
        adapter.setTargetConnectionFactory(mqqueueconnectionfactory);
        return adapter;
    }

    /* ======================= MQ inbound listening ======================= */

    /**
     * Defines the queue that incoming messages are received from.
     *
     * @return the receive queue destination
     * @throws IllegalStateException if IBM MQ rejects the queue or queue manager name
     */
    @Bean(name="mqueue")
    public MQQueue mqueue() {
        MQQueue queue = new MQQueue();
        try {
            queue.setBaseQueueName(mqproperties.getbasequeuenamerecv());
            queue.setBaseQueueManagerName(mqproperties.getbasequeuemanagername());
        } catch (JMSException e) {
            throw new IllegalStateException("Failed to configure the IBM MQ receive queue", e);
        }
        return queue;
    }

    /**
     * Creates the listener container that delivers messages from the receive queue
     * to the message handler bean.
     *
     * @param usercredentialsconnectionfactoryadapter connection factory carrying the credentials
     * @param mqueue the receive queue destination
     * @return the listener container
     */
    @Bean(name="simplemessagelistenercontainer")
    public SimpleMessageListenerContainer simplemessagelistenercontainer(UserCredentialsConnectionFactoryAdapter usercredentialsconnectionfactoryadapter, MQQueue mqueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(usercredentialsconnectionfactoryadapter);
        container.setDestination(mqueue);
        container.setMessageListener(decmqriskrecvservice());
        return container;
    }

    /**
     * Message handler bean used by the listener container.
     *
     * <p>NOTE(review): the handler class in this file is also annotated as a service;
     * if it is picked up by component scanning the same bean name is defined twice.
     * Confirm which of the two registrations is intended.
     *
     * @return the message handler
     */
    @Bean(name="decmqriskrecvservice")
    public decmqriskrecvservice decmqriskrecvservice() {
        return new decmqriskrecvservice();
    }

    /* ======================= MQ outbound sending ======================= */

    /**
     * Caching wrapper around the credential-applying factory, used for sending.
     *
     * @param usercredentialsconnectionfactoryadapter connection factory carrying the credentials
     * @return the caching connection factory with a session cache of 5 and
     *         reconnect-on-exception enabled
     */
    @Bean(name="cachingconnectionfactory")
    public CachingConnectionFactory cachingconnectionfactory(UserCredentialsConnectionFactoryAdapter usercredentialsconnectionfactoryadapter) {
        CachingConnectionFactory caching = new CachingConnectionFactory();
        caching.setTargetConnectionFactory(usercredentialsconnectionfactoryadapter);
        caching.setSessionCacheSize(5);
        caching.setReconnectOnException(true);
        return caching;
    }

    /**
     * JMS transaction manager bound to the caching connection factory.
     *
     * @param cachingconnectionfactory the caching connection factory
     * @return the transaction manager
     */
    @Bean(name="jmstransactionmanager")
    public PlatformTransactionManager jmstransactionmanager(CachingConnectionFactory cachingconnectionfactory) {
        JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(cachingconnectionfactory);
        return transactionManager;
    }

    /**
     * JMS template used to send messages, with the configured receive timeout applied.
     *
     * @param cachingconnectionfactory the caching connection factory
     * @return the JMS operations facade
     */
    @Bean(name="jmsoperations")
    public JmsOperations jmsoperations(CachingConnectionFactory cachingconnectionfactory) {
        JmsTemplate template = new JmsTemplate(cachingconnectionfactory);
        template.setReceiveTimeout(mqproperties.getreceivetimeout());
        return template;
    }

}

MQ 配置属性类

注意:每个字段都必须有对应的 get 和 set 方法,否则配置项无法绑定到该类,其他类也无法读取这些属性。

/**
 * Holder for the IBM MQ connection settings, populated from configuration
 * properties under the {@code mq} prefix.
 *
 * <p>Accessor names keep the all-lowercase spelling that the other classes in
 * this file call (for example {@code gethostname()}).
 * NOTE(review): confirm that the property binder in use maps the {@code mq.*}
 * keys onto these setter names.
 */
@Configuration
@ConfigurationProperties(prefix=mqproperties.mq_prefix)
public class mqproperties {

    /** Prefix of the configuration keys bound to this class. */
    public static final String mq_prefix = "mq";

    // Connection endpoint.
    private String hostname;
    private int port;
    private String channel;
    private int ccsid;

    // Credentials used when opening connections.
    private String username;
    private String password;

    // Queue manager and queue names.
    private String queuemanager;
    private String basequeuemanagername;
    private String basequeuenamerecv;
    private String basequeuenamesend;

    // Timeout applied to the JMS template's receive calls.
    private long receivetimeout;

    public String gethostname() {
        return hostname;
    }

    public void sethostname(String hostname) {
        this.hostname = hostname;
    }

    public int getport() {
        return port;
    }

    public void setport(int port) {
        this.port = port;
    }

    public String getchannel() {
        return channel;
    }

    public void setchannel(String channel) {
        this.channel = channel;
    }

    public int getccsid() {
        return ccsid;
    }

    public void setccsid(int ccsid) {
        this.ccsid = ccsid;
    }

    public String getusername() {
        return username;
    }

    public void setusername(String username) {
        this.username = username;
    }

    public String getpassword() {
        return password;
    }

    public void setpassword(String password) {
        this.password = password;
    }

    public String getqueuemanager() {
        return queuemanager;
    }

    public void setqueuemanager(String queuemanager) {
        this.queuemanager = queuemanager;
    }

    public String getbasequeuemanagername() {
        return basequeuemanagername;
    }

    public void setbasequeuemanagername(String basequeuemanagername) {
        this.basequeuemanagername = basequeuemanagername;
    }

    public String getbasequeuenamerecv() {
        return basequeuenamerecv;
    }

    public void setbasequeuenamerecv(String basequeuenamerecv) {
        this.basequeuenamerecv = basequeuenamerecv;
    }

    public String getbasequeuenamesend() {
        return basequeuenamesend;
    }

    public void setbasequeuenamesend(String basequeuenamesend) {
        this.basequeuenamesend = basequeuenamesend;
    }

    public long getreceivetimeout() {
        return receivetimeout;
    }

    public void setreceivetimeout(long receivetimeout) {
        this.receivetimeout = receivetimeout;
    }

}

报文处理类及回执发送

1、实现类要实现 MessageListener 接口,重写 onMessage 方法,参数 message 就是监听到的消息。

2、读取报文时为防止乱码,我这边按照格式分两种方式读取转码。

3、发送回执:之前发送时发现对方收到的报文多出了一些报文头信息(JMS 附加的 MQRFH2 头),所以在目标队列名后面加上了 targetClient 参数:

"queue:///" + mqproperties.getbasequeuenamesend() + "?targetClient=1"

targetClient=1 表示接收方是非 JMS 的原生 MQ 程序,这样发送的报文就不会带上这些报文头信息。

@service

public class decmqriskrecvservice implements messagelistener {

@autowired

private jmsoperations jmsoperations;

@autowired

private mqproperties mqproperties;

@override

public void onmessage(message message) {

string str = null;

// 1、读取报文

try {

if (message instanceof bytesmessage) {

bytesmessage bm = (bytesmessage) message;

byte[] bys = null;

bys = new byte[(int) bm.getbodylength()];

bm.readbytes(bys);

str = new string(bys, "utf-8");

} else {

str = ((textmessage) message).gettext();

str = new string(str.getbytes("iso-8859-1"), "utf-8");

}

} catch (jmsexception e) {

e.printstacktrace();

} catch (unsupportedencodingexception e) {

e.printstacktrace();

}

// 2、处理报文

// 3、组装回执发送

string receipt = "";

try {

jmsoperations.convertandsend("queue:///" + mqproperties.getbasequeuenamesend() + "?targetclient=1", receipt.getbytes("utf-8"));

} catch (jmsexception e) {

e.printstacktrace();

} catch (unsupportedencodingexception e) {

e.printstacktrace();

}

}

}

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!


版权声明:本文为weixin_39939661原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。