任务队列算法实现及其用法

简介

简单的实现一个任务队列算法,用户提交任务给执行器,执行器根据’先进先出’原则依次执行用户提交的任务。
用户提交任务后即接收到一个任务控制对象,用户可以用该对象进行 获取任务当前状态、等待任务执行、
获取任务执行返回值以及取消任务等操作。

设计

原始程序流程设计框图:
image

核心代码片段

执行器主循环片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (!shutdown.get()) {
loopSleep();
if (taskLatch.acquire()) {
taskDispatcherThreadPool.submit(() -> {
try {
dispatcherTask();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
taskLatch.release();
}
});
}
}

说明:

  • 任务锁为循环计数锁,初始容量为执行器配置的任务并发执行数量,即同时执行的最大任务数量。
  • 主循环循环获取任务锁,一旦获取锁,即执行任务分发程序。
  • 任务执行结束(或抛出异常),释放锁,为下一个任务分发释放资源。

任务执行分发程序片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
final TaskFuture<T> taskFuture = taskFutureBlockingQueue.poll();
if (taskFuture != null) {
final String taskId = taskFuture.getTaskId();
final SimpleTask<T> task = taskFuture.getTask();
if (taskId != null && task != null) {
taskFuture.setTaskStatus(TaskStatus.EXECUTING);
Future<T> future = taskExecutionThreadPool.submit(() -> {
T result = null;
int step = 0;
try {
if (task instanceof Task) {
Task<T> tTask = (Task<T>) task;
step = 1;
tTask.before();
step = 2;
result = task.execute();
step = 3;
tTask.after(result);
} else {
result = task.execute();
}
} catch (Exception e) {
switch (step) {
case 1:
logger.warn(String.format("Exception happened while task before operation, message: %s", e.getMessage()), e);
break;
case 2:
logger.warn(String.format("Exception happened while task execution, message: %s", e.getMessage()), e);
break;
case 3:
logger.warn(String.format("Exception happened while task after operation, message: %s", e.getMessage()), e);
break;
default:
logger.warn(e.getMessage(), e);
break;
}
}
return result;
});
taskFuture.setFuture(future);
Long executionTimeout = taskFuture.getExecutionTimeout() == null ? defaultTaskTimeout : taskFuture.getExecutionTimeout();
try {
taskFuture.setResult(future.get(executionTimeout, TimeUnit.MILLISECONDS));
taskFuture.setTaskStatus(TaskStatus.COMPLETED);
} catch (CancellationException cancelE) {
taskFuture.setTaskStatus(TaskStatus.CANCELED);
} catch (InterruptedException interruptE) {
taskFuture.setTaskStatus(TaskStatus.INTERRUPTED);
} catch (ExecutionException executionE) {
taskFuture.setTaskStatus(TaskStatus.ERROR);
} catch (TimeoutException timeoutE) {
taskFuture.setTaskStatus(TaskStatus.TIMEOUT);
}
}
}

说明:

  • 分发程序从任务队列中取出’等待’状态的任务,并将其状态置为’执行中’。
  • 分发程序提交任务给任务执行线程池,任务正式执行。
  • 分发程序阻塞等待任务执行。
  • 任务执行完成、超时、异常,分发程序记录状态,并将状态置入任务状态对象中。

用法示例

基础用法

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

@Test
public void test0() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

//--- configure the task executor and start it
QueueTaskExecutor<String> executor = new QueueTaskExecutor<String>()
.defaultTaskTimeout(30_000)//Set default timeout for single task execution
.taskConcurrenceCount(2)//Set max count of task executing at the same time
.taskMaxCount(100)//Set max block waiting task count
.start();


