常用并发容器

线程安全集合类概述

notion image
线程安全集合类可以分为三大类:
  1. 遗留的线程安全集合如 Hashtable , Vector
  1. 使用 Collections 装饰的线程安全集合,如:
      • Collections.synchronizedCollection
      • Collections.synchronizedList
      • Collections.synchronizedMap
      • Collections.synchronizedSet
      • Collections.synchronizedNavigableMap
      • Collections.synchronizedNavigableSet
      • Collections.synchronizedSortedMap
      • Collections.synchronizedSortedSet
  1. java.util.concurrent.*
 
java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent
  1. Blocking 大部分实现基于锁,并提供用来阻塞的方法
  1. CopyOnWrite 之类容器修改开销相对较重
  1. Concurrent 类型的容器
    1. 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
    2. 弱一致性
        • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
        • 求大小弱一致性,size 操作未必是 100% 准确
        • 读取弱一致性
        遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModificationException,不再继续遍历
         

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { static class Node<E> { E item; /** * 下列三种情况之一 * - 真正的后继节点 * - 自己, 发生在出队时 * - null, 表示是没有后继节点, 是最后了 */ Node<E> next; Node(E x) { item = x; } } }
 
初始化链表 last = head = new Node<E>(null);
Dummy 结点用来占位,item 为 null
notion image

入队和出队

当一个节点入队 last = last.next = node;
notion image
再来一个节点入队 last = last.next = node;
notion image
 
Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x;
h = head
notion image
first = h.next
notion image
h.next = h
notion image
head = first
notion image
E x = first.item; first.item = null; return x;

加锁分析

高明之处在于用了两把锁和 dummy 节点
  • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
    • 消费者与消费者线程仍然串行
    • 生产者与生产者线程仍然串行
 
线程安全分析
  • 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是head 节点的线程安全。两把锁保证了入队和出队没有竞争
  • 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
  • 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
// 用于 put(阻塞) offer(非阻塞) private final ReentrantLock putLock = new ReentrantLock(); // 用户 take(阻塞) poll(非阻塞) private final ReentrantLock takeLock = new ReentrantLock();

源码分析

put 操作

public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // count 用来维护元素计数 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 满了等待 while (count.get() == capacity) { // 倒过来读就好: 等待 notFull notFull.await(); } // 有空位, 入队且计数加一 enqueue(node); c = count.getAndIncrement(); // 除了自己 put 以外, 队列还有空位, 由自己叫醒其他 put 线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果队列中有一个元素, 叫醒 take 线程 if (c == 0) // 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争 signalNotEmpty(); }

take 操作

public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 如果队列中只有一个空位时, 叫醒 put 线程 // 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacity if (c == capacity) // 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争 signalNotFull() return x; }

性能比较

主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较
  • Linked 支持有界,Array 强制有界
  • Linked 实现是链表,Array 实现是数组
  • Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
  • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
  • Linked 两把锁,Array 一把锁

ConcurrentLinkedQueue

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,
  • 也是两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
  • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
  • 只是这【锁】使用了 cas 来实现。
事实上,ConcurrentLinkedQueue 应用还是非常广泛的
例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了ConcurrentLinkedQueue 将SocketChannel 给 Poller 使用
notion image
自定义实现
public class TestV1 { public static void main(String[] args) { MyQueue<String> queue = new MyQueue<>(); queue.offer("1"); queue.offer("2"); queue.offer("3"); System.out.println(queue); } } class MyQueue<E> implements Queue<E> { @Override public String toString() { StringBuilder sb = new StringBuilder(); for (Node<E> p = head; p != null; p = p.next.get()) { E item = p.item; if (item != null) { sb.append(item).append("->"); } } sb.append("null"); return sb.toString(); } public MyQueue() { head = last = new Node<>(null, null); } private volatile Node<E> last; private volatile Node<E> head; private E dequeue() { /*Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x;*/ return null; } @Override public boolean offer(E e) { Node<E> n = new Node<>(e, null); while(true) { // 获取尾节点 AtomicReference<Node<E>> next = last.next; // S1: 真正尾节点的 next 是 null, cas 从 null 到新节点 if(next.compareAndSet(null, n)) { // 这时的 last 已经是倒数第二, next 不为空了, 其它线程的 cas 肯定失败 // S2: 更新 last 为倒数第一的节点 last = n; return true; } } } static class Node<E> { volatile E item; public Node(E item, Node<E> next) { this.item = item; this.next = new AtomicReference<>(next); } AtomicReference<Node<E>> next; } }

CopyOnWriteArrayList

CopyOnWriteArraySet 是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。
 
以新增为例:
public boolean add(E e) { synchronized (lock) { // 获取旧的数组 Object[] es = getArray(); int len = es.length; // 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程) es = Arrays.copyOf(es, len + 1); // 添加新元素 es[len] = e; // 替换旧的数组 setArray(es); return true; } }
这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized
 
其它读操作并未加锁,例如:
public void forEach(Consumer<? super E> action) { Objects.requireNonNull(action); for (Object x : getArray()) { @SuppressWarnings("unchecked") E e = (E) x; action.accept(e); } }
适合『读多写少』的应用场景
notion image
notion image

get 弱一致性

迭代器弱一致性

CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(); list.add(1); list.add(2); list.add(3); Iterator<Integer> iter = list.iterator(); new Thread(() -> { list.remove(0); System.out.println(list); }).start(); sleep1s(); while (iter.hasNext()) { System.out.println(iter.next()); }
不要觉得弱一致性就不好
数据库的 MVCC 都是弱一致性的表现
并发高和一致性是矛盾的,需要权衡

ConcurrentHashMap 原理

ConcurrentHashMap 原理
 

ConcurrentSkipListMap

跳表的实现;这是一个Map,使用跳表的数据结构进行快速查找;

ConcurrentSkipListSet

CopyOnWriteArraySet

 

线程安全的队列

阻塞队列

BlockingQueue接口:表示阻塞队列,常用于生产者——消费者模式,他有如下的实现类:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列,遵循FIFO原则。 LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,遵循FIFO原则,默认和最大长度为Integer.MAX_VALUE。 PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 SynchronousQueue:一个不存储元素的阻塞队列。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

非阻塞队列

ConcurrentLinkedQueue

高效的并发队列,使用链表实现,可以看做一个线程安全的LinkedList;