RabbitMQ--02--RabbitMQ传递对象

方案一:对象序列化
其实和hello world类似,只不过增加了一个对象序列化:
序列化就是将一个对象的状态(各个属性量)保存起来,然后在适当的时候再获得。序列化分为两大部分:序列化和反序列化。序列化是这个过程的第一部分,将数据分解成字节流,以便存储在文件中或在网络上传输。反序列化就是打开字节流并重构对象。对象序列化不仅要将基本数据类型转换成字节表示,有时还要恢复数据。恢复数据要求有恢复数据的对象实例。

发送端:

public class Send {
    /**
     * mq通信的名称
     */
    private final static String QUEUE_NAME="hello";

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connFactory=new ConnectionFactory();
        //设置服务器位置
        connFactory.setHost("localhost");
        //设置服务器端口号
        //connFactory.setPort(5672);
        //创建连接
        Connection con=connFactory.newConnection();
        //创建channel
        Channel channel=con.createChannel();
        //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
        //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //String message="hello world";
        //创建一个对象
        User user=new User();
        user.setId(1);
        user.setName("dema");
        user.setPassword("123");
        //将创建的对象序列化后传递
        //第一个参数为,第二个参数为队列名。第三个参数为其他属性。第四个参数为消息体
        channel.basicPublish("",QUEUE_NAME,null,SerializationUtils.serialize(user));
        System.out.println("正在发送消息:"+user.getId());     
        //关闭连接
        channel.close();
        con.close();
    }
}

接收端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class Receive {

    /**
     * 定义rm通信的名称
     */
    private final static String QUEUE_NAME="hello";


    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        // TODO Auto-generated method stub

        ConnectionFactory connFactory=new ConnectionFactory();

        //设置服务器位置
        connFactory.setHost("localhost");
        //设置端口号
        //connFactory.setPort(15672);
        //连接登录用户名
        //connFactory.setPassword("guest");
        //连接登录密码
        //connFactory.setUsername("guest");

        //创建连接
        Connection con=connFactory.newConnection();

        //创建channel
        Channel channel=con.createChannel();

        //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
                //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer=new QueueingConsumer(channel);
        //第一个参数为队列名,第二个参数是否考虑已发送的消息,第三个参数为消费对象的接口
        channel.basicConsume(QUEUE_NAME, true, consumer);

        System.out.println("Receiv类正在等待Send类发送消息");
        while(true){
            Delivery delivery=consumer.nextDelivery();
            //String message=new String(delivery.getBody()); 
            //将传递过来的对象反序列化
            @SuppressWarnings("deprecation")
            User user=(User)SerializationUtils.deserialize(delivery.getBody());
            //System.out.println("Receive类接收到Send类发送的信息:"+message);
            System.out.println(user.getName());
        }

        //关闭连接
    }
}

方案二:通过ObjectMapper将对象转换成JSON数据:
发送端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Send {
    /**
     * mq通信的名称
     */
    private final static String QUEUE_NAME="hello";

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connFactory=new ConnectionFactory();
        //设置服务器位置
        connFactory.setHost("localhost");
        //设置服务器端口号
        //connFactory.setPort(5672);
        //创建连接
        Connection con=connFactory.newConnection();
        //创建channel
        Channel channel=con.createChannel();
        //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
        //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //String message="hello world";

        //创建一个对象
        User user=new User();
        user.setId(1);
        user.setName("dema");
        user.setPassword("123");

        //将Java对象匹配JSON结构
        ObjectMapper mapper=new ObjectMapper();
        String message=mapper.writeValueAsString(user);
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("正在发送消息:"+user.getId());     
        //关闭连接
        channel.close();
        con.close();
        //SimpleMessageConverter


    }
}

接收端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;


public class Receive {

    /**
     * 定义rm通信的名称
     */
    private final static String QUEUE_NAME="hello";


    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        // TODO Auto-generated method stub

        ConnectionFactory connFactory=new ConnectionFactory();

        //设置服务器位置
        connFactory.setHost("localhost");
        //设置端口号
        //connFactory.setPort(15672);
        //连接登录用户名
        //connFactory.setPassword("guest");
        //连接登录密码
        //connFactory.setUsername("guest");

        //创建连接
        Connection con=connFactory.newConnection();

        //创建channel
        Channel channel=con.createChannel();

        //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
                //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer=new QueueingConsumer(channel);
        //第一个参数为队列名,第二个参数是否考虑已发送的消息,第三个参数为消费对象的接口
        channel.basicConsume(QUEUE_NAME, true, consumer);

        System.out.println("Receiv类正在等待Send类发送消息");
        while(true){

            //将json数据转成对象
            ObjectMapper mapper=new ObjectMapper();
            Delivery delivery=consumer.nextDelivery();
            String message=new String(delivery.getBody());
            User user=mapper.readValue(message.getBytes("utf-8"),User.class);
            System.out.println(user.getName());
        }

        //关闭连接
    }

}

说明都在注释中。
附带遇到的问题及解决办法:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/ILoggerFactory
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:791)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
..........

Failed to instantiate SLF4J LoggerFactory
Reported exception:
java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at com.rabbitmq.client.impl.AMQConnection.<clinit>(AMQConnection.java:49)
    at com.rabbitmq.client.ConnectionFactory.<init>(ConnectionFactory.java:91)
    at Receive.main(Receive.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 8 more
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at com.rabbitmq.client.impl.AMQConnection.<clinit>(AMQConnection.java:49)
    at com.rabbitmq.client.ConnectionFactory.<init>(ConnectionFactory.java:91)
    at Receive.main(Receive.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 8 more

解决方案:缺少日志架包
添加三个架包:
log4j-1.2.17.jar
slf4j-api-1.6.4-sources.jar
slf4j-api-1.6.4.jar
slf4j-log4j12-1.7.0-sources.jar
slf4j-log4j12-1.7.0.jar
参考网站:
http://outofmemory.cn/code-snippet/6776/java.lang.NoClassDefFoundError-org-slf4j-LoggerFactory

Exception in thread "main" java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:60)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:900)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:817)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:954)
    at Receive.main(Receive.java:30)

安装后在浏览器中通过url无法访问
解决方案:
给path变量添加内容,在其后面增加:;%RABBITMQ_SERVER%\sbin (注意前面的分号),然后确定即可
添加环境变量:RABBITMQ_SERVER
环境变量RABBITMQ_SERVER 的值为:D:\My-Softwar-Installed\RabbitMQ Server\rabbitmq_server-
配置环境变量后cmd中输入
rabbitmq-plugins enable rabbitmq_management
然后运行下面的命令来安装:
rabbitmq-service stop
rabbitmq-service install
rabbitmq-service start

Exception in thread "main" log4j:WARN No appenders could be found for logger (com.rabbitmq.client.impl.ForgivingExceptionHandler).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:372)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:297)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:900)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:817)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:954)
    at Receive.main(Receive.java:31)

解决方案:
将log4j.properties添加到classpath下
http://blog.csdn.net/huoyin/article/details/41593013

rabbitmq的web管理界面无法使用guest用户登录
解决方案:由chrome换成360浏览器
附demo下载链接:http://download.csdn.net/detail/btwangzhi/9752777



版权声明:本文为UncleTian原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。