{
//--- submit task
TaskFuture<String> taskFuture = executor.submit(() -> {
// TODO task codes
try {
Thread.sleep(3_000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
return "-DONE-";
});

//--- wait task done
String result = taskFuture.waitFor();
System.out.println(String.format("[%s] taskId: %s, result: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), result));
}


{
//--- submit task
TaskFuture<String> taskFuture = executor.submit(() -> {
// TODO task codes
try {
Thread.sleep(3_000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
return "-DONE-";
});

//--- wait task done
if (taskFuture.waitFor(10_000)) {
String result = taskFuture.exitValue();
System.out.println(String.format("[%s] taskId: %s, result: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), result));
}
}

{
//--- submit task
TaskFuture<String> taskFuture = executor.submit(new Task<String>() {
@Override
public void before() {
//TODO code before this task executing
}

@Override
public void after(String s) {
//TODO code after this task executed
}

@Override
public String execute() {
//TODO task codes
try {
Thread.sleep(3_000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
return "-DONE-";
}
});

//--- wait task done
String result = taskFuture.waitFor();
System.out.println(String.format("[%s] taskId: %s, result: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), result));
}

//--- stop the executor
executor.stop();
}

结果输出

1
2
3
4

[2018-12-12 16:24:31:589] taskId: 2afd1912e76c400987804e83ab6fc422, result: -DONE-
[2018-12-12 16:24:34:631] taskId: d48f1df1c4364d28b30d4b14535cdd12, result: -DONE-
[2018-12-12 16:24:37:667] taskId: b5a08e9d105b49b38acd9d544a72145f, result: -DONE-

用法

示例 No.1

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

@Test
public void test1() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

//--- configure the task executor and start it
QueueTaskExecutor<String> executor = new QueueTaskExecutor<String>()
.defaultTaskTimeout(30_000)//Set default timeout for single task execution
.taskConcurrenceCount(2)//Set max count of task executing at the same time
.taskMaxCount(100)//Set max block waiting task count
.start();

//--- submit tasks
List<TaskFuture<String>> taskFutures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
TaskFuture<String> taskFuture = executor.submit(() -> {
// TODO task codes
try {
Thread.sleep(3_000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
return "-DONE-";
});
taskFutures.add(taskFuture);
}

//--- wait all task done
while (true) {
boolean alldone = true;
System.out.println("\r\n---------------------");
for (TaskFuture<String> taskFuture : taskFutures) {
System.out.println(String.format("[%s] taskId: %s, status: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), taskFuture.getTaskStatus()));

alldone = alldone && taskFuture.isDone();
}
if (alldone) {
break;
}
try {
Thread.sleep(3000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
}

//--- get all task exit values
System.out.println("\r\n---------------------");
for (TaskFuture<String> taskFuture : taskFutures) {
System.out.println(taskFuture.exitValue());
}

//--- stop the executor
executor.stop();
}

结果输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

---------------------
[2018-12-12 16:25:12:396] taskId: 28248cdd933641fcac24415c3a5632b9, status: WAITING
[2018-12-12 16:25:12:397] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: WAITING
[2018-12-12 16:25:12:397] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: WAITING
[2018-12-12 16:25:12:397] taskId: a4ee0322de974c61b1c409eb11b9580e, status: WAITING
[2018-12-12 16:25:12:398] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: WAITING

---------------------
[2018-12-12 16:25:15:402] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:15:402] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: EXECUTING
[2018-12-12 16:25:15:403] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: WAITING
[2018-12-12 16:25:15:403] taskId: a4ee0322de974c61b1c409eb11b9580e, status: WAITING
[2018-12-12 16:25:15:403] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: WAITING

---------------------
[2018-12-12 16:25:18:408] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:18:408] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: COMPLETED
[2018-12-12 16:25:18:409] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: EXECUTING
[2018-12-12 16:25:18:409] taskId: a4ee0322de974c61b1c409eb11b9580e, status: EXECUTING
[2018-12-12 16:25:18:409] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: WAITING

---------------------
[2018-12-12 16:25:21:413] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:21:413] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: COMPLETED
[2018-12-12 16:25:21:414] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: COMPLETED
[2018-12-12 16:25:21:414] taskId: a4ee0322de974c61b1c409eb11b9580e, status: COMPLETED
[2018-12-12 16:25:21:414] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: EXECUTING

---------------------
[2018-12-12 16:25:24:417] taskId: 28248cdd933641fcac24415c3a5632b9, status: COMPLETED
[2018-12-12 16:25:24:417] taskId: 58c9e34b53014ef091812eb7ee7f06a6, status: COMPLETED
[2018-12-12 16:25:24:418] taskId: 0318d0a048d643839e5ca4bc4e2748d2, status: COMPLETED
[2018-12-12 16:25:24:418] taskId: a4ee0322de974c61b1c409eb11b9580e, status: COMPLETED
[2018-12-12 16:25:24:418] taskId: 406aa6d336f948cb81e641c1ed6b59bb, status: COMPLETED

---------------------
-DONE-
-DONE-
-DONE-
-DONE-
-DONE-

示例 No.2

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

@Test
public void test2() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

//--- configure the task executor and start it
QueueTaskExecutor<String> executor = new QueueTaskExecutor<String>()
.defaultTaskTimeout(30_000)//Set default timeout for single task execution
.taskConcurrenceCount(2)//Set max count of task executing at the same time
.taskMaxCount(100)//Set max block waiting task count
.start();

//--- submit tasks
List<TaskFuture<String>> taskFutures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
TaskFuture<String> taskFuture = executor.submit(new Task<String>() {
@Override
public void before() {
//TODO code before this task executing
}

@Override
public void after(String s) {
//TODO code after this task executed
}

@Override
public String execute() {
//TODO task codes
try {
Thread.sleep(3_000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
return "-DONE-";
}
});
taskFutures.add(taskFuture);
}

//--- wait all task done
while (true) {
boolean alldone = true;
System.out.println("\r\n---------------------");
for (TaskFuture<String> taskFuture : taskFutures) {
System.out.println(String.format("[%s] taskId: %s, status: %s", simpleDateFormat.format(new Date()), taskFuture.getTaskId(), taskFuture.getTaskStatus()));

alldone = alldone && (taskFuture.getTaskStatus().getStatus() >= TaskStatus.COMPLETED.getStatus());
}
if (alldone) {
break;
}
try {
Thread.sleep(3000);
} catch (Exception e) {
logger.debug(e.getMessage());
}
}

//--- get all task exit values
System.out.println("\r\n---------------------");
for (TaskFuture<String> taskFuture : taskFutures) {
System.out.println(taskFuture.exitValue());
}

//--- stop the executor
executor.stop();
}

结果输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43


---------------------
[2018-12-12 16:25:37:302] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: WAITING
[2018-12-12 16:25:37:303] taskId: e63440f37ffa4532b0a663519d687c4c, status: WAITING
[2018-12-12 16:25:37:303] taskId: 0c9c34813171454eab4a2de4b21ba282, status: WAITING
[2018-12-12 16:25:37:304] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: WAITING
[2018-12-12 16:25:37:304] taskId: ae711ef97cb14a908552f030601f7ff9, status: WAITING

---------------------
[2018-12-12 16:25:40:307] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:40:308] taskId: e63440f37ffa4532b0a663519d687c4c, status: EXECUTING
[2018-12-12 16:25:40:308] taskId: 0c9c34813171454eab4a2de4b21ba282, status: WAITING
[2018-12-12 16:25:40:308] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: WAITING
[2018-12-12 16:25:40:308] taskId: ae711ef97cb14a908552f030601f7ff9, status: WAITING

---------------------
[2018-12-12 16:25:43:311] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:43:312] taskId: e63440f37ffa4532b0a663519d687c4c, status: COMPLETED
[2018-12-12 16:25:43:312] taskId: 0c9c34813171454eab4a2de4b21ba282, status: EXECUTING
[2018-12-12 16:25:43:312] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: EXECUTING
[2018-12-12 16:25:43:312] taskId: ae711ef97cb14a908552f030601f7ff9, status: WAITING

---------------------
[2018-12-12 16:25:46:318] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: e63440f37ffa4532b0a663519d687c4c, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: 0c9c34813171454eab4a2de4b21ba282, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: COMPLETED
[2018-12-12 16:25:46:319] taskId: ae711ef97cb14a908552f030601f7ff9, status: EXECUTING

---------------------
[2018-12-12 16:25:49:320] taskId: 48766295a2584ef49ddc374e1f3ba0ed, status: COMPLETED
[2018-12-12 16:25:49:320] taskId: e63440f37ffa4532b0a663519d687c4c, status: COMPLETED
[2018-12-12 16:25:49:321] taskId: 0c9c34813171454eab4a2de4b21ba282, status: COMPLETED
[2018-12-12 16:25:49:321] taskId: ff7f55a6fe4f47009e8418601430e9d3, status: COMPLETED
[2018-12-12 16:25:49:321] taskId: ae711ef97cb14a908552f030601f7ff9, status: COMPLETED

---------------------
-DONE-
-DONE-
-DONE-
-DONE-
-DONE-

代码仓库

https://github.com/johnsonmoon/queue-task.git