散装java 散装java
首页
  • Java基础
  • JVM
  • Java多线程
  • 知识点
  • 案例
  • Redis
  • RabbitMQ
  • Kafka
  • Elasticsearch
  • MySQL
  • Linux
  • Docker
  • Zookeeper
  • Nginx
  • Git
  • JMeter
  • Gradle
  • 常见BUG
  • 常见解决方案
  • 资源
  • 问答
💖支持
Gitee (opens new window)
首页
  • Java基础
  • JVM
  • Java多线程
  • 知识点
  • 案例
  • Redis
  • RabbitMQ
  • Kafka
  • Elasticsearch
  • MySQL
  • Linux
  • Docker
  • Zookeeper
  • Nginx
  • Git
  • JMeter
  • Gradle
  • 常见BUG
  • 常见解决方案
  • 资源
  • 问答
💖支持
Gitee (opens new window)
  • Spring Framework

    • Spring Framework 源码拉取编译技巧
  • Spring知识点

    • Spring 导读
    • Spring 过滤器和拦截器的区别
    • Spring Boot 自动装配原理是如何实现的
  • Spring集成

  • 案例

    • 大文件上传-分片-秒传-断点续传
    • 布隆过滤器使用
    • Spring Boot 集成 Zookeeper 实现分布式锁
      • 简介
      • Zookeeper 集群搭建
      • Zookeeper 实现分布式锁的原理概述
      • 依赖
      • 自定义 Zookeeper 分布式锁实现
        • 锁定义
        • 锁使用
      • curator 实现 Zookeeper 分布式锁
        • 锁定义
        • 锁使用
      • 本地测试说明
    • Spring Boot 集成 Redis 实现分布式锁
    • Spring Boot 集成 MySQL 实现分布式锁
  • Spring
  • 案例
散装java
2023-01-03
目录

Spring Boot 集成 Zookeeper 实现分布式锁

提示

本文中的完整代码已上传 Gitee:

https://gitee.com/bulkall/bulk-demo/tree/master/spring-boot-lock/spring-boot-lock-zookeeper (opens new window)

# 简介

本项目会演示 Zookeeper 分布式锁的使用,会以常见的 “超卖”业务去演示

本示例代码主要包括两部分

  1. curator 实现的分布式锁使用演示
  2. 自己实现的 Zookeeper 分布式锁使用演示

# Zookeeper 集群搭建

这里以三台 Zookeeper 集群的形式,以经典的超卖场景,来演示分布式锁 我这里部署集群使用的是 Docker Compose , 详情可以参考这篇 Zookeeper 集群搭建

# Zookeeper 实现分布式锁的原理概述

Zookeeper节点路径不能重复 保证唯一性。 临时节点+事件通知

Zookeeper 实现分布式锁具有天然的优势,临时顺序节点,可以有效的避免死锁问题,让客户端断开,那么就会删除当前临时节点,让下一个节点进行工作。

  1. 所有请求进来,在/lock下创建 临时顺序节点 ,放心,zookeeper会帮你编号排序
  2. 判断自己是不是/lock下最小的节点
    1. 是,获得锁(创建节点)
    2. 否,对前面小我一级的节点进行监听
  3. 获得锁请求,处理完业务逻辑,释放锁(删除节点),后一个节点得到通知(比你年轻的死了,你成为最嫩的了)
  4. 重复步骤2

Zookeeper 内部加锁示意图

zk-lock-inner

# 依赖

curator 包中提供了 Zookeeper 的操作封装

就类似 Redisson 对 Redis 的封装

<!-- zk 分布式锁支持 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>
1
2
3
4
5
6

# 自定义 Zookeeper 分布式锁实现

基于上图的原理,我们可以这样来实现一个分布式锁

本文中的完整代码已上传 Gitee: https://gitee.com/bulkall/bulk-demo/tree/master/spring-boot-lock/spring-boot-lock-zookeeper (opens new window)

# 锁定义

public class BulkZookeeperLock {

    private ZooKeeper zk = null;
    /**
     * zookeeper 连接
     */
    private final CountDownLatch connectLatch = new CountDownLatch(1);

    /**
     * zookeeper节点等待
     */
    private final CountDownLatch waitLatch = new CountDownLatch(1);

    /**
     * 操作的当前节点
     */
    private String currentNode = null;
    /**
     * 当前节点要监视的上一个节点
     */
    private String waitPath = null;

