一、需求介绍
后端使用 Spring Boot 2.0 框架,需要通过 JMS 实时监听并接收 IBM MQ 队列中的数据进行处理,并在处理后生成回执,通过 MQ 队列发送出去。
二、引入依赖jar包
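<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.0.0</version>
</dependency>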
三、监听实现
代码分为三大块:
1、MQ 通道连接:我这里使用用户名和密码连接;如果不需要密码认证,可以不传入用户名和密码。
2、MQ 队列连接,并对该队列实现监听。
3、MQ 消息发送。
/**
 * Configuration class declaring the beans used to connect to MQ, listen on the
 * receive queue and send messages. All connection settings are read from the
 * injected mqproperties bean.
 *
 * NOTE(review): every identifier in this listing is lower-case (e.g.
 * `@configuration`, `wmqconstants.wmq_cm_client`), which looks like an extraction
 * artifact; the casing has to be restored against the real Spring / IBM MQ APIs
 * before this compiles.
 */
@configuration
public class mqtestconfig {
// Source of host, port, channel, credentials, queue names and timeout.
@autowired
private mqproperties mqproperties;
/**======================= MQ connection (channel) factory ============================**/
/**
 * Builds the MQ queue connection factory from the configured host name, CCSID,
 * channel, port and queue manager, using the wmq_cm_client transport type.
 *
 * NOTE(review): a jmsexception thrown by any setter inside the try block is only
 * printed, so a partially configured factory is still returned; consider failing
 * fast here instead.
 *
 * @return the connection factory (possibly incompletely configured, see note)
 */
@bean(name="mqqueueconnectionfactory")
public mqqueueconnectionfactory mqqueueconnectionfactory(){
mqqueueconnectionfactory mqqueueconnectionfactory = new mqqueueconnectionfactory();
mqqueueconnectionfactory.sethostname(mqproperties.gethostname());
try {
mqqueueconnectionfactory.settransporttype(wmqconstants.wmq_cm_client);
mqqueueconnectionfactory.setccsid(mqproperties.getccsid());
mqqueueconnectionfactory.setchannel(mqproperties.getchannel());
mqqueueconnectionfactory.setport(mqproperties.getport());
mqqueueconnectionfactory.setqueuemanager(mqproperties.getqueuemanager());
} catch (jmsexception e) {
e.printstacktrace();
}
return mqqueueconnectionfactory;
}
/**
 * Wraps the MQ connection factory in an adapter that is given the configured
 * user name and password.
 *
 * @param mqqueueconnectionfactory the connection factory to delegate to
 * @return the adapter; the listener container and the caching factory below are
 *         both built on top of it
 */
@bean(name="usercredentialsconnectionfactoryadapter")
public usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter(mqqueueconnectionfactory mqqueueconnectionfactory){
usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter = new usercredentialsconnectionfactoryadapter();
usercredentialsconnectionfactoryadapter.setusername(mqproperties.getusername());
usercredentialsconnectionfactoryadapter.setpassword(mqproperties.getpassword());
usercredentialsconnectionfactoryadapter.settargetconnectionfactory(mqqueueconnectionfactory);
return usercredentialsconnectionfactoryadapter;
}
/**============================ MQ message listening / receiving =============================**/
/**
 * Queue destination for receiving: sets the base queue name (the configured
 * receive queue) and the base queue manager name.
 *
 * NOTE(review): as with the connection factory, a jmsexception is only printed
 * and the queue object is returned regardless.
 *
 * @return the queue the listener container listens on
 */
@bean(name="mqueue")
public mqqueue mqueue(){
mqqueue mqqueue = new mqqueue();
try {
mqqueue.setbasequeuename(mqproperties.getbasequeuenamerecv());
mqqueue.setbasequeuemanagername(mqproperties.getbasequeuemanagername());
} catch (jmsexception e) {
e.printstacktrace();
}
return mqqueue;
}
/**
 * Listener container for the receive queue: uses the credentials adapter as its
 * connection factory, the mqueue bean as destination and decmqriskrecvservice
 * as the message listener.
 *
 * @param usercredentialsconnectionfactoryadapter connection factory carrying the credentials
 * @param mqueue the queue to listen on
 * @return the configured listener container
 */
@bean(name="simplemessagelistenercontainer")
public simplemessagelistenercontainer simplemessagelistenercontainer(usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter,mqqueue mqueue){
simplemessagelistenercontainer simplemessagelistenercontainer = new simplemessagelistenercontainer();
simplemessagelistenercontainer.setconnectionfactory(usercredentialsconnectionfactoryadapter);
simplemessagelistenercontainer.setdestination(mqueue);
simplemessagelistenercontainer.setmessagelistener(decmqriskrecvservice());
return simplemessagelistenercontainer;
}
/**
 * Message handler bean (the class that processes received messages).
 *
 * NOTE(review): decmqriskrecvservice is also annotated with @service in this
 * listing, so the same type may end up registered twice; confirm which
 * registration is intended.
 *
 * @return a new handler instance
 */
@bean(name="decmqriskrecvservice")
public decmqriskrecvservice decmqriskrecvservice(){
return new decmqriskrecvservice();
}
/**============================ MQ message sending ============================**/
/**
 * Caching connection factory used for sending: wraps the credentials adapter,
 * with a session cache size of 5 and reconnect-on-exception enabled.
 *
 * @param usercredentialsconnectionfactoryadapter connection factory carrying the credentials
 * @return the caching connection factory
 */
@bean(name="cachingconnectionfactory")
public cachingconnectionfactory cachingconnectionfactory(usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter){
cachingconnectionfactory cachingconnectionfactory = new cachingconnectionfactory();
cachingconnectionfactory.settargetconnectionfactory(usercredentialsconnectionfactoryadapter);
cachingconnectionfactory.setsessioncachesize(5);
cachingconnectionfactory.setreconnectonexception(true);
return cachingconnectionfactory;
}
/**
 * Transaction manager bound to the caching connection factory.
 *
 * @param cachingconnectionfactory the connection factory to manage transactions for
 * @return the transaction manager
 */
@bean(name="jmstransactionmanager")
public platformtransactionmanager jmstransactionmanager(cachingconnectionfactory cachingconnectionfactory){
jmstransactionmanager jmstransactionmanager = new jmstransactionmanager();
jmstransactionmanager.setconnectionfactory(cachingconnectionfactory);
return jmstransactionmanager;
}
/**
 * Creates a jmstemplate on the caching connection factory, with the receive
 * timeout taken from the properties, and exposes it as jmsoperations. The
 * message handler uses it to send receipts.
 *
 * @param cachingconnectionfactory the connection factory the template sends through
 * @return the template, typed as jmsoperations
 */
@bean(name="jmsoperations")
public jmsoperations jmsoperations(cachingconnectionfactory cachingconnectionfactory){
jmstemplate jmstemplate = new jmstemplate(cachingconnectionfactory);
jmstemplate.setreceivetimeout(mqproperties.getreceivetimeout());
return jmstemplate;
}
}
MQ 配置属性类
注意:下面为了节省篇幅省略了访问器,实际使用时需要为每个字段添加 getter 和 setter 方法,否则配置无法绑定,配置类中的取值调用也无法编译。
/**
 * Holder for the MQ settings, bound from configuration properties under the
 * "mq" prefix.
 *
 * NOTE(review): no accessors are shown in this listing, but mqtestconfig calls
 * getters such as gethostname() and getport(); a getter and setter must be added
 * for every field below.
 */
@configuration
@configurationproperties(prefix=mqproperties.mq_prefix)
public class mqproperties {
// Prefix of the configuration keys bound to this class.
public static final string mq_prefix = "mq";
// Host name set on the MQ connection factory.
private string hostname;
// Port set on the MQ connection factory.
private int port;
// Channel set on the MQ connection factory.
private string channel;
// CCSID set on the MQ connection factory.
private int ccsid;
// User name given to the credentials adapter.
private string username;
// Password given to the credentials adapter.
private string password;
// Queue manager set on the MQ connection factory.
private string queuemanager;
// Base queue manager name set on the receive queue object.
private string basequeuemanagername;
// Name of the queue that is listened on for incoming messages.
private string basequeuenamerecv;
// Name of the queue that receipts are sent to.
private string basequeuenamesend;
// Receive timeout handed to the jmstemplate (presumably milliseconds; confirm).
private long receivetimeout;
}
报文处理类及回执发送
1、处理类需要实现 MessageListener 接口并重写 onMessage 方法,方法参数 message 就是监听到的消息。
2、读取报文时,为防止出现乱码,我这里根据消息类型(字节消息或文本消息)分两种方式读取并转码。
3、发送回执:之前发送时发现对方收到的报文多出了一些报文头信息,所以在目标队列名上加了前缀和参数:
"queue:///" + mqproperties.getbasequeuenamesend() + "?targetclient=1"
该参数表示接收方是非 JMS 的原生 MQ 应用,这样发送的报文就不会再附带 JMS 的报文头(MQRFH2)信息。
/**
 * Message listener that reads each incoming message into a string and then
 * sends a receipt to the configured send queue.
 *
 * The processing step and the receipt content are placeholders in this listing.
 */
@service
public class decmqriskrecvservice implements messagelistener {
// Used to send the receipt.
@autowired
private jmsoperations jmsoperations;
// Supplies the name of the queue receipts are sent to.
@autowired
private mqproperties mqproperties;
/**
 * Handles one received message: reads its body, then sends a receipt.
 *
 * Failures while reading or sending are only printed. After a failed read the
 * method still goes on to send the receipt, with the message text left as null.
 *
 * @param message the message delivered by the listener container
 */
@override
public void onmessage(message message) {
string str = null;
// 1. Read the message body
try {
if (message instanceof bytesmessage) {
// Bytes message: copy the whole body into an array and decode it as utf-8.
bytesmessage bm = (bytesmessage) message;
byte[] bys = null;
// NOTE(review): the body length is narrowed from long to int, and the count
// returned by readbytes is not checked.
bys = new byte[(int) bm.getbodylength()];
bm.readbytes(bys);
str = new string(bys, "utf-8");
} else {
// NOTE(review): every non-bytes message is cast to textmessage without a type
// check, so any other message type fails here with a class-cast error.
str = ((textmessage) message).gettext();
// Re-encodes the text as iso-8859-1 bytes and decodes those bytes as utf-8.
// NOTE(review): presumably compensates for a character-set mismatch on the
// sending side; it would corrupt text that was already decoded correctly. Confirm.
str = new string(str.getbytes("iso-8859-1"), "utf-8");
}
} catch (jmsexception e) {
e.printstacktrace();
} catch (unsupportedencodingexception e) {
e.printstacktrace();
}
// 2. Process the message (not implemented in this listing)
// 3. Build and send the receipt
// Placeholder: the receipt content is empty here.
string receipt = "";
try {
// The destination is given as a queue URI with a "?targetclient=1" suffix;
// per the accompanying article this stops extra header information from
// being added to the sent message. The payload is the receipt as utf-8 bytes.
jmsoperations.convertandsend("queue:///" + mqproperties.getbasequeuenamesend() + "?targetclient=1", receipt.getbytes("utf-8"));
} catch (jmsexception e) {
e.printstacktrace();
} catch (unsupportedencodingexception e) {
e.printstacktrace();
}
}
}
如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!