多线程推送消息

闲来无事,研究了下多线程,写了个多线程推送消息的demo。
码上代码之前,我们必须先了解一点多线程的场景知识。

为什么要使用多线程?

提高程序效率。假设要向上万甚至十万百万名用户发送消息,单线程推送效率就有些相形见拙了,此时为了提高推送效率,采用多线程是不可避免的,可以很大程度上减少推送时间。

创建线程的方式?

创建线程池的方式有ThreadPoolExecutor和Executors,而Executors利用工厂模式向我们提供了4种线程池实现方式,这里不做过多解释。
在阿里发布的 Java开发手册中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

我应该创建多少线程?

因为每创建一个线程需要耗费大量CPU资源,所以应根据CPU的性能来决定使用多少线程。

接下来码上代码

这里使用了创建了线程池ThreadPoolExecutor poolExecutor去执行消息的推送

 /**
     * 多线程推送消息处理
     * @param msg
     * @param successList
     * @param failList
     * @param countDownLatch
     */
    private static void poolSend(String msg, LinkedList<String> successList, LinkedList<String> failList,CountDownLatch countDownLatch) {

        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    boolean result = sendMessage(msg, successList);
                     //消息推送失败处理
                    if (!result) {
                        repeatSent(result,msg, successList);
                    }

                } catch (Exception e) {
                    throw new RuntimeException("多线程推送消息异常"+e);
                }
                countDownLatch.countDown();
            }
        });

    }

消息推送的方法

/**
     * 推送消息,这里假设了消息推送失败的场景
     *
     * @param msg
     * @param successList
     * @return
     */
    private static boolean sendMessage(String msg, LinkedList<String> successList) {

        boolean isFlag = false;

        if (!isFlag) {
            System.out.println(Thread.currentThread().getName() + "推送消息" + msg);
            //这一步可加锁,避免add冲突
            successList.add(msg);
            isFlag = true;
        }

        return isFlag;
    }

消息推送失败处理

    /**
     * 推送失败,重复推送
     *
     * @param msg
     * @param successList
     */
    private static void repeatSent(boolean result,String msg, LinkedList<String> successList) {
        //重复推送不大于5次
        for (int i = 0; i < sentNum; i++) {
            result = sendMessage(msg, successList);
            if (result) {
                break;
            }
        }
        //大于5次则直接推送失败,不再推送
        if (!result) {
            System.out.println(Thread.currentThread().getName() + "消息推送失败" + msg);
        }


    }

消息推送的主方法

 /**
     * 推送消息的主方法
     * @param msgList
     */
    public static void send(LinkedList<String> msgList) {

        if (msgList == null) {
            throw new RuntimeException("无消息可推送");
        }
        LinkedList<String> successList = new LinkedList<>();//成功发送消息队列
        LinkedList<String> failList = new LinkedList<>();//失败发送消息队列

        int pushNum=THREADSZIE>msgList.size()?msgList.size():THREADSZIE;
        CountDownLatch countDownLatch= new CountDownLatch(pushNum);

        for (int i=0;i<pushNum;i++){
            String msg = msgList.removeFirst();
            //消息推送
            poolSend(msg,successList,failList,countDownLatch);
        }

        try {
            //阻塞主线程,等待所有子线程执行完
            countDownLatch.await();
            TimeUnit.SECONDS.sleep(1);

        }catch (InterruptedException e){
            throw new RuntimeException("线程中断"+e);
        }

        //发送成功的消息内容展示
        if (successList.size()!=0){
            System.out.print("消息成功列表数据有:");
            for (String msg : successList){
                System.out.print(msg+" ");
            }
            System.out.println();
            //清楚发送成功的消息
            successList.clear();
            if (msgList.size()!=0){
                System.out.print("还未发送的消息列表数据有:");
                for (String msg : msgList){
                    System.out.print(msg+" ");
                }
            }
        }


    }

代码运行结果
在这里插入图片描述
github代码地址:https://github.com/AsyouTobe/pushMsg/


版权声明:本文为qq_40733432原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。