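Introduction
This is a simple implementation of a task queue algorithm: users submit tasks to an executor, and the executor runs the submitted tasks in first-in, first-out order.
On submission the user immediately receives a task control object, which can be used to query the task's current status, wait for the task to finish,
retrieve the task's return value, or cancel the task.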
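For readers who know the JDK, the idea of a task control object resembles `java.util.concurrent.Future`. The sketch below is only an analogy built from standard JDK classes, not the `QueueTaskExecutor` or `TaskFuture` of this article; the class name `FifoFutureSketch` and its methods are made up for illustration. It shows FIFO execution on a single worker thread and the cancellation of a task that is still queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative analogy only: JDK classes, not the article's QueueTaskExecutor/TaskFuture.
class FifoFutureSketch {

    // Submits three tasks to a single worker thread and returns the order in which they ran.
    static List<String> runInOrder() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> executionOrder = new ArrayList<>();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String name : new String[] {"A", "B", "C"}) {
                futures.add(executor.submit(() -> {
                    executionOrder.add(name); // only the single worker thread writes here
                    return name + "-DONE";
                }));
            }
            for (Future<String> future : futures) {
                future.get(); // block until this task has finished
            }
            return executionOrder;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Cancels a task that is still waiting behind a blocked one; returns true if the cancel took effect.
    static boolean cancelWhileQueued() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the only worker until the latch is released.
            executor.submit(() -> { release.await(); return "first"; });
            Future<String> queued = executor.submit(() -> "second");
            boolean cancelled = queued.cancel(false); // the task has not started, so this succeeds
            release.countDown();
            return cancelled && queued.isCancelled();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder());
        System.out.println(cancelWhileQueued());
    }
}
```

The executor described in this article differs in that it allows a configurable number of tasks to run at once and exposes an explicit task status.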
Design
Original program flow diagram:
Core code snippets
Executor main loop snippet
while (!shutdown.get()) {
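Notes:
- The task lock is a reusable counting lock. Its initial capacity is the task concurrency configured on the executor, that is, the maximum number of tasks that may execute at the same time.
- The main loop repeatedly acquires the task lock; as soon as it holds the lock, it runs the task dispatch routine.
- When a task finishes (or throws an exception), the lock is released, freeing capacity for the next task to be dispatched.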
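The notes above can be sketched with a `java.util.concurrent.Semaphore` acting as the counting lock. This is a minimal sketch under assumptions, not the article's actual main loop: the class `PermitLoopSketch`, the plain `Runnable` queue, and the peak-concurrency measurement are all hypothetical and exist only to make the permit accounting visible.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a permit-guarded main loop; names are not taken from the article's source.
class PermitLoopSketch {

    // Runs taskCount short tasks with at most `concurrency` in flight and returns the observed peak.
    static int peakConcurrency(int concurrency, int taskCount) {
        Semaphore taskLock = new Semaphore(concurrency); // counting lock, one permit per running task
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ExecutorService workers = Executors.newCachedThreadPool();
        AtomicBoolean shutdown = new AtomicBoolean(false);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allDone = new CountDownLatch(taskCount);

        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                allDone.countDown();
            });
        }

        Thread mainLoop = new Thread(() -> {
            while (!shutdown.get()) {
                try {
                    taskLock.acquire(); // blocks while `concurrency` tasks are in flight
                    Runnable task = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (task == null) {
                        taskLock.release(); // nothing to dispatch, hand the permit back
                        continue;
                    }
                    workers.execute(() -> {
                        try {
                            task.run();
                        } finally {
                            taskLock.release(); // released on normal completion and on exception
                        }
                    });
                } catch (InterruptedException e) {
                    return; // interrupted during shutdown; the loop is ending anyway
                }
            }
        });
        mainLoop.start();
        try {
            allDone.await();
            shutdown.set(true);
            mainLoop.interrupt();
            mainLoop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(peakConcurrency(2, 6));
    }
}
```

Releasing the permit in a `finally` block is what keeps a failing task from permanently reducing the available concurrency.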
Task dispatch routine snippet
final TaskFuture<T> taskFuture = taskFutureBlockingQueue.poll();
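Notes:
- The dispatch routine takes a task in the 'waiting' state from the task queue and sets its status to 'executing'.
- The dispatch routine submits the task to the task execution thread pool, at which point the task actually starts running.
- The dispatch routine blocks until the task finishes.
- When the task completes, times out, or throws an exception, the dispatch routine records the outcome and stores it in the task status object.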
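The dispatch steps above could look roughly like the following. It is a sketch under assumptions rather than the article's code: `Record` is a hypothetical stand-in for `TaskFuture`, and while `WAITING`, `EXECUTING` and `COMPLETED` appear in the output later in this article, the names `TIMED_OUT` and `FAILED` are invented here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the dispatch routine; not the article's implementation.
class DispatchSketch {

    // WAITING, EXECUTING, COMPLETED match the article's output; TIMED_OUT and FAILED are assumed names.
    enum Status { WAITING, EXECUTING, COMPLETED, TIMED_OUT, FAILED }

    // Hypothetical stand-in for the article's TaskFuture: holds the task body, its status and result.
    static final class Record<T> {
        final Callable<T> body;
        volatile Status status = Status.WAITING;
        volatile T result;

        Record(Callable<T> body) {
            this.body = body;
        }
    }

    // Marks the task as executing, runs it on the pool, blocks for the outcome and records it.
    static <T> Status dispatch(Record<T> record, ExecutorService pool, long timeoutMillis) {
        record.status = Status.EXECUTING;
        Future<T> running = pool.submit(record.body);
        try {
            record.result = running.get(timeoutMillis, TimeUnit.MILLISECONDS);
            record.status = Status.COMPLETED;
        } catch (TimeoutException e) {
            running.cancel(true); // interrupt the overdue task
            record.status = Status.TIMED_OUT;
        } catch (ExecutionException e) {
            record.status = Status.FAILED; // the task body threw an exception
        } catch (InterruptedException e) {
            running.cancel(true);
            Thread.currentThread().interrupt();
            record.status = Status.FAILED;
        }
        return record.status;
    }

    // Dispatches one normal, one slow and one failing task and returns their final statuses.
    static Status[] demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Status ok = dispatch(new Record<String>(() -> "-DONE-"), pool, 1_000);
            Status slow = dispatch(new Record<String>(() -> {
                Thread.sleep(5_000);
                return "late";
            }), pool, 50);
            Status bad = dispatch(new Record<String>(() -> {
                throw new IllegalStateException("boom");
            }), pool, 1_000);
            return new Status[] {ok, slow, bad};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        for (Status status : demo()) {
            System.out.println(status);
        }
    }
}
```

Because `dispatch` blocks on `Future.get`, a caller that wants several tasks in flight has to invoke it from separate threads, which is consistent with the counting lock described for the main loop.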
Usage examples
Basic usage
Usage
@Test
public void test0() {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    //--- configure the task executor and start it
    QueueTaskExecutor<String> executor = new QueueTaskExecutor<String>()
            .defaultTaskTimeout(30_000) // Set default timeout for single task execution
            .taskConcurrenceCount(2) // Set max count of task executing at the same time
            .taskMaxCount(100) // Set max block waiting task count
            .start();
    {
        //--- submit task
        TaskFuture<String> taskFuture = executor.submit(() -> {
            // TODO task codes
            try {
                Thread.sleep(3_000);
            } catch (Exception e) {
                logger.debug(e.getMessage());
            }
            return "-DONE-";
        });
        //--- wait task done
        String result = taskFuture.waitFor();
        System.out.println(String.format("[%s] taskId: %s, result: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), result));
    }
    {
        //--- submit task
        TaskFuture<String> taskFuture = executor.submit(() -> {
            // TODO task codes
            try {
                Thread.sleep(3_000);
            } catch (Exception e) {
                logger.debug(e.getMessage());
            }
            return "-DONE-";
        });
        //--- wait task done
        if (taskFuture.waitFor(10_000)) {
            String result = taskFuture.exitValue();
            System.out.println(String.format("[%s] taskId: %s, result: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), result));
        }
    }
    {
        //--- submit task
        TaskFuture<String> taskFuture = executor.submit(new Task<String>() {
            @Override
            public void before() {
                //TODO code before this task executing
            }

            @Override
            public void after(String s) {
                //TODO code after this task executed
            }

            @Override
            public String execute() {
                //TODO task codes
                try {
                    Thread.sleep(3_000);
                } catch (Exception e) {
                    logger.debug(e.getMessage());
                }
                return "-DONE-";
            }
        });
        //--- wait task done
        String result = taskFuture.waitFor();
        System.out.println(String.format("[%s] taskId: %s, result: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), result));
    }
    //--- stop the executor
    executor.stop();
}
Output
[2018-12-12 16:24:31:589] taskId: 2afd1912e76c400987804e83ab6fc422, result: -DONE-
[2018-12-12 16:24:34:631] taskId: d48f1df1c4364d28b30d4b14535cdd12, result: -DONE-
[2018-12-12 16:24:37:667] taskId: b5a08e9d105b49b38acd9d544a72145f, result: -DONE-
Usage
Example No.1
Usage
@Test
public void test1() {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    //--- configure the task executor and start it
    QueueTaskExecutor<String> executor = new QueueTaskExecutor<String>()
            .defaultTaskTimeout(30_000) // Set default timeout for single task execution
            .taskConcurrenceCount(2) // Set max count of task executing at the same time
            .taskMaxCount(100) // Set max block waiting task count
            .start();
    //--- submit tasks
    List<TaskFuture<String>> taskFutures = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        TaskFuture<String> taskFuture = executor.submit(() -> {
            // TODO task codes
            try {
                Thread.sleep(3_000);
            } catch (Exception e) {
                logger.debug(e.getMessage());
            }
            return "-DONE-";
        });
        taskFutures.add(taskFuture);
    }
    //--- wait all task done
    while (true) {
        boolean alldone = true;
        System.out.println("\r\n---------------------");
        for (TaskFuture<String> taskFuture : taskFutures) {
            System.out.println(String.format("[%s] taskId: %s, status: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), taskFuture.getTaskStatus()));
            alldone = alldone && taskFuture.isDone();
        }
        if (alldone) {
            break;
        }
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            logger.debug(e.getMessage());
        }
    }
    //--- get all task exit values
    System.out.println("\r\n---------------------");
    for (TaskFuture<String> taskFuture : taskFutures) {
        System.out.println(taskFuture.exitValue());
    }
    //--- stop the executor
    executor.stop();
}
Output
---------------------
[2018-12-12 16:25:12:396] taskId: 28248cdd933641fcac24415c3a5632b9, status: WAITING
[2018-12-12 16:25:12:397] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: WAITING
[2018-12-12 16:25:12:397] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: WAITING
[2018-12-12 16:25:12:397] taskId: a4ee0322de974c61b1c409eb11b9580e, status: WAITING
[2018-12-12 16:25:12:398] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: WAITING
---------------------
[2018-12-12 16:25:15:402] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:15:402] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: EXECUTING
[2018-12-12 16:25:15:403] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: WAITING
[2018-12-12 16:25:15:403] taskId: a4ee0322de974c61b1c409eb11b9580e, status: WAITING
[2018-12-12 16:25:15:403] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: WAITING
---------------------
[2018-12-12 16:25:18:408] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:18:408] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: COMPLETED
[2018-12-12 16:25:18:409] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: EXECUTING
[2018-12-12 16:25:18:409] taskId: a4ee0322de974c61b1c409eb11b9580e, status: EXECUTING
[2018-12-12 16:25:18:409] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: WAITING
---------------------
[2018-12-12 16:25:21:413] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:21:413] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: COMPLETED
[2018-12-12 16:25:21:414] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: COMPLETED
[2018-12-12 16:25:21:414] taskId: a4ee0322de974c61b1c409eb11b9580e, status: COMPLETED
[2018-12-12 16:25:21:414] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: EXECUTING
---------------------
[2018-12-12 16:25:24:417] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:24:417] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: COMPLETED
[2018-12-12 16:25:24:418] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: COMPLETED
[2018-12-12 16:25:24:418] taskId: a4ee0322de974c61b1c409eb11b9580e, status: COMPLETED
[2018-12-12 16:25:24:418] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: COMPLETED
---------------------
-DONE-
-DONE-
-DONE-
-DONE-
-DONE-
Example No.2
Usage
@Test
public void test2() {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    //--- configure the task executor and start it
    QueueTaskExecutor<String> executor = new QueueTaskExecutor<String>()
            .defaultTaskTimeout(30_000) // Set default timeout for single task execution
            .taskConcurrenceCount(2) // Set max count of task executing at the same time
            .taskMaxCount(100) // Set max block waiting task count
            .start();
    //--- submit tasks
    List<TaskFuture<String>> taskFutures = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        TaskFuture<String> taskFuture = executor.submit(new Task<String>() {
            @Override
            public void before() {
                //TODO code before this task executing
            }

            @Override
            public void after(String s) {
                //TODO code after this task executed
            }

            @Override
            public String execute() {
                //TODO task codes
                try {
                    Thread.sleep(3_000);
                } catch (Exception e) {
                    logger.debug(e.getMessage());
                }
                return "-DONE-";
            }
        });
        taskFutures.add(taskFuture);
    }
    //--- wait all task done
    while (true) {
        boolean alldone = true;
        System.out.println("\r\n---------------------");
        for (TaskFuture<String> taskFuture : taskFutures) {
            System.out.println(String.format("[%s] taskId: %s, status: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), taskFuture.getTaskStatus()));
            alldone = alldone && (taskFuture.getTaskStatus().getStatus() >= TaskStatus.COMPLETED.getStatus());
        }
        if (alldone) {
            break;
        }
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            logger.debug(e.getMessage());
        }
    }
    //--- get all task exit values
    System.out.println("\r\n---------------------");
    for (TaskFuture<String> taskFuture : taskFutures) {
        System.out.println(taskFuture.exitValue());
    }
    //--- stop the executor
    executor.stop();
}
Output
---------------------
[2018-12-12 16:25:37:302] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: WAITING
[2018-12-12 16:25:37:303] taskId: e63440f37ffa4532b0a663519d687c4c, status: WAITING
[2018-12-12 16:25:37:303] taskId: 0c9c34813171454eab4a2de4b21ba282, status: WAITING
[2018-12-12 16:25:37:304] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: WAITING
[2018-12-12 16:25:37:304] taskId: ae711ef97cb14a908552f030601f7ff9, status: WAITING
---------------------
[2018-12-12 16:25:40:307] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:40:308] taskId: e63440f37ffa4532b0a663519d687c4c, status: EXECUTING
[2018-12-12 16:25:40:308] taskId: 0c9c34813171454eab4a2de4b21ba282, status: WAITING
[2018-12-12 16:25:40:308] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: WAITING
[2018-12-12 16:25:40:308] taskId: ae711ef97cb14a908552f030601f7ff9, status: WAITING
---------------------
[2018-12-12 16:25:43:311] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:43:312] taskId: e63440f37ffa4532b0a663519d687c4c, status: COMPLETED
[2018-12-12 16:25:43:312] taskId: 0c9c34813171454eab4a2de4b21ba282, status: EXECUTING
[2018-12-12 16:25:43:312] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: EXECUTING
[2018-12-12 16:25:43:312] taskId: ae711ef97cb14a908552f030601f7ff9, status: WAITING
---------------------
[2018-12-12 16:25:46:318] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: e63440f37ffa4532b0a663519d687c4c, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: 0c9c34813171454eab4a2de4b21ba282, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: ae711ef97cb14a908552f030601f7ff9, status: EXECUTING
---------------------
[2018-12-12 16:25:49:320] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:49:320] taskId: e63440f37ffa4532b0a663519d687c4c, status: COMPLETED
[2018-12-12 16:25:49:321] taskId: 0c9c34813171454eab4a2de4b21ba282, status: COMPLETED
[2018-12-12 16:25:49:321] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: COMPLETED
[2018-12-12 16:25:49:321] taskId: ae711ef97cb14a908552f030601f7ff9, status: COMPLETED
---------------------
-DONE-
-DONE-
-DONE-
-DONE-
-DONE-