线程池消费MQ消息队列解决方案

最近优化我手头上的短信平台,发现使用消息队列发送短信一个消息队列的监听只能处理完一次发送之后,才能获取第二个消息内容。就想着能不能用线程池来消费MQ里的任务,但是问题来了,如果使用线程池的话线程池满了之后会有决绝接收。而且线程池里的队列如果存了很多消息,重启服务的时候会造成消息的丢失。 怎么办好呢,思来想去,还是继承线程池自己封装下线程池的实现吧。最先想到的是继承ThreadPoolExecutor利用 重写beforeExecute 和afterExecute 做前置后置处理的时候加锁和释放锁让线程池执行任务前阻塞。

public class BlockThreadPoolExecute extends ThreadPoolExecutor {
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try{
            lock.lock();
            this.condition.signal();
        }finally {
            this.lock.unlock();
        }
    }
}

 试验后发现beforeExecute不起作用。看源码后才发现beforeExecute方法是在线程任务执行的时候才会执行,结果阻塞的是 新开的线程,不影响MQ往线程池里放任务。beforeExecute是用不成了,没办法只能在线程池执行的方法上做文章.如下

/**
 * 阻塞线程池
 * 线程池的线程数到达最大线程数阻塞等待
 * 可用于多线程获取MQ消息任务
 * 因为会阻塞,就不用考虑拒绝策略这一块的重写
 */
public class BlockThreadPoolExecute extends ThreadPoolExecutor {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();

    public BlockThreadPoolExecute(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        //进行同步锁定
        this.lock.lock();
        super.execute(command);
        try {
            //如果线程池的数量已经达到最大线程池的数量,则进行挂起操作
            if (getPoolSize() == getMaximumPoolSize()) {
                this.condition.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            this.lock.unlock();
        }
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try{
            lock.lock();
            this.condition.signal();
        }finally {
            this.lock.unlock();
        }


    }
}

经测试

public class TestThreadPools {
/*这里的队列本来想用 SynchronousQueue 不存储队列的,不知道为啥没成功 */
public static final  ExecutorService poolExecuter = new BlockThreadPoolExecute(3
			, 3 , 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
    public static void main(String[]args) {
        
        for (int i=0;i<100;i++) {
            poolExecuter.execute(()->{
                String threadName= Thread.currentThread().getName();
                System.out.println(threadName+"开始执行");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadName+"执行结束");
            });
        }

    }
}

执行结果

pool-1-thread-1开始执行1562132691801
pool-1-thread-3开始执行1562132691801
pool-1-thread-2开始执行1562132691801
pool-1-thread-1执行结束1562132693802
pool-1-thread-3执行结束1562132693802
pool-1-thread-2执行结束1562132693802
pool-1-thread-1开始执行1562132693803
pool-1-thread-3开始执行1562132693803
pool-1-thread-3执行结束1562132695807
pool-1-thread-2开始执行1562132695808
pool-1-thread-1执行结束1562132695809
pool-1-thread-3开始执行1562132695809
pool-1-thread-2执行结束1562132697809
pool-1-thread-1开始执行1562132697809
pool-1-thread-3执行结束1562132697810
pool-1-thread-2开始执行1562132697810

确认线程池会阻塞,只有到有空闲线程时才会运行,就可以放心的拿到消息队列中消费消息了

 


版权声明:本文为cainiao_xiaowu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。