Java并发编程基础

线程的状态

名称 说明
NEW 初始状态,线程被构建,但还没有调用start()方法
RUNNABLE 运行状态,java线程将操作系统中的就绪和运行统称为运行中
BLOCKED 阻塞状态,表示线程阻塞于锁
WAITING 等待状态,当前线程需要等待其他线程做出一些特定动作(通知或中断)
TIME_WAITING 超时等待状态,不同于WAITING,它可以在指定的时间自行返回的
TERMINATED 终止状态

image

Daemon线程

线程默认不是daemon线程,需要setDaemon(true)。daemon线程的finally方法,不一定有机会执行。

启动和终止

suspend()、resume()和stop()

suspend方法在调用后,线程不会释放已经占有的资源,而是占用资源进入睡眠状态。
同样,stop方法在中介一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会。

进程间通信

volatile和synchronized的方式

通过共享变量的方式

等待/通知机制

wait/notify(notifyAll)

指线程A调用了对象O的wait方法进入等待状态,而线程B通过调用对象O的notify或者notifyAll方法来唤醒A线程,A线程从wait方法返回继续执行。

  1. 调用waie、notify、notifyAll方法时,需要先对调用对象加锁
  2. 调用wait后,释放锁。线程状态由running变为waiting
  3. notify、notifyAll释放所之后,wait的线程才能获取锁。
  4. 调用notify、notifyAll后,等待线程从等待队列中移到同步队列中,被移动的线程的状态由WATING变为BLOCKED

使用方式:

  1. Thread.join
  2. 线程池
  3. 数据库连接池

LockSupport

  1. Condition(await、signal、signalAll)
  2. AbstractQueuedSynchronizer
    1. ReentrantLock
    2. CountDownLatch

ThreadLocal

线程应用实例

等待超时模式

1
2
3
4
5
6
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while(remaining > 0){
wait(remaing);
remaining = future - System.currentTimeMillis();
}

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.iforfee.common.thread;


import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author joyo
* @date 2018/3/14
*/
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

private static final int MAX_WORKER_NUMBERS = 10;
private static final int DEFAULT_WORKER_NUMBERS = 5;
private static final int MIN_WORKER_NUMBERS = 1;

private final LinkedList<Job> jobs = new LinkedList<Job>();
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

private int workerNum = DEFAULT_WORKER_NUMBERS;
private AtomicLong threadNum = new AtomicLong();

public DefaultThreadPool() {
initializeWokers(DEFAULT_WORKER_NUMBERS);
}

public DefaultThreadPool(int num) {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initializeWokers(workerNum);
}

@Override
public void execute(Job job) {
if (job != null) {
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}

@Override
public void shutdown() {
workers.forEach(job -> job.shutdown());
}

@Override
public void addWorkers(int num) {
synchronized (jobs) {
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWokers(num);
this.workerNum += num;
}
}

@Override
public void removeWorker(int num) {
synchronized (jobs) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workerNum");
}

int count = 0;
while (count < num) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
this.workerNum -= count;
}
}
}

@Override
public int getJobSize() {
return jobs.size();
}

private void initializeWokers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
thread.start();
}
}


class Worker implements Runnable {

private volatile boolean running = true;

@Override
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
while (jobs.isEmpty()) {
try {
jobs.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
job = jobs.removeFirst();
}
if (job != null) {
try {
job.run();
} catch (Exception ex) {
//ignore exception from job
}
}
}
}

public void shutdown() {
running = false;
}
}

}

interface ThreadPool<Job extends Runnable> {
void execute(Job job);

void shutdown();

void addWorkers(int num);

void removeWorker(int num);

int getJobSize();
}

数据库连接池

web服务器