Java线程池队列LinkedTransferQueue的详细原理分析-刘宇

CSDN博客地址:https://blog.csdn.net/liuyu973971883
文章来源:转载,原文地址:https://blog.csdn.net/weixin_41622183/article/details/89004174,感谢这位老哥的辛勤付出,写的非常棒,各位看完别忘了给这位老哥点个赞啊。如有侵权,请联系删除。

一、什么是LinkedTransferQueue?

  • LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。
  • LinkedTransferQueue比其他阻塞队列多了tryTransfer和transfer方法。
  • LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,生产者调用transfer方法就不会将元素存入队列,而是直接传递给消费者
  • 如果调用transfer方法的生产者发现没有在等待的消费者,则会将元素入队,然后阻塞等待,直到有一个消费者来获取该元素

二、LinkedTransferQueue类的结构图

在这里插入图片描述

三、TransferQueue讲解

BlockingQueue这里接不过多的介绍了,还不知道的小伙伴可以看我前面的博客:点击查看详情,我们来看看TransferQueue接口提供了哪些方法

//该方法放入元素后,一定要被消费者消费后,线程才释放,否则会一直堵塞
void transfer(E e) throws InterruptedException;
/**
* tryTransfer 和上面的 transfer 方法相比,
* 该方入队元素后,无论是否消费都立即返回
* 如果没有消费者接收元素,则元素不入队,返回的是 false
*/
boolean tryTransfer(E e);
/**
* 该方法加入了时间等待,假设超过时间没有消费者线程接收
* 则元素不会入队,并返回false
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
 /**
 *  判断是否有等待中的客户端线程
 */       
boolean hasWaitingConsumer();
/**
* 获取等待接收元素的消费者数量
*/
int getWaitingConsumerCount();

四、LinkedTransferQueue源码讲解

1、重要的字段

//获取处理器数量,判断是否是多个
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
//自旋次数,阻塞前的自旋次数(这里向左偏移,一定是 2 的 n 次方)
private static final int FRONT_SPINS   = 1 << 7;
//自旋次数,一样是为 2 的 n 次方
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
//达到该阈值时关闭
static final int SWEEP_THRESHOLD = 32;

2、Node 内部类

isData 这个字段是判断是请求节点还是数据节点。
item 是我们存储的对象
next 指向下一个节点
waiter 等待的线程

static final class Node {
    final boolean isData;   
    volatile Object item;  
    volatile Node next;
    volatile Thread waiter;

    // CAS 方式将设置当前节点的 next 指向下一个节点
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
	//将当前节点的对象设为 val(传入的值对象)
    final boolean casItem(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    //不用说了吧。isData 上面有说
    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item); // relaxed write
        this.isData = isData;
    }

   	//将当前节点 next 指向自己
    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }
    
    //将当前节点的 next 指向自己,并将等待的线程设置为空 
    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }
    
	//判断是否匹配
    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

 	//判断是否为未匹配的请求结点
    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }
    
	//校验当前节点
    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

	//尝试匹配数据节点
    final boolean tryMatchData() {
        // assert isData;
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);
            return true;
        }
        return false;
    }
}

3、重要的几个常量

private static final int NOW = 0;   // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
  • NOW:表示即时操作,不会对线程造成阻塞。但是有一点需要注意,NOW 常量是 tryTransfer 调用传入的,上面说过 tryTransfer 会可能会出现失败的情况(没有消费线程在等待下会失败),失败的情况下,元素是不会入队的。
  • ASYNC:表示异步操作,会往队尾加入元素,插入成功后悔返回 true。上面代码的注释也说明了哪些操作是异步的。注意一点:LinkedTransferQueue 是无界队列,属于不可控。
  • SYNC:表示同步入队,会造成线程阻塞。使用 transfer 方法时,如果没有一个消费线程来消费,那它会一直等待。take 消费线程也同理,只不过 take 操作是队列为空的时候,才会进行阻塞线程。
  • TIMED:表示 有限制时间的阻塞线程。

4、xfer方法

这个方法基本上每个入队,出队操作都是调用该方法,haveData 参数传入 true 或者 false,来判别是入队操作还是出队操作。

