队列总结(八)LinkedTransferQueue

LinkedTransferQueue

无界阻塞队列LinkedTransferQueue,此队列也是基于链表实现,对于所有给定的元素都是先入先出的。LinkedTransferQueue可以算是 LinkedBolckingQueue 和 SynchronousQueue 的合体。SynchronousQueue 内部无法存储元素,当要添加元素的时候,需要阻塞。LinkedBolckingQueue 则内部使用了大量的锁,性能有所下降。

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {

TransferQueue接口

public interface TransferQueue<E> extends BlockingQueue<E> {
    // 如果可能,立即将元素转移给等待的消费者。 
    // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。
    boolean tryTransfer(E e);

    // 将元素转移给消费者,如果需要的话等待。 
    // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。
    void transfer(E e) throws InterruptedException;

    // 上面方法的基础上设置超时时间
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // 如果至少有一位消费者在等待,则返回 true
    boolean hasWaitingConsumer();

    // 返回等待消费者人数的估计值
    int getWaitingConsumerCount();
}

TransferQueue是JDK7才定义的一个接口,继承BlockingQueue 。通常阻塞队里,生产者放入元素,消费者使用元素,只会考虑当前队列的容量,生产者生产时不需要考虑是否有消费者消费其产品。TransferQueue定义了transfer方法,阻塞生产直至等待消费者调用take或poll接收元素。

xfer方法

在LinkedTransferQueue中,所有添加元素的方法,都是通过 xfer方法来实现的。

  private E xfer(E e, boolean haveData, int how, long nanos) {
  		// 不允许空元素
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)      // 阻塞发生在awaitMatch中
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
  /*
   * Possible values for "how" argument in xfer method.
   */
  private static final int NOW   = 0; // for untimed poll, tryTransfer
  private static final int ASYNC = 1; // for offer, put, add
  private static final int SYNC  = 2; // for transfer, take
  private static final int TIMED = 3; // for timed poll, tryTransfer

	// xfer方法中的部分代码
     if (how != NOW) {                 // No matches available
         if (s == null)
             s = new Node(e, haveData);
         Node pred = tryAppend(s, haveData);
         if (pred == null)
             continue retry;           // lost race vs opposite mode
         if (how != ASYNC)  // 阻塞发生在这里
             return awaitMatch(s, pred, e, (how == TIMED), nanos);
     }
     return e; // not waiting

其中put,offer,add方法符合接口BlockingQueue的规范,参数how都是ASYNC,表示异步,指的是生产者与消费者是异步,生产者不需要等待消费者,只需要考虑队列容量。又由于队列是无界的,所以都不会阻塞

  public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

不指定阻塞时间的transfer方法以及take方法,参数how为SYNC,表示同步,take方法语义符合接口BlockingQueue的规范,一直阻塞直至获取到元素,transfer方法表示生产者必须在有消费者等待消费的情况下才可以进行生产,否则会一直阻塞

    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

不指定时间的tryTransfer方法以及poll方法,参数how为NOW,tryTransfer尝试插入元素,假如此时无消费者在等待,直接返回false,插入失败,不会阻塞

   public E poll() {
       return xfer(null, false, NOW, 0);
   }

	public boolean tryTransfer(E e) {
	    return xfer(e, true, NOW, 0) == null;
	}

指定时间的tryTransfer方法以及poll方法,参数how为TIMED,若成功提前返回,若失败,最多阻塞指定时间。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }
    
   public boolean tryTransfer(E e, long timeout, TimeUnit unit)
       throws InterruptedException {
       if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
           return true;
       if (!Thread.interrupted())
           return false;
       throw new InterruptedException();
   }

数据结构

LinkedTransferQueue的数据结构采用的是双重队列Dual Queues,此队列的节点为Node

 static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting
        ...
}

队列中的节点分为两种类型,每个节点只对应一种类型的操作,要么是put类型(isData为true),要么是take类型(isData为false)

在dual队列中,节点需要自动维护匹配状态。所以这里需要一些必要的变量:对于数据模式,匹配需要将一个item字段通过CAS从非null的数据转成null,反之对于请求模式,需要从null变成data。一旦一个节点匹配了,其状态将不再改变。

当一个线程试图添加一个数据节点,正好遇到一个请求数据的结点,会立刻匹配并移除这两个数据节点,反之亦然。
在这里插入图片描述
关于节点匹配及头结点更新的逻辑,有些复杂,暂时不深入,贴上相关参考文章
https://www.cnblogs.com/lighten/p/7505355.html

对比SynchronousQueue

相比较 SynchronousQueue 多了一个可以存储的队列,相比较 LinkedBlockingQueue 多了直接传递元素,少了用锁来同步。性能更高,用处更大。

LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的合体,性能比 LinkedBlockingQueue 更高(没有锁操作),比 SynchronousQueue能存储更多的元素。

当 put 时,如果有等待的线程,就直接将元素 “交给” 等待者, 否则直接进入队列。

put和 transfer 方法的区别是,put 是立即返回的, transfer 是阻塞等待消费者拿到数据才返回。transfer方法和 SynchronousQueue的 put 方法类似。


版权声明:本文为java_lifeng原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。