    public BulkZookeeperLock(ZookeeperProperty property) throws IOException, InterruptedException, KeeperException {

        //建立和zookeeper集群的连接
        zk = new ZooKeeper(property.getZkServers(), property.getSessionTimeout(), new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 连接建立时, 打开latch, 唤醒 wait 在该 latch 上的线程
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // 发生了 waitPath 的删除事件 唤醒 waitLatch
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        //等待建立连接,即连接建立之后才会执行之后的代码。
        connectLatch.await();

        //判断根节点"/locks"是否存在
        Stat stat = zk.exists("/locks", false);
        if (stat == null) {
            //根节点 “/locks” 不存在,创建根节点
            zk.create("/locks", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 加锁方法,加锁就是在根节点“/locks”,下创建节点,不过要判断是否能获取锁。
     * 若当前节点是最小的,则可以获取锁,否则应该监听它前一个节点
     */
    public void lock() {
        try {
            currentNode = zk.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            Thread.sleep(10);

            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> children = zk.getChildren("/locks", false);

            if (children.size() == 1) {
                // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
                return;
            } else {
                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(children);

                //获取当前节点名称,如 seq-0000001
                String thisNode = currentNode.substring("/locks/".length());

                //获取当前节点在children中的位置
                int index = children.indexOf(thisNode);

                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // index为0表示,thisNode 在列表的子节点中最小,可以获取到锁
                    return;
                } else {
                    // 否则监听当前节点的上一个节点
                    waitPath = "/locks/" + children.get(index - 1);
                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    // 进入等待锁状态,如果说,上一个节点他进行了解锁,那么就可以由上面的 waitLatch.countDown(); 唤醒 继续执行,即加锁成功
                    waitLatch.await();
                    return;
                }
            }

        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }

    }

    /**
     * 解锁方法,解锁就是删除当前节点
     */
    public void unLock() {
        try {
            zk.delete(currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

# 锁使用

@SneakyThrows
@Override
public String reduceStockByMyLock(Integer id) {
  // 加 zk 锁
  BulkZookeeperLock zkLock = new BulkZookeeperLock(zookeeperProperty);
  try {
     zkLock.lock();
     ProductStock stock = productStockMapper.selectById(id);
     if (stock != null && stock.getStock() > 0) {
        productStockMapper.reduceStock(id);
     } else {
        throw new RuntimeException("库存不足!");
     }
  } finally {
     // 解 zk 锁
     zkLock.unLock();
  }
  return "ok";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# curator 实现 Zookeeper 分布式锁

本文中的完整代码已上传 Gitee: https://gitee.com/bulkall/bulk-demo/tree/master/spring-boot-lock/spring-boot-lock-zookeeper (opens new window)

# 锁定义

public class ZookeeperLock {
    private final CuratorFramework curatorFramework;

    /**
     * 加锁
     *
     * @param lockKey 锁标识
     * @return 锁信息
     */
    public InterProcessMutex lock(String lockKey) {
        if (!CuratorFrameworkState.STARTED.equals(curatorFramework.getState())) {
            log.warn("在调用此方法之前必须启动 CuratorFramework 实例");
            return null;
        }
        String nodePath = "/curator/lock/%s";
        try {
            // 可重入锁:InterProcessMutex 不可重入锁:InterProcessSemaphoreMutex
            InterProcessMutex mutex = new InterProcessMutex(curatorFramework, String.format(nodePath, lockKey));
            final boolean locked = mutex.acquire(-1L, null);
            return locked ? mutex : null;
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 解锁
     *
     * @param lockInstance 锁实例
     */
    public void unLock(InterProcessMutex lockInstance) {
        try {
            lockInstance.release();
        } catch (Exception e) {
            log.warn("zookeeper lock release error", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 锁使用

public String reduceStock(Integer id) {
  // 加 zk 锁
  InterProcessMutex mutex = zookeeperLock.lock("stock");
  try {
     ProductStock stock = productStockMapper.selectById(id);
     if (stock != null && stock.getStock() > 0) {
        productStockMapper.reduceStock(id);
     } else {
        throw new RuntimeException("库存不足!");
     }
  } finally {
     // 解 zk 锁
     zookeeperLock.unLock(mutex);
  }
  return "ok";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 本地测试说明

  1. 压测工具使用 JMeter , 当然也可以用别的,我用的这个, 此处不了解的可以参考 JMeter 相关操作
  2. 本地测试,借助于 IDEA 的 Allow parallel run 功能启动多个相同的服务(模拟线上环境多个副本),注意修改端口(-Dserver.port=8089),操作可以看下图
  3. 使用 Nginx 工具,将启动的多个项目做负载均衡; 此处不了解的可以参考 Nginx 配置负载均衡
  4. 接下来就是使用 JMeter 开启多个线程去压测 Nginx 暴露出来的接口了
上次更新: 2023/01/13, 18:09:17
布隆过滤器使用
Spring Boot 集成 Redis 实现分布式锁

← 布隆过滤器使用 Spring Boot 集成 Redis 实现分布式锁→

Theme by Vdoing | Copyright © 2022-2024 散装java | MIT License | 鲁ICP备2022022143号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式