Java并发 | 23.[设计模式] 保护性暂停

1. 保护性暂停——join、Future、FutureTask的核心

保护性暂停模式是让一个线程等待另一个线程的结果。java中的join、Future、FutureTask均采用了该模式实现。

img

2. 实现代码

2.1. 代码

核心类 GuardedObject.java

/**
 * @author xyx-Eshang
 */
public class GuardedObject {
    private Object response;

    // 获取结果
    public synchronized Object get(long outTime){
        long startTime = System.currentTimeMillis();

        // while防止虚假唤醒
        while (response == null){
            long passTime = System.currentTimeMillis() - startTime;
            // 仍需等待的时间
            long waitTime = outTime - passTime;

            // 超时 - 跳出while循环,直接返回空结果
            if (passTime >= outTime){
                break;
            }

            try{
                // 等待waitTime,因为期间有可能会被虚假唤醒,等待时间需要重新计算
                this.wait(waitTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 得到结果后
        return response;
    }

    // 设置结果
    public synchronized void complete(Object response){
        this.response = response;
        this.notifyAll();
    }
}

入口 Main.java

import lombok.extern.slf4j.Slf4j;

/**
 * @author xyx-Eshang
 */
@Slf4j(topic=" ")
public class Main {
    private static void baseFunction(){
        GuardedObject guardedObject = new GuardedObject();
        // 线程1:获取结果
        new Thread(() -> {
            Object response = null;
            synchronized (guardedObject){
                while (response == null){
                    log.debug("等待获取结果");
                    response = guardedObject.get(3 * 1000);
                    if (response == null){
                        log.debug("等待超时,即将重试");
                    }
                }
                // 成功获取结果后
                log.debug("成功获取结果response: " + response);
            }
        }, "t1").start();


        // 线程2:7s后设置结果
        new Thread(() -> {
            synchronized (guardedObject) {
                log.debug("正在获取结果,预计还需7s...");
                try {
                    guardedObject.wait(7 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                guardedObject.complete(new Object());
            }
        }, "t2").start();
    }

    public static void main(String[] args) {
        baseFunction();
    }
}

image-20220906004652480

2.2. 关键点讲解

2.2.1. while( )循环条件

while循环的条件是response != null。正处于 TIMED_WAITING 的线程被唤醒后重新进入 while 循环:

  • [接收到response后被notifyAll( )唤醒]当 response 被成功赋值,此时就应该将waitSet中的所有线程唤醒,因此调用了notifyAll( )。若线程在检测到自己的 response 接收到之后,就会跳出 while 循环;而没有接收到 response 的线程将重新进入 while 循环,并重新进入 TIMED_WAITING 状态;

    while (response == null){
        // code here...
        try {
            this.wait(waitTime);	// 每次进入while循环,waitTime都会重新计算
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    return response;
    
    this.response = response;
    this.notifyAll();
    
  • [超出时间被唤醒]若线程是因超出了等待时间而被唤醒,那仍然满足 while 条件,但会进入if ( )并通过 break 跳出循环。

    long passTime = System.currentTimeMillis - startTime;	// 已过去的时间
    long waitTime = outTime - passTime;		// 仍需等待的时间
    if (waitTime <= 0){
        break;
    }
    

2.2.2. 等待时间

等待时间 waitTime 并不是固定的,因为线程有可能会被「虚假唤醒」,因此每次进入 while 循环都需要重新计算:

long startTime = System.currentTimeMillis();		// 进入方法后获取初始时间

while (response == null){
    long passTime = System.currentTimeMillis - startTime;	// 已过去的时间
    long waitTime = outTime - passTime;		// 仍需等待的时间
    if (waitTime <= 0){
        break;
    }
    try {
        this.wait(waitTime);	// waitTime不是固定不变的,因为有可能会被虚假唤醒
    } catch (InterruptedException e){
        e.printStackTrace();
    }    
}

2.2.3. Main中锁住的对象必须是 guardedObject

3. join( )源码

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }
    // 无参的join方法会调用join(0),从而进入该分支
    if (millis == 0) {
        // 判断线程是否还存活,存活则调用wait等待
        while (isAlive()) {
            wait(0);
        }
    } else {
        // 判断线程是否还存活
        while (isAlive()) {
            // 计算剩余时间
            long delay = millis - now;
            // <=0 表示join等待已经超时,退出等待
            if (delay <= 0) {
                break;
            }
            // 使用带有时间的wait方法等待
            wait(delay);
            // 计算已过去的时间
            now = System.currentTimeMillis() - base;
        }
    }
}

参考文章

[文章] 同步模式之保护性暂停


版权声明:本文为xyxyxyxyxyxyx原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。