线程间的通信
线程之间协调工作的方式
基于等待通知模型的通信
等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上。
相关API
- notify: 通知一个对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁
- notifyAll: 通知对象上所有等待的线程,使其从wait方法返回
- wait: 使线程进入WAITING(后面线程的生命周期讲)状态,只有等待另一个线程通知或者被中断才返回,需要注意的是,调用wait方法后需要释放对象的锁
- wait(long): 和wait类似,加入了超时时间,超时了还没被通知就直接返回
- wait(long, int): 纳秒级,不常用
一些需要注意的点:
- 使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
- 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列,释放锁。
- notify()或notifyAll()方法调用后,等待线程不会立即从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
- notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
- 从wait()方法返回的前提是获得了调用对象的锁。
关于等待队列和同步队列
- 同步队列(锁池):假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的同步队列(锁池)中,这些线程状态为Blocked。
- 等待队列(等待池):假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前线程A就已经拥有了该对象的锁),同时 线程A就进入到了该对象的等待队列(等待池)中,此时线程A状态为Waiting。如果另外的一个线程调用了相同对象的notifyAll()方法,那么 处于该对象的等待池中的线程就会全部进入该对象的同步队列(锁池)中,准备争夺锁的拥有权。如果另外的一个线程调用了相同对象的notify()方法,那么 仅仅有一个处于该对象的等待池中的线程(随机)会进入该对象的同步队列(锁池)。
等待通知模型的示例
class WaitNotifyModel {
Object lock = new Object();
boolean flag = false;
public void start() {
Thread A = new Thread(() -> {
synchronized (lock) {
while (!flag) {
try {
System.out.println(Thread.currentThread().getName()+":等待通知");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+ ":收到通知,处理业务逻辑");
}
});
A.setName("我是等待者");
Thread B = new Thread(() -> {
synchronized (lock) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println(Thread.currentThread().getName()+":发出通知");
lock.notify();
}
});
B.setName("通知者");
A.start();
B.start();
}
}
模型
等待者
synchronized (对象) {
while (不满足条件) {
对象.wait()
}
处理业务逻辑
}
通知者
synchronized (对象) {
改变条件
对象.notify();
}
基于Condition的通信
上述的这种等待通知需要使用synchronized, 如果使用Lock的话就要用Condition
了
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式
Condition与Object监视器的区别
项目 | Object的监视器方法 | Condition |
---|---|---|
前置条件 | 获得对象的锁 | Lock.lock()获取锁 Lock.newCondition()获取Condition |
调用方式 | obj.wait() | condition.await() |
等待队列个数 | 一个 | 可以多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
等待状态中不响应中断 | 不支持 | 支持 |
释放锁进入超时等待状态 | 支持 | 支持 |
进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待中的一个或多个线程 | 支持 notify notifyAll | 支持signal signalAll |
这里有一些线程的状态,可以看完后边的线程的生命周期再回过头看看
示例
一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。
实现一个有界队列,当队列为空时阻塞消费线程,当队列满时阻塞生产线程
class BoundList<T> {
private LinkedList<T> list;
private int size;
private Lock lock = new ReentrantLock();
// 拿两个condition,一个是非空,一个是不满
private Condition notEmpty = lock.newCondition();
private Condition notFullCondition = lock.newCondition();
public BoundList(int size) {
this.size = size;
list = new LinkedList<>();
}
public void push(T x) throws InterruptedException {
lock.lock();
try {
while (list.size() >= size) {
// 满了就等待
notFullCondition.await();
}
list.push(x);
// 唤醒等待的消费者
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public T get() throws InterruptedException {
lock.lock();
try {
while (list.isEmpty()) {
// 空了就等
notEmpty.await();
}
T x = list.poll();
// 唤醒生产者
notFullCondition.signalAll();
return x;
} finally {
lock.unlock();
}
}
}
public class Demo_02_05_1_Condition {
public static void main(String[] args) {
BoundList<Integer> list = new BoundList<>(10);
// 生产数据的线程
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(1000);
list.push(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 消费数据的线程
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
System.out.println(list.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
基于BlockingQueue实现线程通信
这里列个标题,后边说到了BlockingQueue就明白怎么做了。
可以参看第05章的相关内容