Java并发工具类

CountDownLatch

依赖于AbstractQueuedSynchronizer实现的共享同步器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// 当state不等于0,返回-1,则加入等待队列中
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

countDown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();// 唤醒等待节点
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(){
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();

new Thread(){
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();

try {
System.out.println("等待2个子线程执行完毕...");
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

CyclicBarrier

利用ReentrantLock和Condition,来阻塞调用await方法的线程。当满足一定条件,自动唤醒所有的线程。

1
2
3
4
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped,当最后一个线程调用await时,唤醒所有线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();// 唤醒所有线程
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();// 唤醒所有线程
}

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CyclicBarrierTest {

static CyclicBarrier c = new CyclicBarrier(2);

public static void main(String[] args) {
new Thread(new Runnable() {

@Override
public void run() {
try {
c.await();
} catch (Exception e) {

}
System.out.println(1);
}
}).start();

try {
c.await();
} catch (Exception e) {

}
System.out.println(2);
}
}

CountDownLatch和CyclicBarrier的区别

  1. CountDownLatch只能用一次,CyclicBarrier可以使用reset()方法重置。

Semaphore

用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

semapthore依赖于AbstractQueuedSynchronizer实现的同步器。可以用来做流量控制。

1
2
3
4
5
6
Semaphore s = new Semaphore(10);

s.acquire();
...
...
s.release();

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
public void run() {
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3 - sp.availablePermits()) + "个并发");
try {
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"即将离开");
sp.release();
//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (3 - sp.availablePermits()) + "个并发");
}
};
service.execute(runnable);
}
}
}

Exchanger

用于线程间的数据交换。它提供一个同步点,两个线程可以交换彼此数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**`
* @author joyo
* @date 2018/3/15
*/
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<>();

private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

public static void main(String[] args) {

threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A";
exgr.exchange(A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水A";
String A = exgr.exchange(B);
System.out.println("A和B数据是否一致:" + A.equals(B) + ", A录入的是:" + A + ", B录入的是:" + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

threadPool.shutdown();
}
}