Java并发容器和框架

主要介绍ConcurrentHashMap、ConcurrentLinkedQueue、阻塞队列、Fork/Join框架

ConcurrentHashMap

ConcurrentLinkedQueue

阻塞队列

阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞地插入和移除方法。

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  2. 支持阻塞地移除方法:意思是当队列空时,获取元素的线程会等待队列变空。

接口方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);

boolean offer(E e);

void put(E e) throws InterruptedException;

boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

E take() throws InterruptedException;

E poll(long timeout, TimeUnit unit)
throws InterruptedException;

int remainingCapacity();

boolean remove(Object o);

public boolean contains(Object o);

int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);
}
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll () take() poll(time, unit)
检查 element() peek() 不可用 不可用

实现原理

使用通知模式实现。
以ArrayBlockingQueue为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

队列

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • DelayQueue
  • DelayedWorkQueue
  • SynchronousQueue
  • LinkedTransferQueue
  • LinkedBlockingDeque

https://javadoop.com/post/java-concurrent-queue

Fork/Join框架