即
Guarded Suspension
,用在一个线程等待另一个线程的执行结果
要点
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式 因为要等待另一方的结果,因此归类到同步模式@Slf4j(topic = "c.TestGuardedObject") public class TestGuardedObject { public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { try { List<String> response = download(); log.debug("download complete..."); guardedObject.complete(response); } catch (IOException e) { e.printStackTrace(); } }).start(); log.debug("waiting..."); Object response = guardedObject.get(); log.debug("get response: [{}] lines", ((List<String>) response).size()); } } class GuardedObject { private Object response; private final Object lock = new Object(); public Object get() { synchronized (lock) { // 条件不满足则等待 while (response == null) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete(Object response) { synchronized (lock) { // 条件满足,通知等待线程 this.response = response; lock.notifyAll(); } } }
输出结果
08:42:18.568 [main] c.TestGuardedObject - waiting... 08:42:23.312 [Thread-0] c.TestGuardedObject - download complete... 08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines
添加超时处理
/** * 添加超时处理 */ @Slf4j(topic = "c.GuardedObjectV2") class GuardedObjectV2 { private Object response; private final Object lock = new Object(); public Object get(long millis) { synchronized (lock) { // 1) 记录最初时间 long last = System.currentTimeMillis(); // 2) 已经经历的时间 long timePassed = 0; while (response == null) { // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等 long waitTime = millis - timePassed; log.debug("waitTime: {}", waitTime); if (waitTime <= 0) { log.debug("break..."); break; } try { lock.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } // 3) 如果提前被唤醒,这时已经经历的时间假设为 400 timePassed = System.currentTimeMillis() - last; log.debug("timePassed: {}, object is null {}", timePassed, response == null); } return response; } } public void complete(Object response) { synchronized (lock) { // 条件满足,通知等待线程 this.response = response; log.debug("notify..."); lock.notifyAll(); } } }
测试,没有超时
@Slf4j(topic = "c.TestGuardedObjectV2") public class TestGuardedObjectV2 { public static void main(String[] args) { GuardedObjectV2 v2 = new GuardedObjectV2(); new Thread(() -> { sleep(1); v2.complete(null); sleep(1); v2.complete(Arrays.asList("a", "b", "c")); }).start(); Object response = v2.get(2500); if (response != null) { log.debug("get response: [{}] lines", ((List<String>) response).size()); } else { log.debug("can't get response"); } } }
输出结果
08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500 08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify... 08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true 08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497 08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify... 08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false 08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines
测试,超时
// 等待时间不足 List<String> lines = v2.get(1500);
08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500 08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify... 08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true 08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498 08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true 08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0 08:47:56.461 [main] c.GuardedObjectV2 - break... 08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response 08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...
多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
新增 id 用来标识 Guarded Object
/** * 添加多任务处理 */ class GuardedObjectV3 { private int id; private Object response; private final Object lock = new Object(); public GuardedObjectV3(int id) { this.id = id; } public int getId() { return id; } public Object get() { synchronized (lock) { while (response == null) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete(Object response) { synchronized (lock) { this.response = response; lock.notifyAll(); } } }
中间解耦类
class Fetures { private static final ConcurrentHashMap<Integer, GuardedObjectV3> FETURES = new ConcurrentHashMap<>(); private static final AtomicInteger ID_GENERATOR = new AtomicInteger(); public static GuardedObjectV3 createFeture() { // 为每个 GuardedObject 分配一个 id int id = ID_GENERATOR.incrementAndGet(); GuardedObjectV3 v3 = new GuardedObjectV3(id); // 放入公共位置,将来异步响应返回时,根据编号获取 FETURES.put(id, v3); return v3; } public static void complete(int id, Object response) { // 异步响应完成,根据编号获取并移除 GuardedObjectV3 v3 = FETURES.remove(id); if (v3 != null) { v3.complete(response); } } }
@Slf4j(topic = "c.TestGuardedObjectV3") public class TestGuardedObjectV3 { public static void main(String[] args) { for (int i = 0; i < 3; i++) { GuardedObjectV3 v3 = Fetures.createFeture(); new Thread(() -> { log.debug("waiting id({})...", v3.getId()); log.debug("get response id({}): [{}] lines", v3.getId(), ((List<String>) v3.get()).size()); }).start(); new Thread(() -> { try { List<String> lines = download(); log.debug("download complete id({})...", v3.getId()); Fetures.complete(v3.getId(), lines); } catch (IOException e) { e.printStackTrace(); } }).start(); } } }
08:24:36.998 c.TestGuardedObjectV3 [Thread-4] - waiting id(3)... 08:24:36.998 c.TestGuardedObjectV3 [Thread-2] - waiting id(2)... 08:24:36.998 c.TestGuardedObjectV3 [Thread-0] - waiting id(1)... 08:24:38.082 c.TestGuardedObjectV3 [Thread-3] - download complete id(2)... 08:24:38.082 c.TestGuardedObjectV3 [Thread-1] - download complete id(1)... 08:24:38.082 c.TestGuardedObjectV3 [Thread-5] - download complete id(3)... 08:24:38.082 c.TestGuardedObjectV3 [Thread-2] - get response id(2): [3] lines 08:24:38.082 c.TestGuardedObjectV3 [Thread-0] - get response id(1): [3] lines 08:24:38.082 c.TestGuardedObjectV3 [Thread-4] - get response id(3): [3] lines