简介
利用生产者消费者模式,可以解决生产者生产速度快、消费者消费速度慢,双方处理速度不一致的问题。
生产者将产出直接塞入队列,继续执行生产任务;消费者则循环有序的从产品队列去除产品进行消费。
本算法及其实现,在最基本的生产者消费者模式中添加了间隔概念,即消费者调度线程从产品队列中
取出相应数量产品提交给所有消费者后,调度线程会睡眠相应时间,即间隔多少时间执行一次消费。
设计
原始程序流程设计框图:
核心代码片段
1 |
|
说明:
- 执行器不断循环获取产品队列中的产品
- 若获取不到,则判断临时产品列表非空否,非空则执行消费
- 若获取到产品则将其添加至临时产品列表
- 最后判断临时产品列表大小是否超过配置的最大值,超过则执行消费
- 每次成功消费后清空临时产品列表,并睡眠配置的间隔时间
用法示例
用法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void test() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
QueueProducer<String> producer = new QueueProducer<>(1000);
QueueExecutor<String> executor = new QueueExecutor<String>()
.consumeMaxSize(100)
.consumeInterval(300L)
.producer(producer)
.consumer(list -> System.out.println(String.format("[%s] Consumes list, size: %s", simpleDateFormat.format(new Date()), list.size())));
executor.start();
for (int i = 0; i < 2000; i++) {
producer.put("Product_" + i);
try {
Thread.sleep(2);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
executor.stop();
}
结果输出1
2
3
4
5
6
7
8
9
10
11
[2018-12-12 15:06:46:363] Consumes list, size: 100
[2018-12-12 15:06:47:810] Consumes list, size: 100
[2018-12-12 15:06:49:271] Consumes list, size: 100
[2018-12-12 15:06:50:724] Consumes list, size: 100
[2018-12-12 15:06:52:187] Consumes list, size: 100
[2018-12-12 15:06:53:626] Consumes list, size: 100
[2018-12-12 15:06:55:088] Consumes list, size: 100
[2018-12-12 15:06:56:540] Consumes list, size: 100
[2018-12-12 15:06:57:969] Consumes list, size: 100
[2018-12-12 15:06:59:415] Consumes list, size: 100
代码仓库
https://github.com/johnsonmoon/queue-producer-consumer-model.git