Distributed Systems Notes: Implementing a Distributed Task Lock with ZooKeeper (Java)

Implementing a distributed task lock with ZooKeeper

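Underlying principles

  • Create a base node in ZK whose path is the lock name; its node type is persistent (PERSISTENT).
  • For every thread that wants the lock, create a child of the base node. The child's name is defined by the program (here `temp`) and its type is ephemeral sequential (EPHEMERAL_SEQUENTIAL). Save the actual node path returned by the create call.
  • Deleting the child node a thread created releases that thread's lock.
  • Because the children are ephemeral, ZK deletes a client's node automatically when its session ends (the client closes it, or the session expires after the connection is lost). This guarantees that the lock is released after a disconnect.
  • The sequence numbers ZK appends are increasing, so a thread holds the lock exactly when the child with the smallest sequence number under the base node is the one it created.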
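The ordering rule in the last point can be checked without a server. The sketch below is a hypothetical helper, not part of ZooKeeper's API. It assumes, as the ZooKeeper documentation describes, that a sequential node gets a 10-digit zero-padded counter appended to its name (`%010d`), and it decides whether a given child holds the lock.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical helper (not a ZooKeeper API) mimicking how the lock recipe
 * orders EPHEMERAL_SEQUENTIAL children such as "temp0000000003".
 */
final class SequentialNodeOrder {
    private SequentialNodeOrder() {
    }

    /** Builds a child name the way ZooKeeper formats the sequence counter (%010d). */
    static String childName(String prefix, int sequence) {
        return prefix + String.format("%010d", sequence);
    }

    /** Extracts the numeric suffix that follows the fixed prefix. */
    static int sequenceOf(String prefix, String childName) {
        return Integer.parseInt(childName.substring(prefix.length()));
    }

    /** True if ownChild carries the smallest sequence number among the children. */
    static boolean holdsLock(String prefix, String ownChild, List<String> children) {
        if (children.isEmpty()) {
            return false; // our own node is not even listed
        }
        String lowest = Collections.min(children,
                Comparator.comparingInt((String name) -> sequenceOf(prefix, name)));
        return lowest.equals(ownChild);
    }
}
```

With the fixed prefix `temp`, the zero padding also makes plain string order match numeric order as long as the counter stays non-negative; parsing the suffix, as the code in this article does, simply makes the intent explicit.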

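Illustration

The figures below illustrate how the ZooKeeper-based task lock works.

Eight threads each try to acquire the distributed task lock:

  • (1) Each of the 8 threads creates an ephemeral sequential node under the base node in ZK and obtains the actual path of the node it created
  • (2) Each thread checks whether its own node has the smallest sequence number among the base node's children
  • (3) The thread with the smallest number acquires the distributed task lock, runs the critical section and completes its task

[Figure: eight threads competing for the lock]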

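When a thread finishes its task and releases the lock:

  • (1) The thread releases the lock by deleting its ephemeral node in ZK; the child with the smallest path under the base node now holds the distributed task lock
  • (2) A thread loses its connection to ZK because of a network problem and drops out of the competition; once its session expires, ZK deletes its ephemeral node automatically
  • (3) A newly arriving thread joins the competition by creating an ephemeral node in ZK and queues for the lock

[Figure: lock release, a disconnected thread and a newly joining thread]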
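The release scenario above can be replayed in memory. The class below is a hypothetical stand-in for the lock's base node with no ZooKeeper involved: it only models the ordering rule, where the smallest sequence number holds the lock and removing a node (explicit release or session expiry) hands the lock to the next smallest one.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the lock's base node (not ZooKeeper).
 * Children are kept sorted by sequence number; the first one holds the lock.
 */
final class InMemoryLockDirectory {
    // sequence number -> owner, sorted ascending by sequence number
    private final TreeMap<Integer, String> children = new TreeMap<>();
    private int nextSequence = 0;

    /** Models create(..., EPHEMERAL_SEQUENTIAL): returns the assigned sequence number. */
    int join(String owner) {
        int sequence = nextSequence++;
        children.put(sequence, owner);
        return sequence;
    }

    /** Models an explicit delete or the cleanup of an ephemeral node after session expiry. */
    void leave(int sequence) {
        children.remove(sequence);
    }

    /** Owner of the smallest sequence number, or null when nobody is queued. */
    String holder() {
        Map.Entry<Integer, String> first = children.firstEntry();
        return first == null ? null : first.getValue();
    }
}
```

For example, with A, B and C queued, removing B (disconnect) and then A (release) leaves C as the holder, and a later client D queues behind C with sequence number 3, because the counter never reuses numbers.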

Approach 1: Polling

How it works

The program flow is shown below:

[Figure: flowchart of approach 1 (polling)]

Implementation

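import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * ZooKeeper-based distributed task lock.
 * <pre>
 * Approach 1: polling
 *
 * Idea: block and recursively fetch the parent node's children to decide whether the lock is held.
 * 1. Listen for the result when connecting to ZK; if the connection fails, set the interrupt flag so that getLock() returns false immediately.
 * 2. Create a persistent node named after the lock as the parent, then create an ephemeral child with a fixed, program-defined name under it; the ZooKeeper server appends a sequence number to the actual path.
 * 3. getLock() blocks and recursively checks whether the sequence suffix of this thread's child is the smallest among all children of the parent; if so, the lock is acquired, otherwise it keeps waiting.
 * 4. If listing the parent's children throws an exception, getLock() stops waiting, returns false and leaves the competition.
 * 5. After the thread acquires the lock and finishes its task, it releases the lock (deletes its child node) and closes the ZooKeeper connection.
 * </pre>
 * Created by xuyh at 2017/11/24 9:18.
 */
public class ZKLock {
    private final Logger logger = LoggerFactory.getLogger(ZKLock.class);
    private static final String CHILD_NODE_PATH = "temp";
    private final String baseLockPath;
    private String finalLockId;

    // Set when waiting must stop; written by the ZK event thread, hence volatile
    private volatile boolean needInterrupt = false;
    // Set once the ZK connection is established; written by the ZK event thread
    private volatile boolean connected = false;

    private String host = "127.0.0.1";
    private String port = "2181";
    private ZooKeeper zooKeeper;

    private ZKLock(String lock, String host, String port) {
        this.host = host;
        this.port = port;
        this.baseLockPath = "/" + lock;
    }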

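    /**
     * Creates the lock (blocks while connecting to ZK).
     *
     * @param host ZK server IP
     * @param port ZK server port
     * @param lock lock name
     * @return the lock instance; call getLock() to compete for the lock
     */
    public static ZKLock create(String host, String port, String lock) {
        ZKLock zkLock = new ZKLock(lock, host, port);
        zkLock.connectZooKeeper();
        return zkLock;
    }

    /**
     * Acquires the lock (blocking).
     *
     * @return true if the distributed task lock was acquired
     */
    public boolean getLock() {
        return isSubPathMin();
    }

    /**
     * Releases the lock and closes the ZK connection.
     *
     * @return true if the ZK connection was closed
     */
    public boolean releaseLock() {
        try {
            if (zooKeeper != null && connected) {
                // Deleting our own sequential node hands the lock to the next node in line
                zooKeeper.delete(finalLockId, -1);
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        return disconnectZooKeeper();
    }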

    private boolean connectZooKeeper() {
        try {
            // Connect to ZK and register a watcher for the connection state
            zooKeeper = new ZooKeeper(host + ":" + port, 60000, event -> {
                Watcher.Event.KeeperState state = event.getState();
                if (state == Watcher.Event.KeeperState.AuthFailed
                        || state == Watcher.Event.KeeperState.Disconnected
                        || state == Watcher.Event.KeeperState.Expired) {
                    // Authentication failed, connection lost or session expired: stop competing
                    needInterrupt = true;
                } else if (state == Watcher.Event.KeeperState.SyncConnected
                        && event.getType() == Watcher.Event.EventType.None) {
                    // Connection established
                    connected = true;
                }
            });

            // Wait for the asynchronous connection; give up after about 30 s so the thread never hangs
            for (int i = 1; i < 100 && !connected; i++) {
                Thread.sleep(300);
            }

            if (connected) {
                // Create the persistent parent node if it does not exist yet
                if (zooKeeper.exists(baseLockPath, false) == null) {
                    try {
                        zooKeeper.create(baseLockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException ignored) {
                        // Another client created the parent node concurrently, which is fine
                    }
                }

                // Create our ephemeral sequential child and remember the path ZK actually assigned
                finalLockId = zooKeeper.create(baseLockPath + "/" + CHILD_NODE_PATH, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            } else {
                needInterrupt = true;
                logger.warn("Connect zookeeper failed. Time consumes 30 s");
                return false;
            }
        } catch (Exception e) {
            needInterrupt = true;
            logger.warn(e.getMessage(), e);
            return false;
        }
        return true;
    }

    private boolean disconnectZooKeeper() {
        if (zooKeeper == null) {
            return false;
        }
        try {
            connected = false;
            zooKeeper.close();
        } catch (Exception e) {
            logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
        }
        return true;
    }

    private boolean isSubPathMin() {
        if (!connected) {
            return false;
        }
        try {
            // Poll once per second
            Thread.sleep(1000);
            List<String> childrenList = zooKeeper.getChildren(baseLockPath, false);

            if (needInterrupt) {
                disconnectZooKeeper();
                return false;
            }

            if (judgePathNumMin(childrenList)) {
                return true;
            }
            // Not our turn yet: poll again (recursively, one stack frame per round)
            return isSubPathMin();
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
            disconnectZooKeeper();
            return false;
        }
    }

    private boolean judgePathNumMin(List<String> paths) {
        // An empty list means our own node is gone (e.g. the session expired), so we cannot hold the lock
        if (paths.isEmpty()) {
            return false;
        }
        // getChildren() returns the children in no particular order: sort them by sequence number, ascending
        paths.sort((str1, str2) -> Integer.compare(
                Integer.parseInt(str1.substring(CHILD_NODE_PATH.length())),
                Integer.parseInt(str2.substring(CHILD_NODE_PATH.length()))));

        // We hold the lock if our node has the smallest sequence number
        String minId = paths.get(0);
        return finalLockId.equals(baseLockPath + "/" + minId);
    }
}
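The recursion in `isSubPathMin()` adds one stack frame per polling round. A loop keeps the stack flat and makes it easy to add an overall deadline. The sketch below is a hypothetical, ZooKeeper-free helper: `isLowest` stands in for `getChildren()` plus `judgePathNumMin()`, and `shouldAbort` stands in for the `needInterrupt` flag.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical loop-based alternative to the recursive polling in ZKLock. */
final class PollingWaiter {
    private PollingWaiter() {
    }

    /**
     * Polls until isLowest reports true, shouldAbort reports true, or the deadline passes.
     *
     * @param isLowest       one poll: "is our node the smallest child right now?"
     * @param shouldAbort    stop condition, e.g. a connection-lost flag
     * @param intervalMillis pause between two polls
     * @param timeoutMillis  overall deadline; 0 or less means wait without a deadline
     * @return true if the lock was observed as acquired
     */
    static boolean pollUntilLowest(BooleanSupplier isLowest, BooleanSupplier shouldAbort,
                                   long intervalMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!shouldAbort.getAsBoolean()) {
            if (isLowest.getAsBoolean()) {
                return true;
            }
            if (timeoutMillis > 0 && System.nanoTime() - deadline >= 0) {
                return false; // deadline reached
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
        }
        return false;
    }
}
```

Wiring this into ZKLock would require the supplier to wrap the checked exceptions thrown by `getChildren()`; the stack depth then stays constant no matter how long a thread waits.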

Test

Test program

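import java.util.ArrayList;
import java.util.List;

public class ZKLockTest {

    public static void main(String[] args) throws Exception {
        new ZKLockTest().testZKLockOneWithMultiThread();
    }

    private void testZKLockOneWithMultiThread() throws Exception {
        int threadCount = 20;
        List<TestThread> testThreads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            testThreads.add(new TestThread("" + i, "127.0.0.1", "2181", "testThreadLock"));
        }
        testThreads.forEach(TestThread::start);
        // Wait for every thread to finish instead of sleeping for a fixed 100 s
        for (TestThread thread : testThreads) {
            thread.join();
        }
    }

    private class TestThread extends Thread {
        private final String host;
        private final String port;
        private final String lockPath;
        private final String num;

        /**
         * @param threadNum thread number
         */
        TestThread(String threadNum, String host, String port, String lockPath) {
            this.host = host;
            this.port = port;
            this.lockPath = lockPath;
            this.num = threadNum;
        }

        @Override
        public void run() {
            ZKLock zkLock = ZKLock.create(host, port, lockPath);
            if (zkLock.getLock()) {
                System.out.println(String.format("Thread [%s] acquired the task lock and ran the task", num));
                try {
                    // Simulate 2 s of work inside the critical section
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                System.out.println(String.format("Thread [%s] did not acquire the task lock and skipped the task", num));
            }
            zkLock.releaseLock();
        }
    }
}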

Result:

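Thread [17] acquired the task lock and ran the task
Thread [11] acquired the task lock and ran the task
Thread [1] acquired the task lock and ran the task
Thread [19] acquired the task lock and ran the task
Thread [14] acquired the task lock and ran the task
Thread [8] acquired the task lock and ran the task
Thread [0] acquired the task lock and ran the task
Thread [5] acquired the task lock and ran the task
Thread [10] acquired the task lock and ran the task
Thread [16] acquired the task lock and ran the task
Thread [13] acquired the task lock and ran the task
Thread [12] acquired the task lock and ran the task
Thread [6] acquired the task lock and ran the task
Thread [18] acquired the task lock and ran the task
Thread [9] acquired the task lock and ran the task
Thread [7] acquired the task lock and ran the task
Thread [4] acquired the task lock and ran the task
Thread [15] acquired the task lock and ran the task
Thread [3] acquired the task lock and ran the task
Thread [2] acquired the task lock and ran the task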

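Pros and cons of approach 1

Pros

  • Repeated polling is implemented with a simple recursive call
  • The program logic is simple and easy to follow
  • No watcher for node changes needs to be implemented

Cons

  • Every thread waiting for the lock must fetch the complete list of live child nodes at a fixed interval, which causes heavy network traffic and wastes resources
  • Because each polling round is a recursive call, the stack grows with every round; a very long wait can eventually end in a StackOverflowError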
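A rough estimate of the polling cost, under simplifying assumptions: N threads join at the same moment, each holder keeps the lock for t seconds, a waiting thread calls getChildren() once every Δ seconds (t = 2 s and Δ = 1 s in the code and test above), and connection and network delays are ignored.

```latex
\begin{aligned}
\text{calls by the waiter in position } k &\approx \frac{k\,t}{\Delta}, \qquad k = 0, 1, \dots, N-1 \\
\text{total calls} &\approx \frac{t}{\Delta} \sum_{k=0}^{N-1} k = \frac{t}{\Delta} \cdot \frac{N(N-1)}{2} \\
N = 20,\; t = 2\,\text{s},\; \Delta = 1\,\text{s}: \quad \text{total calls} &\approx 2 \cdot 190 = 380
\end{aligned}
```

The number of requests therefore grows quadratically with the number of waiters, and every response carries the full child list.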

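Approach 2: Parent-node watching

How it works

The program flow is shown below:

[Figure: flowchart of approach 2 (parent-node watching)]

Implementation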

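import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * ZooKeeper-based distributed task lock.
 * <pre>
 * Approach 2: parent-node watching
 *
 * Idea: watch the state of the parent node.
 * 1. Create an ephemeral child under the (persistent) parent node; ZK appends an increasing sequence number to the path it actually creates (sequential creation mode).
 * 2. After the node is created, list the parent's children and check whether this thread's sequence suffix is the smallest; if so, the lock is acquired, otherwise watch the parent for changes (the watcher is registered through getChildren()).
 * 3. When the parent changes (mainly its list of children), the watcher is notified; check the ordering of the children again, take the lock if this thread's suffix is now the smallest, otherwise register the watcher on the parent again.
 * </pre>
 * <p>
 * Created by xuyh at 2017/11/29 13:43.
 */
public class ZKLockTwo {
    private final Logger logger = LoggerFactory.getLogger(ZKLockTwo.class);
    private static final String CHILD_NODE_PATH = "temp";
    private final String baseLockPath;
    private String finalLockId;

    // Set when waiting must stop; written by the ZK event thread, hence volatile
    private volatile boolean needInterrupt = false;
    // Set once the ZK connection is established; written by the ZK event thread
    private volatile boolean connected = false;
    // Set once this node holds the lock; written by the ZK event thread
    private volatile boolean acquireLock = false;

    private String host = "127.0.0.1";
    private String port = "2181";
    private ZooKeeper zooKeeper;
    private final FatherNodeWatcher fatherNodeWatcher;

    private ZKLockTwo(String lock, String host, String port) {
        this.host = host;
        this.port = port;
        this.baseLockPath = "/" + lock;
        this.fatherNodeWatcher = new FatherNodeWatcher(this);
    }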

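    /**
     * Creates the lock (blocks while connecting to ZK).
     *
     * @param host ZK server IP
     * @param port ZK server port
     * @param lock lock name
     * @return the lock instance; call getLock() to compete for the lock
     */
    public static ZKLockTwo create(String host, String port, String lock) {
        ZKLockTwo zkLockTwo = new ZKLockTwo(lock, host, port);
        zkLockTwo.connectZooKeeper();
        return zkLockTwo;
    }

    /**
     * Acquires the lock (blocking).
     *
     * @return true if the distributed task lock was acquired
     */
    public boolean getLock() {
        // Check once per second whether the watcher has granted us the lock;
        // stop when interrupted or when the connection has been closed
        while (!needInterrupt && connected) {
            if (acquireLock) {
                return true;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Releases the lock and closes the ZK connection.
     *
     * @return true if the ZK connection was closed
     */
    public boolean releaseLock() {
        try {
            if (zooKeeper != null && connected) {
                // Deleting our own sequential node changes the parent's children and notifies the waiters
                zooKeeper.delete(finalLockId, -1);
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        return disconnectZooKeeper();
    }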

    private boolean disconnectZooKeeper() {
        if (zooKeeper == null) {
            return false;
        }
        try {
            connected = false;
            acquireLock = false;
            zooKeeper.close();
        } catch (Exception e) {
            logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
        }
        return true;
    }

    private boolean connectZooKeeper() {
        try {
            // Connect to ZK; this default watcher only tracks the connection state
            zooKeeper = new ZooKeeper(host + ":" + port, 60000, event -> {
                Watcher.Event.KeeperState state = event.getState();
                if (state == Watcher.Event.KeeperState.AuthFailed
                        || state == Watcher.Event.KeeperState.Disconnected
                        || state == Watcher.Event.KeeperState.Expired) {
                    // Authentication failed, connection lost or session expired: stop competing
                    needInterrupt = true;
                } else if (state == Watcher.Event.KeeperState.SyncConnected
                        && event.getType() == Watcher.Event.EventType.None) {
                    // Connection established
                    connected = true;
                }
            });

            // Wait for the asynchronous connection; give up after about 30 s so the thread never hangs
            for (int i = 1; i < 100 && !connected; i++) {
                Thread.sleep(300);
            }

            if (connected) {
                // Create the persistent parent node if it does not exist yet
                if (zooKeeper.exists(baseLockPath, false) == null) {
                    try {
                        zooKeeper.create(baseLockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException ignored) {
                        // Another client created the parent node concurrently, which is fine
                    }
                }

                // Create our ephemeral sequential child and remember the path ZK actually assigned
                finalLockId = zooKeeper.create(baseLockPath + "/" + CHILD_NODE_PATH, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

                // Check once whether we already hold the lock; this also registers the parent-node watcher
                checkAcquire();
            } else {
                needInterrupt = true;
                logger.warn("Connect zookeeper failed. Time consumes 30 s");
                return false;
            }
        } catch (Exception e) {
            needInterrupt = true;
            logger.warn(e.getMessage(), e);
            return false;
        }
        return true;
    }

    private void checkAcquire() {
        if (!connected) {
            return;
        }
        try {
            // List the children and re-register the watcher on the parent (ZK watches fire only once)
            List<String> childrenList = zooKeeper.getChildren(baseLockPath, fatherNodeWatcher);
            if (judgePathNumMin(childrenList)) {
                acquireLock = true; // lock acquired
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
            disconnectZooKeeper();
        }
    }

    private boolean judgePathNumMin(List<String> paths) {
        // An empty list means our own node is gone (e.g. the session expired), so we cannot hold the lock
        if (paths.isEmpty()) {
            return false;
        }
        // getChildren() returns the children in no particular order: sort them by sequence number, ascending
        paths.sort((str1, str2) -> Integer.compare(
                Integer.parseInt(str1.substring(CHILD_NODE_PATH.length())),
                Integer.parseInt(str2.substring(CHILD_NODE_PATH.length()))));

        // We hold the lock if our node has the smallest sequence number
        String minId = paths.get(0);
        return finalLockId.equals(baseLockPath + "/" + minId);
    }

    private class FatherNodeWatcher implements Watcher {
        private final ZKLockTwo context;

        FatherNodeWatcher(ZKLockTwo context) {
            this.context = context;
        }

        @Override
        public void process(WatchedEvent event) {
            Watcher.Event.KeeperState state = event.getState();
            if (state == Watcher.Event.KeeperState.AuthFailed
                    || state == Watcher.Event.KeeperState.Disconnected
                    || state == Watcher.Event.KeeperState.Expired) {
                context.needInterrupt = true;
            } else if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                // The parent's list of children changed: check again (this re-registers the watcher)
                context.checkAcquire();
            }
        }
    }
}
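`getLock()` in ZKLockTwo (and likewise in ZKLockThree) still wakes up once per second just to read a flag. A common alternative is to block on a `java.util.concurrent.CountDownLatch`. The sketch below assumes that `checkAcquire()` and the session watchers are changed to call the hypothetical methods `signalAcquired()` and `signalAbort()`; none of these names come from ZooKeeper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical replacement for the sleep(1000) loop in getLock():
 * the waiting thread blocks until a callback decides the outcome.
 */
final class LockSignal {
    private final CountDownLatch decided = new CountDownLatch(1);
    private volatile boolean acquired;

    /** To be called from checkAcquire() once our node is the lowest. */
    void signalAcquired() {
        acquired = true;
        decided.countDown();
    }

    /** To be called from a watcher on AuthFailed, Disconnected or Expired. */
    void signalAbort() {
        decided.countDown();
    }

    /** Blocks until a decision or the timeout; false on timeout, abort or interrupt. */
    boolean awaitAcquired(long timeout, TimeUnit unit) {
        try {
            return decided.await(timeout, unit) && acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The waiting thread then reacts as soon as the watcher fires instead of up to one second later, and it consumes no CPU while it waits.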

Test

Test program

import java.util.ArrayList;
import java.util.List;

public class ZKLockTwoTest {

    public static void main(String[] args) throws Exception {
        new ZKLockTwoTest().testZKLockTwoWithMultiThread();
    }

    private void testZKLockTwoWithMultiThread() throws Exception {
        int threadCount = 20;
        List<TestLockTwoThread> testLockTwoThreads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            testLockTwoThreads.add(new TestLockTwoThread("" + i, "127.0.0.1", "2181", "TestLockTwoThreadLock"));
        }
        testLockTwoThreads.forEach(TestLockTwoThread::start);
        // Wait for every thread to finish instead of sleeping for a fixed 100 s
        for (TestLockTwoThread thread : testLockTwoThreads) {
            thread.join();
        }
    }

    private class TestLockTwoThread extends Thread {
        private final String host;
        private final String port;
        private final String lockPath;
        private final String num;

        /**
         * @param threadNum thread number
         */
        TestLockTwoThread(String threadNum, String host, String port, String lockPath) {
            this.host = host;
            this.port = port;
            this.lockPath = lockPath;
            this.num = threadNum;
        }

        @Override
        public void run() {
            ZKLockTwo zkLockTwo = ZKLockTwo.create(host, port, lockPath);
            if (zkLockTwo.getLock()) {
                System.out.println(String.format("Thread [%s] acquired the task lock and ran the task", num));
                try {
                    // Simulate 2 s of work inside the critical section
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                System.out.println(String.format("Thread [%s] did not acquire the task lock and skipped the task", num));
            }
            zkLockTwo.releaseLock();
        }
    }
}

Result:

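Thread [4] acquired the task lock and ran the task
Thread [8] acquired the task lock and ran the task
Thread [3] acquired the task lock and ran the task
Thread [2] acquired the task lock and ran the task
Thread [1] acquired the task lock and ran the task
Thread [0] acquired the task lock and ran the task
Thread [19] acquired the task lock and ran the task
Thread [17] acquired the task lock and ran the task
Thread [16] acquired the task lock and ran the task
Thread [15] acquired the task lock and ran the task
Thread [14] acquired the task lock and ran the task
Thread [12] acquired the task lock and ran the task
Thread [11] acquired the task lock and ran the task
Thread [10] acquired the task lock and ran the task
Thread [9] acquired the task lock and ran the task
Thread [6] acquired the task lock and ran the task
Thread [5] acquired the task lock and ran the task
Thread [18] acquired the task lock and ran the task
Thread [7] acquired the task lock and ran the task
Thread [13] acquired the task lock and ran the task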

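Pros and cons of approach 2

Pros

  • Watches the parent node for changes (mainly changes to its list of children)
  • When the list of children changes, ZK notifies every watching thread, and each thread then queries the children again
  • Compared with polling, this avoids a large share of the network traffic and wasted resources
  • Watching the parent node is relatively simple to implement

Cons

  • Every waiting thread watches the parent node, so after each change to the parent (mainly to its children) the ZK server has to notify all registered threads, even though at most one of them can take the lock (the "herd effect"); network traffic and wasted resources remain considerable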
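A small counting model makes the herd effect concrete. It is a hypothetical back-of-the-envelope sketch with no ZooKeeper calls: n clients queue up at once, the lock is released n - 1 times in order, and the extra notifications caused by nodes joining the queue are ignored. It compares parent-node watching with watching only the preceding node, as approach 3 does.

```java
/** Hypothetical model of watcher notifications per full run of n queued clients. */
final class NotificationCount {
    private NotificationCount() {
    }

    /** Parent-node watch: every release notifies every client still waiting. */
    static long parentWatch(int n) {
        long total = 0;
        // Before the k-th release (k = 1..n-1), n - k clients are still waiting
        for (int waiting = n - 1; waiting > 0; waiting--) {
            total += waiting;
        }
        return total; // equals n(n-1)/2
    }

    /** Predecessor watch: every release notifies exactly one successor. */
    static long predecessorWatch(int n) {
        return Math.max(0, n - 1);
    }
}
```

For the 20 threads used in the tests, that is 190 notifications (each followed by a `getChildren()` call) with parent-node watching, versus 19 when each client watches only its predecessor.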

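Approach 3: Child-node watching

How it works

The program flow is shown below:

[Figure: flowchart of approach 3 (child-node watching)]

Implementation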

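import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Created by xuyh at 2017/11/29 19:00.
 * <p>
 * **Best option**
 * <pre>
 * Approach 3: child-node watching
 *
 * Idea: watch the state of a single child node.
 * 1. Create an ephemeral child under the (persistent) parent node; ZK appends an increasing sequence number to the path it actually creates (sequential creation mode).
 * 2. After the node is created, list the parent's children and check whether this thread's sequence suffix is the smallest; if so, the lock is acquired, otherwise register a watcher on the node ranked immediately before this one (not necessarily "own number minus one", because sequence numbers can have gaps).
 * 3. When the watched node changes (is deleted), the watcher is notified; check the ordering of the parent's children again, take the lock if this thread's suffix is now the smallest, otherwise watch the new preceding node.
 * </pre>
 */
public class ZKLockThree {
    private final Logger logger = LoggerFactory.getLogger(ZKLockThree.class);
    private static final String CHILD_NODE_PATH = "temp";
    private final String baseLockPath;
    private String finalLockId;

    // Set when waiting must stop; written by the ZK event thread, hence volatile
    private volatile boolean needInterrupt = false;
    // Set once the ZK connection is established; written by the ZK event thread
    private volatile boolean connected = false;
    // Set once this node holds the lock; written by the ZK event thread
    private volatile boolean acquireLock = false;

    private String host = "127.0.0.1";
    private String port = "2181";
    private ZooKeeper zooKeeper;
    private final PreviousNodeWatcher previousNodeWatcher;

    private ZKLockThree(String host, String port, String lock) {
        this.host = host;
        this.port = port;
        this.baseLockPath = "/" + lock;
        this.previousNodeWatcher = new PreviousNodeWatcher(this);
    }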

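    /**
     * Creates the lock (blocks while connecting to ZK).
     *
     * @param host ZK server IP
     * @param port ZK server port
     * @param lock lock name
     * @return the lock instance; call getLock() to compete for the lock
     */
    public static ZKLockThree create(String host, String port, String lock) {
        ZKLockThree zkLockThree = new ZKLockThree(host, port, lock);
        zkLockThree.connectZooKeeper();
        return zkLockThree;
    }

    /**
     * Acquires the lock (blocking).
     *
     * @return true if the distributed task lock was acquired
     */
    public boolean getLock() {
        // Check once per second whether the watcher has granted us the lock;
        // stop when interrupted or when the connection has been closed
        while (!needInterrupt && connected) {
            if (acquireLock) {
                return true;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Releases the lock and closes the ZK connection.
     *
     * @return true if the ZK connection was closed
     */
    public boolean releaseLock() {
        try {
            if (zooKeeper != null && connected) {
                // Deleting our node fires the watcher of the single client queued right behind us
                zooKeeper.delete(finalLockId, -1);
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        return disconnectZooKeeper();
    }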

    private boolean disconnectZooKeeper() {
        if (zooKeeper == null) {
            return false;
        }
        try {
            connected = false;
            acquireLock = false;
            zooKeeper.close();
        } catch (Exception e) {
            logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
        }
        return true;
    }

    private boolean connectZooKeeper() {
        try {
            // Connect to ZK; this default watcher only tracks the connection state
            zooKeeper = new ZooKeeper(host + ":" + port, 60000, event -> {
                Watcher.Event.KeeperState state = event.getState();
                if (state == Watcher.Event.KeeperState.AuthFailed
                        || state == Watcher.Event.KeeperState.Disconnected
                        || state == Watcher.Event.KeeperState.Expired) {
                    // Authentication failed, connection lost or session expired: stop competing
                    needInterrupt = true;
                } else if (state == Watcher.Event.KeeperState.SyncConnected
                        && event.getType() == Watcher.Event.EventType.None) {
                    // Connection established
                    connected = true;
                }
            });

            // Wait for the asynchronous connection; give up after about 30 s so the thread never hangs
            for (int i = 1; i < 100 && !connected; i++) {
                Thread.sleep(300);
            }

            if (connected) {
                // Create the persistent parent node if it does not exist yet
                if (zooKeeper.exists(baseLockPath, false) == null) {
                    try {
                        zooKeeper.create(baseLockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException ignored) {
                        // Another client created the parent node concurrently, which is fine
                    }
                }

                // Create our ephemeral sequential child and remember the path ZK actually assigned
                finalLockId = zooKeeper.create(baseLockPath + "/" + CHILD_NODE_PATH, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

                // Check once whether we already hold the lock; if not, this watches the preceding node
                checkAcquire();
            } else {
                needInterrupt = true;
                logger.warn("Connect zookeeper failed. Time consumes 30 s");
                return false;
            }
        } catch (Exception e) {
            needInterrupt = true;
            logger.warn(e.getMessage(), e);
            return false;
        }
        return true;
    }

    private void checkAcquire() {
        if (!connected) {
            return;
        }
        try {
            // List the children; if we do not hold the lock yet, watch the node ranked immediately before ours
            List<String> childrenList = zooKeeper.getChildren(baseLockPath, false);

            if (judgePathNumMin(childrenList)) {
                acquireLock = true; // lock acquired
            } else {
                watchPreviousNode(childrenList);
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
            disconnectZooKeeper();
        }
    }

    private boolean judgePathNumMin(List<String> paths) {
        // An empty list means our own node is gone (e.g. the session expired), so we cannot hold the lock
        if (paths.isEmpty()) {
            return false;
        }
        // getChildren() returns the children in no particular order: sort them by sequence number, ascending
        paths.sort((str1, str2) -> Integer.compare(
                Integer.parseInt(str1.substring(CHILD_NODE_PATH.length())),
                Integer.parseInt(str2.substring(CHILD_NODE_PATH.length()))));

        // We hold the lock if our node has the smallest sequence number
        String minId = paths.get(0);
        return finalLockId.equals(baseLockPath + "/" + minId);
    }

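    private void watchPreviousNode(List<String> paths) {
        // paths has already been sorted by judgePathNumMin()
        int currentNodeIndex = paths.indexOf(finalLockId.substring((baseLockPath + "/").length()));
        if (currentNodeIndex <= 0) {
            // Our own node is missing (e.g. the session expired): leave the competition
            needInterrupt = true;
            return;
        }
        // Watch the node ranked immediately before ours
        String previousNodePath = baseLockPath + "/" + paths.get(currentNodeIndex - 1);
        try {
            // exists() registers the watcher and returns null if the node is already gone,
            // whereas getData() would throw NoNodeException in that case
            Stat stat = zooKeeper.exists(previousNodePath, previousNodeWatcher);
            if (stat == null) {
                // The preceding node was deleted between getChildren() and exists(): check again right away
                checkAcquire();
            }
        } catch (Exception e) {
            // Registering the watcher failed: leave the competition
            logger.warn(String.format("Previous node watcher register failed! message: [%s]", e.getMessage()), e);
            needInterrupt = true;
        }
    }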

    private class PreviousNodeWatcher implements Watcher {
        private final ZKLockThree context;

        PreviousNodeWatcher(ZKLockThree context) {
            this.context = context;
        }

        @Override
        public void process(WatchedEvent event) {
            Watcher.Event.KeeperState state = event.getState();
            if (state == Watcher.Event.KeeperState.AuthFailed
                    || state == Watcher.Event.KeeperState.Disconnected
                    || state == Watcher.Event.KeeperState.Expired) {
                context.needInterrupt = true;
            } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                // The preceding node was deleted: its owner released the lock or lost its session
                context.checkAcquire();
            }
        }
    }
}
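The heart of approach 3 is choosing which node to watch. The hypothetical helper below, which makes no ZooKeeper calls, mirrors the selection in `watchPreviousNode()`: sort the children by sequence number and take the entry just before our own. In the ZooKeeper lock recipe the watch on that node is usually set with `exists()`, so a predecessor that disappeared between listing and watching is reported as `null` instead of raising `NoNodeException`, and the client simply re-checks.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: which child should this client watch? */
final class PredecessorFinder {
    private PredecessorFinder() {
    }

    /**
     * @return the child ranked immediately before ownChild, or Optional.empty()
     *         if ownChild is the lowest (i.e. it holds the lock)
     * @throws IllegalStateException if ownChild is not among the children
     */
    static Optional<String> predecessor(String prefix, String ownChild, List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt((String name) -> Integer.parseInt(name.substring(prefix.length()))));
        int index = sorted.indexOf(ownChild);
        if (index < 0) {
            // Our own node is gone (e.g. the session expired): the caller must not keep waiting
            throw new IllegalStateException("own node is missing: " + ownChild);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }
}
```

For children `temp0000000002`, `temp0000000005` and `temp0000000009`, the node `temp0000000009` watches `temp0000000005`, not a non-existent `temp0000000008`: sequence numbers of deleted nodes leave gaps, which is why the code walks the sorted list instead of subtracting one.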

Test

Test program

import java.util.ArrayList;
import java.util.List;

public class ZKLockThreeTest {

    public static void main(String[] args) throws Exception {
        new ZKLockThreeTest().testZKLockThreeWithMultiThread();
    }

    private void testZKLockThreeWithMultiThread() throws Exception {
        int threadCount = 20;
        List<TestLockThreeThread> testLockThreeThreads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            testLockThreeThreads.add(new TestLockThreeThread("" + i, "127.0.0.1", "2181", "TestLockThreeThreadLock"));
        }
        testLockThreeThreads.forEach(TestLockThreeThread::start);
        // Wait for every thread to finish instead of sleeping for a fixed 100 s
        for (TestLockThreeThread thread : testLockThreeThreads) {
            thread.join();
        }
    }

    private class TestLockThreeThread extends Thread {
        private final String host;
        private final String port;
        private final String lockPath;
        private final String num;

        /**
         * @param threadNum thread number
         */
        TestLockThreeThread(String threadNum, String host, String port, String lockPath) {
            this.host = host;
            this.port = port;
            this.lockPath = lockPath;
            this.num = threadNum;
        }

        @Override
        public void run() {
            ZKLockThree zkLockThree = ZKLockThree.create(host, port, lockPath);
            if (zkLockThree.getLock()) {
                System.out.println(String.format("Thread [%s] acquired the task lock and ran the task", num));
                try {
                    // Simulate 2 s of work inside the critical section
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                System.out.println(String.format("Thread [%s] did not acquire the task lock and skipped the task", num));
            }
            zkLockThree.releaseLock();
        }
    }
}

Result:

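Thread [4] acquired the task lock and ran the task
Thread [7] acquired the task lock and ran the task
Thread [2] acquired the task lock and ran the task
Thread [9] acquired the task lock and ran the task
Thread [8] acquired the task lock and ran the task
Thread [6] acquired the task lock and ran the task
Thread [3] acquired the task lock and ran the task
Thread [19] acquired the task lock and ran the task
Thread [12] acquired the task lock and ran the task
Thread [15] acquired the task lock and ran the task
Thread [14] acquired the task lock and ran the task
Thread [16] acquired the task lock and ran the task
Thread [17] acquired the task lock and ran the task
Thread [18] acquired the task lock and ran the task
Thread [11] acquired the task lock and ran the task
Thread [13] acquired the task lock and ran the task
Thread [0] acquired the task lock and ran the task
Thread [1] acquired the task lock and ran the task
Thread [5] acquired the task lock and ran the task
Thread [10] acquired the task lock and ran the task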

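Pros and cons of approach 3

Pros

  • Watches a single child node for changes: the one ranked immediately before this thread's own node
  • When the watched node changes (is deleted), ZK notifies this thread, which then competes for the lock again
  • Compared with parent-node watching, each lock release (or node change) triggers the watcher of only one thread, which saves a large amount of network traffic and resources

Cons

  • The implementation and program logic are more involved than with polling or parent-node watching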

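Summary and comparison

Comparing the three ZooKeeper-based implementations of the distributed task lock leads to the following conclusions:

  • Program complexity:
    polling < parent-node watching < child-node watching

  • Network resource consumption:
    polling >> parent-node watching >> child-node watching

  • Program reliability:
    polling << parent-node watching < child-node watching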