private E xfer(E e, boolean haveData, int how, long nanos) {
	//判断传入元素是否为空
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
		//判断当前头结点是否为空,为空就跳过,执行下面的操作
        for (Node h = head, p = h; p != null;) { // find & match first node
        	//拿到这个节点,判断属于是数据节点还是请求节点
            boolean isData = p.isData;
            //拿到该节点的元素
            Object item = p.item;
            //匹配的情况下
            if (item != p && (item != null) == isData) { // unmatched
                if (isData == haveData)   // can't match
                    break;
                //匹配节点,会从队首一直往后,直到匹配上
                if (p.casItem(item, e)) { // match
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    //有阻塞的消费线程
                    LockSupport.unpark(p.waiter);
                    //将元素返回
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
		//以下是入队的操作,下面会介绍几个常量
        if (how != NOW) {                 // No matches available
            if (s == null)
                s = new Node(e, haveData);
            //入队操作
            Node pred = tryAppend(s, haveData);
            //如果入队失败返回空,则重新再一次 CAS
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            //线程阻塞
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

5、入队的具体实现

入队操作有 3 种情况:

  • 队列中没有节点,也就是 head tail 为空,直接返回节点自己
  • 入队失败,这里没理解透(个人感觉是被其他线程取值后,isData 不匹配就直接返回入队失败),返回空
  • 执行入队操作,入队成功后发现队列中有多个节点,此时返回该成功入队节点的前置节点
private Node tryAppend(Node s, boolean haveData) {
	//
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail

		/**
		* 分支一
		* 尾节点为空且头结点也为空的情况下,说明队列为空,就直接入队
		*/
        if (p == null && (p = head) == null) {   
            if (casHead(null, s))
                return s;                 // initialize
        }
        /**
        * 分支二  
        * 节点连接失败,入队失败 ,返回空重新进行 CAS
        */
        else if (p.cannotPrecede(haveData))      
            return null;                  // lost race vs opposite mode
        /**
        * 分支三
        * 从头遍历到尾
        */
        else if ((n = p.next) != null)   
            p = p != t && t != (u = tail) ? (t = u) : 
                (p != n) ? n : null;      // restart if off list
        /**
        * 分支四
        * 将节点放入队列中  
        */
        else if (!p.casNext(null, s))	//分支四		
            p = p.next;                   // re-read on CAS failure
        /**
        * 分支五
        * 松弛操作,这个下面讲
        */
        else {								
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

6、入队图解

单单看代码,估计也是一脸懵逼,下面用图来解释 LinkedTransferQueue 中 transfer 方法的入队原理。

队列刚刚开始时:head 和 tail 都指向空。
在这里插入图片描述
接下来,A 线程执行 transfer 方法要入队。线程 A 进来后,发现队列是为空的,且没有阻塞的消费线程。这时,入队操作,走的是分支一,将元素入队后,将 head 指向新入队的节点,最后返回自身节点。操作后如下图:
在这里插入图片描述
看完上图,可能有部分疑惑,怎么会线程阻塞了?这个问题下面在进行讲解。
接下来,我们继续进行入队操作:
这时候线程 B 调用 transfer 方法进行入队。最后,线程会走分支四,将元素入队到队尾,接下来循环走到分支五,将 tail 指向插入的节点。
假设无法连接到前置,那表示入队失败,返回空。执行结果结果如下:
在这里插入图片描述

线程 C 在入队,方式和上述一致,就不分析了:
在这里插入图片描述
注:上述 tail 没有更新,这属于一种 松弛 的方式,我们知道 LinkedTransferQueue 队列是无锁队列,使用 CAS 来进行入队、出队操作,目的是为了节省 CAS 操作的开销。假设保证强的 tail 位置一致,那无锁队列可能还不如直接加个锁更快。这是一个设计上的权衡吧。

7、出队图解

LinkedTransferQueue 中 take 方法的出队原理。队列初始状态如下图:
在这里插入图片描述
线程 D 是消费线程,来从队列中取元素。下面代码便是从队列中取出元素的实现:
在这里插入图片描述
认真看一下,将队首节点的 item 设置为 null,并唤醒阻塞的线程 A (线程 A 在队首),然后返回设置在头节点的值对象。


紧接着,线程 A 被唤醒后,会继续执行。因为唤醒线程 A 前将 item 值置 null 了,所以返回的值为空。之后调用 forgetContents() 方法,将 item 指向自己。下面我们来看看一个方法 awaitMatch:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
    	//只看着一点,如果对自旋有兴趣可以自行研究
        Object item = s.item;
        //上面图解,item被置空,所以肯定不相等
        if (item != e) {                  // matched
            // assert item != s;
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            unsplice(pred, s);
            return e;
        }

        if (spins < 0) {                  // establish spins at/near front
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            LockSupport.park(this);
        }
    }
}

从如上代码可以看出,这个一个经典的 “锁优化” 的实现方式,自旋 -> yield -> 阻塞,我们的线程不会立即进入阻塞状态,而是要经过自旋一定次数后依旧没被消费才会进行线程阻塞。
所以,从队列中取出队首会经历如下几个过程:
将值置为 null。
在这里插入图片描述
将值设为自己本身:
在这里插入图片描述
接下来我们再次调用 take 方法时:
在这里插入图片描述
根据上图,我们可以看到, 原来的队首节点将的 next 自己指向自己,head 指向第三个节点,一次性跳了两个节点。 这就是 “松弛” 策略。上图线程被唤醒后进行的一系列操作和原来相同,不继续介绍,直接到最后结构:
在这里插入图片描述
可以看到 tail 还是指向那个自身的节点,没有切换。下面在来一条线程 G:
在这里插入图片描述
线程 G 也是消费线程,入队后发现队列空了,只好线程阻塞,等下入队。不过这里有一点注意:因为是有阻塞的消费线程,这时如果入队线程一来,会直接将元素交给等待的消费线程,而不是入队后,在唤醒消费线程取消费。