Java并发包提供了哪些并发工具类

我们通常所说的并发包也就是 java.util.concurrent 及其子包,集中了 Java 并发的各种基础工具类

结构图

图来自: Java 并发工具箱之concurrent包

Java并发包提供了比synchronized更加高级的各种同步结构,包括CountDownLatch、CyclicBarrier、Semaphore等,可以实现更加丰富的多线程操作,比如利用Semaphore作为资源控制器,限制同时进行工作的线程数量。

CountDownLatch

CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态,直到其他线程调用countDown方法使当前计数器的值变为零,每次调用countDown计数器的值减1。当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。

在某些业务场景中,程序执行需要等待某个条件完成后才能继续执行后续的操作。典型的应用如并行计算,当某个处理的运算量很大时,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成之后,父任务再拿到所有子任务的运算结果进行汇总。

模型:
CountDownLatch模型

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest implements Testable {
private CountDownLatch latch = new CountDownLatch(3);

private final Runnable subRunnable = () -> {
log("waste time");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("countDown");
latch.countDown();
log("do whatever then");
};

private final Runnable superRunnable = () -> {
log("dispatch event");
for (int i = 1; i < 4; i++) {
new Thread(subRunnable, String.valueOf(i)).start();
}
log("wait for latch");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log("do whatever then");
};


@Override
public void test() {
new Thread(superRunnable, "A").start();
}

private void log(String log) {
System.out.println(String.valueOf(System.nanoTime()).substring(8) + "\t" + Thread.currentThread().getName() + "\t" + log);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
6225400	A	dispatch event
6618800 A wait for latch
6643800 1 waste time
6694300 2 waste time
6754999 3 waste time
6829100 3 countDown
6829500 2 countDown
6829100 1 countDown
6964400 2 do whatever then
7071200 A do whatever then
6930100 3 do whatever then
7051900 1 do whatever then

总结:
不可重置的倒计时器,一个或一些线程等待某个事件完成,方法有:

1
2
3
4
void countDown() // 倒计时减一
long getCount() // 当前倒计时
void await() throws InterruptedException // 阻塞当前线程,直到倒计时为0
boolean await(long timeout, TimeUnit unit) throws InterruptedException // 最长等待时间,超时返回false,否则true

CyclicBarrier

CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作。类似于CountDownLatch,它也是通过计数器来实现的。当某个线程调用await方法时,该线程进入等待状态,且计数器加1,当计数器的值达到设置的初始值时,所有因调用await进入等待状态的线程被唤醒,继续执行后续操作。因为CycliBarrier在释放等待线程后可以重用,所以称为循环barrier。CycliBarrier支持一个可选的Runnable,在计数器的值到达设定值后(但在释放所有线程之前),该Runnable运行一次,注,Runnable在每个屏障点只运行一个。

CyclicBarrier与CountDownLatch的区别:

  • CountDownLatch主要是实现了1个或N个线程需要等待其他线程完成某项操作之后才能继续往下执行操作,描述的是1个线程或N个线程等待其他线程的关系。CyclicBarrier主要是实现了多个线程之间相互等待,直到所有的线程都满足了条件之后各自才能继续执行后续的操作,描述的多个线程内部相互等待的关系。
  • CountDownLatch是一次性的,而CyclicBarrier则可以被重置而重复使用。

模型:
CyclicBarrier模型

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest implements Testable {
private CyclicBarrier barrier;

public CyclicBarrierTest() {
barrier = new CyclicBarrier(3, () -> log("sync progress"));
Thread.currentThread().setName("A");
}

private Runnable work = () -> {
log("work start");
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(50);
log("part" + i + " end,wait for sync");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
};


@Override
public void test() {
for (int i = 0; i < 3; i++) {
new Thread(work, String.valueOf(i + 1)).start();
}
}

private synchronized void log(String log) {
System.out.println(String.valueOf(System.nanoTime()).substring(6,13) + "\t" + Thread.currentThread().getName() + "\t" + barrier.getNumberWaiting() + "\t" + log);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
9365108	1	0	work start
9367170 3 0 work start
9367894 2 0 work start
9866428 1 0 part0 end,wait for sync
9876561 2 1 part0 end,wait for sync
9877540 3 2 part0 end,wait for sync
9878223 3 3 sync progress
0385298 1 0 part1 end,wait for sync
0386297 2 1 part1 end,wait for sync
0386930 3 2 part1 end,wait for sync
0387449 3 3 sync progress
0893686 1 0 part2 end,wait for sync
0894693 3 1 part2 end,wait for sync
0895290 2 2 part2 end,wait for sync
0895780 2 3 sync progress
1402234 2 0 part3 end,wait for sync
1402908 1 1 part3 end,wait for sync
1403358 3 2 part3 end,wait for sync
1404103 3 3 sync progress

总结:
多个线程相互等待协调的计数器,方法有:

1
2
3
4
5
6
int getNumberWaiting() // 当前计数
int getParties() // 获取目标数
int await() throws InterruptedException // 阻塞当前线程,直到计数达到目标数
int await(long timeout, TimeUnit unit) throws InterruptedException // 最长等待时间,超时返回false,否则true
boolean isBroken() // 某个线程在等待的时候抛异常了
void reset() // 重置计时,一般不需要手动重置,除非broken

action通过在CyclicBarrier初始化时传runnable设置

Semaphore

Semaphore 是一个计数信号量,表示某种资源的量,必须由获取它的线程释放。它被更多地用来限制流量,类似阀门的功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。

模型:
Semaphore模型

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Semaphore;

public class SemaphoreTest implements Testable {
private Semaphore semaphore;

public SemaphoreTest() {
semaphore = new Semaphore(2);
}

private Runnable work = () -> {
try {
log("want to work");
semaphore.acquire();
log("go to work");
Thread.sleep(50);
log("done");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
log("leave");
}
};


@Override
public void test() {
for (int i = 0; i < 4; i++) {
new Thread(work, String.valueOf(i + 1)).start();
}
}

private synchronized void log(String log) {
System.out.println(String.valueOf(System.nanoTime()).substring(6, 13) + "\t" + Thread.currentThread().getName() + "\t" + semaphore.availablePermits() + "\t" + log);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
8070770	1	2	want to work
8072553 2 1 want to work
8073135 2 0 go to work
8073653 3 0 want to work
8074344 4 0 want to work
8074986 1 0 go to work
8582545 1 0 done
8583281 2 1 done
8583992 4 0 go to work
8584477 3 0 go to work
8585190 1 0 leave
8585643 2 0 leave
9091398 4 0 done
9092511 4 1 leave
9092874 3 1 done
9093478 3 2 leave

总结:
一个计数信号量,表示某种资源的量。

方法很多,不全列出来,重要的是void acquire() throws InterruptedException 获取一个资源和void release()释放资源,可加permits参数要求数量。

构造对象时可加boolean fair

支持acquireUninterruptibly

If the current thread is interrupted while waiting for a permit then it will continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would have received the permit had no interruption occurred. When the thread does return from this method its interrupt status will be set.

int drainPermits()获取全部可用资源

int availablePermits()获取可用数量

还可以获取等待队列长度,以及使用tryAcquire设定等待超时。

Exchanger

Exchanger是在两个任务之间交换对象的栅栏,当这些任务进入栅栏时,它们各自拥有一个对象。当他们离开时,它们都拥有之前由对象持有的对象。它典型的应用场景是:一个任务在创建对象,这些对象的生产代价很高昂,而另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。第一个先拿出对象的线程会一直等待第二个线程,直到第二个线程到来时才能彼此交换对象。

模型:

Exchanger模型

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExchangerTest implements Testable {
private Exchanger<Data> exchanger;

public ExchangerTest() {
exchanger = new Exchanger<>();
}

class Data {
int id;
}

@Override
public void test() {
Thread consumer = new Thread(() -> {
Data data = new Data(); // empty data
while (true) {
log("ask for data", data);
try {
data = exchanger.exchange(data, 1000, TimeUnit.MILLISECONDS);
log("consume data", data);
Thread.sleep(80);
data.id = -1;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
log("no more data", data);
break;
}
}
}, "consumer");

Thread producer = new Thread(() -> {
Data data = new Data();
for (int i = 1; i < 4; i++) {
try {
Thread.sleep(50);
data.id = i;
log("data prepared", data);
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("rebuild data", data);
}

}, "producer");

producer.start();
consumer.start();
}

private synchronized void log(String log, Data data) {
System.out.println(String.valueOf(System.nanoTime()).substring(6, 13) + "\t" + Thread.currentThread().getName() + "\t" + data.id + "\t" + log);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
0683404	consumer	0	ask for data
1187820 producer 1 data prepared
1188986 producer 0 rebuild data
1189377 consumer 1 consume data
1695804 producer 2 data prepared
1995440 consumer -1 ask for data
1997091 consumer 2 consume data
1997827 producer -1 rebuild data
2503975 producer 3 data prepared
2803389 consumer -1 ask for data
2804178 consumer 3 consume data
2804779 producer -1 rebuild data
3640267 consumer -1 ask for data
3655087 consumer -1 no more data

总结:
两个线程间交换对象,没那么多花里胡哨的,就俩方法:

1
2
V exchange(V x) throws InterruptedException // 阻塞,等待交换
V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException // 超时异常

Phaser

Phaser是Java7引入的,它可以协调线程同步,它允许线程动态注册并相互等待,直到全部到达下一个周期(onAdvance)。通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作。类似于CountDownLatchCyclicBarrier,但他支持动态改变parties个数,以及支持对phase的访问。Phaser未提供查询是否已经注册的功能,但你可以通过继承自己实现。它通过两个计数器和一个非AQS的阻塞队列来实现,当某个线程调用registerderegister方法,将触发parties变更(CAS),而全部parties到达(arrive)会触发phase变更,之后等待状态的线程被唤醒,继续执行后续操作(Phaser支持的可唤醒节点有两个:所有parties到达、所有parties到达且advance)。因为Phaser通过onAdvance进入下一周期,因此可多次使用,可通过isTerminated()来判断是否结束。Phaser支持重写onAdvance,在进入下个周期前回调。

Phaser与CyclicBarrier的区别:

Phaser支持动态增减parties计数,支持在指定周期处等待,使用java并发类对线程进行分组同步控制的时候,Phaser比CyclicBarrier类更加强大,建议使用。

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.hosigus.concurent;

import java.util.concurrent.Phaser;

public class PhaserTester implements Testable {
private Phaser phaser;
private final int WORK_COUNT = 2;

public PhaserTester() {
Thread.currentThread().setName("A");
phaser = new Phaser(1) { // one for self
@Override
protected boolean onAdvance(int phase, int registeredParties) {
log("onAdvance " + phase);
return phase >= WORK_COUNT || super.onAdvance(phase, registeredParties); // decide isTerminated
}
};
}

private Runnable work = () -> {
log("work start");
do {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("work end,wait for sync");
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
};

@Override
public void test() {
for (int i = 0; i < 4; i++) {
phaser.register();
new Thread(work, String.valueOf(i + 1)).start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}

private void log(String log) {
System.out.println(String.valueOf(System.nanoTime()).substring(6, 13) + "\t" + Thread.currentThread().getName() + "\t" + log);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0843100	1	work start
0844450 4 work start
0843524 3 work start
0843129 2 work start
1346007 1 work end,wait for sync
1346007 3 work end,wait for sync
1346007 2 work end,wait for sync
1346007 4 work end,wait for sync
1348580 4 onAdvance 0
1854865 2 work end,wait for sync
1854747 4 work end,wait for sync
1854935 1 work end,wait for sync
1854747 3 work end,wait for sync
1857051 3 onAdvance 1
2362999 4 work end,wait for sync
2363399 2 work end,wait for sync
2362999 1 work end,wait for sync
2363489 3 work end,wait for sync
2366005 3 onAdvance 2

总结:

这篇文章官方文档都说的很好了,建议阅读

文章目录
  1. 1. CountDownLatch
  2. 2. CyclicBarrier
  3. 3. Semaphore
  4. 4. Exchanger
  5. 5. Phaser