Common Concurrency Utilities

 

AQS

AQS

ReentrantLock

How ReentrantLock Works

ReentrantReadWriteLock

How ReentrantReadWriteLock Works

StampedLock

StampedLock

Semaphore

Semaphore

CountDownLatch

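CountDownLatch coordinates threads by letting one or more of them wait until every participating thread has finished its part of a countdown.
The constructor argument sets the initial count, await() blocks until the count reaches zero, and countDown() decrements the count by one.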
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;

@Slf4j(topic = "c.TestCountDownLatch")
public class TestCountDownLatch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        test3();
    }

    // Stand-in for the sleep helper the original code calls; the argument is in seconds.
    private static void sleep(double seconds) {
        try {
            Thread.sleep((long) (seconds * 1000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Latch with a thread pool: three tasks count down, a fourth task waits.
    private static void test5() {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService service = Executors.newFixedThreadPool(4);
        service.submit(() -> {
            log.debug("begin...");
            sleep(1);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        service.submit(() -> {
            log.debug("begin...");
            sleep(1.5);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        service.submit(() -> {
            log.debug("begin...");
            sleep(2);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        service.submit(() -> {
            try {
                log.debug("waiting...");
                latch.await();
                log.debug("wait end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // Added: without shutdown() the pool threads keep the JVM alive.
        service.shutdown();
    }

    // Latch with plain threads: the calling thread waits for three workers.
    private static void test4() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        new Thread(() -> {
            log.debug("begin...");
            sleep(1);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            sleep(2);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            sleep(1.5);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
        log.debug("waiting...");
        latch.await();
        log.debug("wait end...");
    }

    // Collects results from four requests. Future.get() already blocks until each
    // result is ready, so the unused latch from the original was removed.
    private static void test3() throws InterruptedException, ExecutionException {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("begin");
        ExecutorService service = Executors.newCachedThreadPool();
        Future<Map<String, Object>> f1 = service.submit(() -> {
            Map<String, Object> response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
            return response;
        });
        Future<Map<String, Object>> f2 = service.submit(() -> {
            Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            return response1;
        });
        Future<Map<String, Object>> f3 = service.submit(() -> {
            Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
            return response1;
        });
        Future<Map<String, Object>> f4 = service.submit(() -> {
            Map<String, Object> response3 = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
            return response3;
        });
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());
        log.debug("done");
        service.shutdown();
    }

    // Ten tasks each count from 0% to 100%; the caller waits on the latch before continuing.
    private static void test2() throws InterruptedException {
        AtomicInteger num = new AtomicInteger(0);
        ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
            return new Thread(r, "t" + num.getAndIncrement());
        });
        CountDownLatch latch = new CountDownLatch(10);
        String[] all = new String[10];
        Random random = new Random();
        for (int j = 0; j < 10; j++) {
            int x = j;
            service.submit(() -> {
                for (int i = 0; i <= 100; i++) {
                    try {
                        Thread.sleep(random.nextInt(100));
                    } catch (InterruptedException e) {
                        // ignored, as in the original
                    }
                    all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
                    System.out.print("\r" + Arrays.toString(all));
                }
                latch.countDown();
            });
        }
        latch.await();
        System.out.println("\nGame start...");
        service.shutdown();
    }
}
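The three operations described in this section (initial count, countDown(), await()) can be condensed into a much smaller sketch. The class name LatchSketch, the method runWorkers, and the 5-second timeout are assumptions made for illustration and do not come from the notes. The timed await(long, TimeUnit) overload is used so that a missing countDown() cannot block the caller forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names and the timeout are illustrative only.
class LatchSketch {

    // Starts `workers` threads that each count down once, waits for the latch,
    // and returns the count observed after the wait.
    static long runWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers); // initial count
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown, "worker-" + i).start();
        }
        try {
            // Bounded wait: returns false if the count has not reached zero in time.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // A latch is one-shot: once at zero, further countDown() calls change nothing.
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("remaining count: " + runWorkers(3));
    }
}
```

Because the count cannot be reset, a new latch has to be created for every round of work, which is the limitation CyclicBarrier removes.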
 

CyclicBarrier

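CyclicBarrier [ˈsaɪklɪk ˈbæriɚ], a reusable barrier, coordinates threads by making them wait until a set number of them have arrived.
The party count is set at construction. Each thread calls await() when it reaches the point that needs synchronizing and blocks there; once the number of waiting threads reaches the party count, all of them continue.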
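import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.TestCyclicBarrier")
public class TestCyclicBarrier {

    // Stand-in for the sleep helper the original code calls; the argument is in seconds.
    private static void sleep(double seconds) {
        try {
            Thread.sleep((long) (seconds * 1000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Note: the pool has 3 threads but the barrier has only 2 parties.
        ExecutorService service = Executors.newFixedThreadPool(3);
        // The barrier action runs each time both parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            log.debug("task1, task2 finish...");
        });
        for (int i = 0; i < 3; i++) { // first three tasks to run: task1 task2 task1
            service.submit(() -> {
                log.debug("task1 begin...");
                sleep(1);
                try {
                    barrier.await(); // 2-1=1
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            service.submit(() -> {
                log.debug("task2 begin...");
                sleep(2);
                try {
                    barrier.await(); // 1-1=0
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }

    // The same three rounds with CountDownLatch: a new latch is needed for every round.
    private static void test1() {
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 3; i++) {
            CountDownLatch latch = new CountDownLatch(2);
            service.submit(() -> {
                log.debug("task1 start...");
                sleep(1);
                latch.countDown();
            });
            service.submit(() -> {
                log.debug("task2 start...");
                sleep(2);
                latch.countDown();
            });
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task1 task2 finish...");
        }
        service.shutdown();
    }
}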
Note: the main difference between CyclicBarrier and CountDownLatch is that a CyclicBarrier can be reused. A CyclicBarrier is like a shuttle bus that departs as soon as it is full.
22:00:38.553 c.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
22:00:38.553 c.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
22:00:38.553 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin...
22:00:39.563 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish...
22:00:39.563 c.TestCyclicBarrier [pool-1-thread-3] - task2 begin...
22:00:39.563 c.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
22:00:40.563 c.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish...
22:00:40.563 c.TestCyclicBarrier [pool-1-thread-1] - task2 begin...
22:00:42.564 c.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish...
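The number of pool threads must match the barrier's party count. In the output above the pool has 3 threads but the barrier has 2 parties, so two task1 instances start together and trip the barrier after about 1 second (the first "task1, task2 finish..." at 22:00:39.563), before any task2 has finished.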
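To illustrate both points, reuse and matching the thread count to the party count, here is a minimal sketch. The class name BarrierSketch, the method runRounds, and the 10-second timeout are assumptions made for illustration and do not come from the notes. With a pool of exactly 2 threads and a 2-party barrier, the two tasks running at any moment always belong to the same round, so the barrier action should run once per round on a single barrier instance.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names and the timeout are illustrative only.
class BarrierSketch {

    // Runs `rounds` rounds of two tasks on a 2-thread pool that share one 2-party
    // barrier, and returns how many times the barrier action ran.
    static int runRounds(int rounds) {
        final int parties = 2;
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        // Pool size equals the party count, so tasks from different rounds never mix.
        ExecutorService pool = Executors.newFixedThreadPool(parties);
        for (int round = 0; round < rounds; round++) {
            for (int t = 0; t < parties; t++) {
                pool.submit(() -> {
                    try {
                        barrier.await(); // blocks until the other task of this round arrives
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        throw new IllegalStateException(e);
                    }
                });
            }
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3) + " times");
    }
}
```

Unlike the CountDownLatch version, no new synchronizer is created per round: the barrier resets itself after each trip.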
 

Unsafe

The Unsafe Class