使用BlockingQueue(阻塞队列)及多线程实现短信发送实例详解

一、BlockingQueue简介

      阻塞队列BlockingQueue顾名思义是一个队列,在多线程环境中可以通过队列非常方便的实现数据共享。比如常规的“生产者”、“消费者”模型中,常常会出现生产者、消费者数据处理速度不匹配的情况,当生产者产生数据速度过快,那么必须阻塞生产者线程,以便等待消费者线程把累积的数据处理完毕。在多线程环境下,程序员自己控制这些细节显得十分复杂,尤其还要兼顾效率和线程安全,使用BlockingQueue就简化了这个问题。不需要再关心什么时候需要阻塞线程,什么时候需要唤醒线程了。

、开发实例

下面代码是在spring框架下使用BlockingQueue实现短信发送功能

 @Service
 public class NotifyQueueService {
    @Autowired
    NotifyService notifyService;

	private static BlockingQueue<Runnable> runnableQueue;
	private static BlockingQueue<NotifyMsg> notifyQueue;

	private static ThreadPoolExecutor threadPoolExecutor;
	private static Thread noticThread;

        /*项目启动时初始化线程池,线程队列及消息队列,这里队列使用LinkedBlockingQueue实现类
        并启动监听线程及时消费。
        */
        @PostConstruct
	public void init() {
		if (notifyQueue== null) {
			notifyQueue = new LinkedBlockingQueue<NotifyMsg>(); // 消息延时发送队列
		}
		if (runnableQueue == null) {
			runnableQueue = new LinkedBlockingQueue<Runnable>();//线程池链表的阻塞队列
		}
		if (threadPoolExecutor == null) {
			// 线程池初始化,核心线程设为10,普通线程最大30,超时回收时间10秒
			threadPoolExecutor = new ThreadPoolExecutor(10, 30, 10,
					TimeUnit.SECONDS, runnableQueue);//线程池初始化设置,将线程池队列传入
		}

		if (noticThread== null) {
			noticThread= new Thread(new Runnable() {//实例化守护线程监听队列信息
				@Override
				public void run() {
					while (true) {
						try {
                        //从队列中取出消息线程池启动发送信息线程
							threadPoolExecutor.execute(new sendMsg(
								notifyQueue.take()));
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				}
			});
			noticThread.setDaemon(true);//设置为守护线程
			noticThread.setName("noticThread");//命名
			noticThread.start();//开启守护线程监听队列消息
		}
	}

putData方法供其他消息生产者调用,用于往消息队列中插入信息。

// 往消息队列中塞入消息
	public void putData(NoticMsg noticMsg) {
		if (noticMsg!= null) {
			notifyQueue.offer(noticMsg);// 生产者放入队列
		}
	}

sendMsg类用于消费信息队列中的信息。

/**
	 * 
	 * 消费线程
	 * 
	 */
	public class sendMsg implements Runnable {

		private NoticMsg noticMsg;

		public sendMsgUnit(NoticMsg noticMsg ) {
			super();
			this.noticMsg = noticMsg ;
		}
        
		@Override
		public void run() {
		    try {
		        logger.info("启动了一个发送线程");
		        if (noticMsg != null) {
                            //调用短信发送方法,发送短信。             
                            notifyService.notify(noticMsg);
		        }
                     } catch (Exception e) {
			e.printStackTrace();
			logger.info("发送短信失败");
		     }
              }
	}

 


版权声明:本文为lianjian6534原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。