[JavaEE]阻塞队列
专栏简介: JavaEE从入门到进阶
题目来源: leetcode,牛客,剑指offer.
创作目标: 记录学习JavaEE学习历程
希望在提升自己的同时,帮助他人,,与大家一起共同进步,互相成长.
学历代表过去,能力代表现在,学习能力代表未来!
目录:
1.阻塞队列的概念
2.标准库中的阻塞队列
3.生产者消费者模型
3.1 生产者消费者模型的意义:
3.2 生产者消费者模型的作用:
3.2 生产者消费者模型:
4.阻塞队列实现
1.阻塞队列的概念
阻塞队列是一种特殊的队列 , 也遵循"先进先出"的原则.
阻塞队列是一种线程安全的数据结构 , 具体特性如下:
- 当队列满时 , 继续向队列中添加元素就会产生阻塞 , 直到其他线程从队列中取走元素.
- 当队列空时 , 继续取队列中的元素就会产生阻塞 , 直到其他线程向队列中添加元素.
阻塞队列的一个典型应用场景就是"生产者消费者模型" , 是一种典型的开发方式.
2.标准库中的阻塞队列
Java标准库中内置了阻塞队列 , 如果我们需要在一些程序中使用阻塞队列 , 直接调用库即可.
阻塞队列队列是一个接口 , 具体的实现类是优先级阻塞队列 , 顺序表阻塞队列 , 链表阻塞队列.
public interface BlockingQueue<E> extends Queue<E>
- put方法用于阻塞式的入队列 , take方法用于阻塞式的出队列
- 阻塞队列也有 poll() , offer() , peek()这些方法 , 但不具有阻塞性
BlockingQueue queue = new LinkedBlockingQueue();
queue.put("hello");
queue.take();
3.生产者消费者模型
3.1 生产者消费者模型的意义:
生产者消费者模型 , 本质上就是通过一个容器来解决生产者和消费者之间强耦合的问题.
通常意义下的生产者和消费者之间的耦合程度是很高的 , 如果生产者和消费者直接相关联 , 那么二者中任意一个出现错误就会导致另一个也出现错误. 如果要修改其中一个的代码 , 另一个也要修改不少代码 , 还要重新测试 , 重新发布 , 重新部署....非常麻烦.
那么如果我们使用"生产者消费者模型" , 生产者和消费者彼此之间不进行通信 , 而是通过一个阻塞队列来通信 , 所以生成者生成完数据不用直接给消费者 , 而是直接扔给阻塞队列.消费者也不再向生产者所要数据 , 而是直接从阻塞队列中取. 这样生产者和消费者之间的耦合程度就大大降低了.
3.2 生产者消费者模型的作用:
- 1."削峰填谷" , 阻塞队列相当于一个缓冲区 , 平衡了生产者和消费者的处理能力.
在"秒杀"场景下 , 服务器同一时刻可能收到大量的支付请求 , 如果直接处理这些请求 , 服务器可能会崩溃 , 这时如果把这些请求放入阻塞队列中 , 然后再由服务器来慢慢处理一个个请求 , 由此达到"削峰"的效果.
- 2.阻塞队列也能使生成者和消费者之间"解耦".
比如一家人过年包饺子 , 这个流程需要由明确的分工 , 擀饺子皮的人是"生产者" , 包饺子的人是"消费者" , 擀饺子皮的人不必关心谁包饺子 , 包饺子的人也不必关心谁擀饺子皮.
3.2 生产者消费者模型:
创建一个阻塞队列 , 一个生产者线程和一个消费者线程. 为了达到"生产一个 消费一个的效果" , 需要让生产者每生产一个就停顿一下.
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
Thread customer = new Thread(()->{
try {
while (true){
int ret = blockingQueue.take();
System.out.println("消费"+ret);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread producer = new Thread(()->{
int count = 0;
while (true){
try {
blockingQueue.put(count);
System.out.println("生产"+count);
count++;
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
producer.start();
}
4.阻塞队列实现
实现阻塞队列首先要实现一个队列 , 本文采用"循环队列的实现方式".
1) 实现循环队列
class MyBlockingQueue{
public int head;
public int tail;
public int size;
public int[] queue = new int[1000];
//入队
public void put(int value){
if(size == queue.length){
return;
}
queue[tail] = value;
tail++;
if(tail>=queue.length){
tail = 0;
}
size++;
}
//出队
public Integer take(){
if(size == 0){
return null;
}
int ret = queue[head];
head++;
if(head>=queue.length){
head = 0;
}
size--;
return ret;
}
}
2) 改进为阻塞队列
在循环队列的基础上 , 入队时发现队伍满了 , 就使用 wait 阻塞. 出队时发现队伍空了 , 也用 wait 阻塞. 当入队操作阻塞时 , 如果出队操作执行完毕就可以通知入队操作解除阻塞. 当出队操作阻塞时 , 如果入队操作执行完毕就可以通知出队操作解除阻塞.
- 那么是否会出现入队和出队操作同时陷入阻塞的情况?
答案是不会 , 因为入队和出队被同步代码块限定为同一个队列 , 同一个队列不可能出现即空又满的情况 , 否则就会成为薛定谔的队列.
- 那么是否会出现入队的 wait 被唤醒 , 队伍还是满或出队的 wait 被唤醒队伍还是空的情况?
理论上我们实现的代码不会产生这个问题 , 但为了保险起见 , 我们要将返回的 wait 再判定一次 , 看此时的条件是否具备.因此我们可以参考 Java 标准库中的 wait , 使用循环判定.
class MyBlockingQueue{
public int head = 0;
public int tail = 0;
public int size = 0;//表示元素个数
public int[] arrQ = new int[1000];
//入队 put
public void put(int value) throws InterruptedException {
synchronized (this) {
while (size == arrQ.length ){//为了防止出现wait被唤醒还是满的情况.符合标准库的规范
//队列满了 , 产生阻塞
this.wait();
}
arrQ[tail] = value;
tail++;
if(tail>=arrQ.length){
tail=0;
}
size++;
//这个notify take()中的wait
this.notify();
}
}
//出队 take
public Integer take() throws InterruptedException {
int ret;
synchronized (this) {
while (size == 0){
this.wait();
}
ret = arrQ[head];
head++;
if(head>=arrQ.length){
head=0;
}
size--;
//唤醒 put() 中的wait
this.notify();
}
return ret;
}
}
测试自定义阻塞队列:
public class ThreadDemo6 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
//创建两个线程来作为生产者和消费者
Thread customer = new Thread(()->{
while (true){
try {
int result = myBlockingQueue.take();
System.out.println("消费"+result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
Thread producer = new Thread(()->{
int count = 0;
while (true){
try {
System.out.println("生产"+count);
myBlockingQueue.put(count);
count++;
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
}
}