Spring Boot 集成 Zookeeper 实现分布式锁
提示
本文中的完整代码已上传 Gitee:
# 简介
本项目会演示 Zookeeper 分布式锁的使用,会以常见的 “超卖”业务去演示
本示例代码主要包括两部分
- curator 实现的分布式锁使用演示
- 自己实现的 Zookeeper 分布式锁使用演示
# Zookeeper 集群搭建
这里以三台 Zookeeper 集群的形式,以经典的超卖场景,来演示分布式锁 我这里部署集群使用的是 Docker Compose , 详情可以参考这篇 Zookeeper 集群搭建
# Zookeeper 实现分布式锁的原理概述
Zookeeper节点路径不能重复 保证唯一性。 临时节点+事件通知
Zookeeper 实现分布式锁具有天然的优势,临时顺序节点,可以有效的避免死锁问题,让客户端断开,那么就会删除当前临时节点,让下一个节点进行工作。
- 所有请求进来,在/lock下创建 临时顺序节点 ,放心,zookeeper会帮你编号排序
- 判断自己是不是/lock下最小的节点
- 是,获得锁(创建节点)
- 否,对前面小我一级的节点进行监听
- 获得锁请求,处理完业务逻辑,释放锁(删除节点),后一个节点得到通知(比你年轻的死了,你成为最嫩的了)
- 重复步骤2
Zookeeper 内部加锁示意图
# 依赖
curator 包中提供了 Zookeeper 的操作封装
就类似 Redisson 对 Redis 的封装
<!-- zk 分布式锁支持 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
1
2
3
4
5
6
2
3
4
5
6
# 自定义 Zookeeper 分布式锁实现
基于上图的原理,我们可以这样来实现一个分布式锁
本文中的完整代码已上传 Gitee: https://gitee.com/bulkall/bulk-demo/tree/master/spring-boot-lock/spring-boot-lock-zookeeper (opens new window)
# 锁定义
public class BulkZookeeperLock {
private ZooKeeper zk = null;
/**
* zookeeper 连接
*/
private final CountDownLatch connectLatch = new CountDownLatch(1);
/**
* zookeeper节点等待
*/
private final CountDownLatch waitLatch = new CountDownLatch(1);
/**
* 操作的当前节点
*/
private String currentNode = null;
/**
* 当前节点要监视的上一个节点
*/
private String waitPath = null;
public BulkZookeeperLock(ZookeeperProperty property) throws IOException, InterruptedException, KeeperException {
//建立和zookeeper集群的连接
zk = new ZooKeeper(property.getZkServers(), property.getSessionTimeout(), new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 连接建立时, 打开latch, 唤醒 wait 在该 latch 上的线程
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件 唤醒 waitLatch
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
//等待建立连接,即连接建立之后才会执行之后的代码。
connectLatch.await();
//判断根节点"/locks"是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
//根节点 “/locks” 不存在,创建根节点
zk.create("/locks", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 加锁方法,加锁就是在根节点“/locks”,下创建节点,不过要判断是否能获取锁。
* 若当前节点是最小的,则可以获取锁,否则应该监听它前一个节点
*/
public void lock() {
try {
currentNode = zk.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
Thread.sleep(10);
// 注意, 没有必要监听"/locks"的子节点的变化情况
List<String> children = zk.getChildren("/locks", false);
if (children.size() == 1) {
// 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
return;
} else {
//对根节点下的所有临时顺序节点进行从小到大排序
Collections.sort(children);
//获取当前节点名称,如 seq-0000001
String thisNode = currentNode.substring("/locks/".length());
//获取当前节点在children中的位置
int index = children.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index为0表示,thisNode 在列表的子节点中最小,可以获取到锁
return;
} else {
// 否则监听当前节点的上一个节点
waitPath = "/locks/" + children.get(index - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
zk.getData(waitPath, true, new Stat());
// 进入等待锁状态,如果说,上一个节点他进行了解锁,那么就可以由上面的 waitLatch.countDown(); 唤醒 继续执行,即加锁成功
waitLatch.await();
return;
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 解锁方法,解锁就是删除当前节点
*/
public void unLock() {
try {
zk.delete(currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# 锁使用
@SneakyThrows
@Override
public String reduceStockByMyLock(Integer id) {
// 加 zk 锁
BulkZookeeperLock zkLock = new BulkZookeeperLock(zookeeperProperty);
try {
zkLock.lock();
ProductStock stock = productStockMapper.selectById(id);
if (stock != null && stock.getStock() > 0) {
productStockMapper.reduceStock(id);
} else {
throw new RuntimeException("库存不足!");
}
} finally {
// 解 zk 锁
zkLock.unLock();
}
return "ok";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# curator 实现 Zookeeper 分布式锁
本文中的完整代码已上传 Gitee: https://gitee.com/bulkall/bulk-demo/tree/master/spring-boot-lock/spring-boot-lock-zookeeper (opens new window)
# 锁定义
public class ZookeeperLock {
private final CuratorFramework curatorFramework;
/**
* 加锁
*
* @param lockKey 锁标识
* @return 锁信息
*/
public InterProcessMutex lock(String lockKey) {
if (!CuratorFrameworkState.STARTED.equals(curatorFramework.getState())) {
log.warn("在调用此方法之前必须启动 CuratorFramework 实例");
return null;
}
String nodePath = "/curator/lock/%s";
try {
// 可重入锁:InterProcessMutex 不可重入锁:InterProcessSemaphoreMutex
InterProcessMutex mutex = new InterProcessMutex(curatorFramework, String.format(nodePath, lockKey));
final boolean locked = mutex.acquire(-1L, null);
return locked ? mutex : null;
} catch (Exception e) {
return null;
}
}
/**
* 解锁
*
* @param lockInstance 锁实例
*/
public void unLock(InterProcessMutex lockInstance) {
try {
lockInstance.release();
} catch (Exception e) {
log.warn("zookeeper lock release error", e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 锁使用
public String reduceStock(Integer id) {
// 加 zk 锁
InterProcessMutex mutex = zookeeperLock.lock("stock");
try {
ProductStock stock = productStockMapper.selectById(id);
if (stock != null && stock.getStock() > 0) {
productStockMapper.reduceStock(id);
} else {
throw new RuntimeException("库存不足!");
}
} finally {
// 解 zk 锁
zookeeperLock.unLock(mutex);
}
return "ok";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 本地测试说明
- 压测工具使用 JMeter , 当然也可以用别的,我用的这个, 此处不了解的可以参考 JMeter 相关操作
- 本地测试,借助于 IDEA 的
Allow parallel run
功能启动多个相同的服务(模拟线上环境多个副本),注意修改端口(-Dserver.port=8089),操作可以看下图 - 使用 Nginx 工具,将启动的多个项目做负载均衡; 此处不了解的可以参考 Nginx 配置负载均衡
- 接下来就是使用 JMeter 开启多个线程去压测 Nginx 暴露出来的接口了
上次更新: 2023/01/13, 18:09:17