生产者消费者模式: 间隔队列消费执行器算法实现及其用法介绍

简介

利用生产者消费者模式,可以解决生产者生产速度快、消费者消费速度慢,双方处理速度不一致的问题。
生产者将产出直接塞入队列,继续执行生产任务;消费者则循环有序的从产品队列去除产品进行消费。

本算法及其实现,在最基本的生产者消费者模式中添加了间隔概念,即消费者调度线程从产品队列中
取出相应数量产品提交给所有消费者后,调度线程会睡眠相应时间,即间隔多少时间执行一次消费。

设计

原始程序流程设计框图:
image

核心代码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

List<T> list = new ArrayList<>();
while (!shutdown.get()) {
try {
sleep(10);
T t = producer.poll();
if (t != null) {
list.add(t);
} else {
if (list.size() > 0) {
doConsume(list, consumers);
list = new ArrayList<>();
sleep(consumeInterval);
}
}
if (list.size() >= consumeMaxSize) {
doConsume(list, consumers);
list = new ArrayList<>();
sleep(consumeInterval);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}

说明:

  • 执行器不断循环获取产品队列中的产品
  • 若获取不到,则判断临时产品列表非空否,非空则执行消费
  • 若获取到产品则将其添加至临时产品列表
  • 最后判断临时产品列表大小是否超过配置的最大值,超过则执行消费
  • 每次成功消费后清空临时产品列表,并睡眠配置的间隔时间

用法示例

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

@Test
public void test() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

QueueProducer<String> producer = new QueueProducer<>(1000);
QueueExecutor<String> executor = new QueueExecutor<String>()
.consumeMaxSize(100)
.consumeInterval(300L)
.producer(producer)
.consumer(list -> System.out.println(String.format("[%s] Consumes list, size: %s", simpleDateFormat.format(new Date()), list.size())));

executor.start();

for (int i = 0; i < 2000; i++) {
producer.put("Product_" + i);
try {
Thread.sleep(2);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}

executor.stop();
}

结果输出

1
2
3
4
5
6
7
8
9
10
11

[2018-12-12 15:06:46:363] Consumes list, size: 100
[2018-12-12 15:06:47:810] Consumes list, size: 100
[2018-12-12 15:06:49:271] Consumes list, size: 100
[2018-12-12 15:06:50:724] Consumes list, size: 100
[2018-12-12 15:06:52:187] Consumes list, size: 100
[2018-12-12 15:06:53:626] Consumes list, size: 100
[2018-12-12 15:06:55:088] Consumes list, size: 100
[2018-12-12 15:06:56:540] Consumes list, size: 100
[2018-12-12 15:06:57:969] Consumes list, size: 100
[2018-12-12 15:06:59:415] Consumes list, size: 100

代码仓库

https://github.com/johnsonmoon/queue-producer-consumer-model.git