Zookeeper 实现分布式锁(乐观和悲观)

标签: zookeeper  分布式  

说明:
做备忘用,大家之言汇总到一起。
Jar

<!-- zkclient依赖 -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

这里写图片描述

zookeeper基础巩固

ZooKeeper 节点是有生命周期的,这取决于节点的类型。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。

持久节点(PERSISTENT)

所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。

持久顺序节点(PERSISTENT_SEQUENTIAL)

这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。

临时节点(EPHEMERAL)

和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。

临时顺序节点(EPHEMERAL_SEQUENTIAL)

可以用来实现分布式锁

这里写图片描述

代码

业务代码-模拟并发下生成id

package com.dongnao.lock;

import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeGenerator {

    //自增长序列
    private static int i =0;

    //按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
    public String getOrderCode(){
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
        return sdf.format(now)+ ++i;
    }
    public static void main(String[] args) {
        OrderCodeGenerator ong = new OrderCodeGenerator();
        for (int i = 0; i < 10; i++) {
            System.out.println(ong.getOrderCode());
        }
    }
}

模拟100个线程去创建订单id

代码说明:这里我们用的java的发令枪来模拟并发CountDownLatch ,主函数运行 所有的线程都处于阻塞状态 cdl.await();当 cdl.countDown();执行之后,所有线程开始并发执行 createOrder() ; 该方法中会用到 lock.lock(); 该lock 对象我们提供了三种实例,方式1是java自带的,非分布式的。方式2,3是我们利用zookeeper 来实现的,这里会贴出 方式2,3的具体代码,也会对比着去分析方式3差在那里,如何优化到方式2这用利用zookeeper来实现分布式锁,进而投入生产。

package com.dongnao.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderServiceImpl implements Runnable {

    private static OrderCodeGenerator ong = new OrderCodeGenerator();

    private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);

    private static final int NUM = 100;
    // 按照线程数初始化倒计数器
    private static CountDownLatch cdl = new CountDownLatch(NUM);

//  private static Lock lock = new ReentrantLock();  加锁方式1
//  private Lock lock = new ZookeeperImproveLock();  加锁方式2 
    private Lock lock = new ZookeeperLock();         加锁方式3

    // 创建订单接口
    public void createOrder() {
        String orderCode = null;
        lock.lock();
        try {
            // 获取订单编号
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            // TODO: handle exception
        }finally{
            lock.unlock();
        }


        // ……业务代码,此处省略100行代码

        logger.info(Thread.currentThread().getName()
                + " =======================>" + orderCode);
    }

    @Override
    public void run() {
        try {
            // 等待其他线程初始化
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 创建订单
        createOrder();
    }

    public static void main(String[] args) {
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            new Thread(new OrderServiceImpl()).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }

}

lock 对象 方式3

lock()方法是调用的入口,它去调用tryLock() 尝试获取锁和阻塞其他线程,tryLock()中去创建持久节点LOCK,之前介绍过,持久节点只能有一个,所以其他线程去创建的时候,会抛出ZkNodeExistsException 异常,tryLock()是非阻塞的,捕获异常我们返回false, 在 lock() 中调用waitForLock(); 去阻塞线程和对LOCK节点的监听,当锁释放了,继续调用 lock(); 再去竞争锁(递归)。

package com.dongnao.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperLock implements Lock {

    private static final String ZK_IP_PROT = "localhost:2181";
//  private static final String ZK_IP_PROT = "13.206.6.232:2181";
    private static final String LOCK_NODE = "/LOCK";

    private ZkClient client = new ZkClient(ZK_IP_PROT);

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperLock.class);

    private CountDownLatch cdl=null;


    @Override
    //阻塞的方式去获取锁
    public void lock() {
        if(tryLock()){
            logger.info("=============get lock success==============");
        }else{
            waitForLock();
            lock();
        }

    }


    @Override
    //通过新建节点的方式去尝试加锁  非阻塞
    public boolean tryLock() {
        try {
            client.createPersistent(LOCK_NODE);
            return true;
        } catch (ZkNodeExistsException e) {
            return false;
        }
    }


    @Override
    public void unlock() {
        client.delete(LOCK_NODE);
    }

    private void waitForLock() {
        //1.创建一个监听
        IZkDataListener listener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //3.当其他的线程释放锁,抛出事件,让其他线程重新竞争锁
                logger.info("=============catch data delete event==============");
                if(cdl!=null){
                    cdl.countDown();
                }
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
             // TODO Auto-generated method stub
            }
        };
        client.subscribeDataChanges(LOCK_NODE, listener);
        //2.如果节点还存在,让线程阻塞
        if(client.exists(LOCK_NODE)){
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        client.unsubscribeDataChanges(LOCK_NODE, listener);

    }

    public static void main(String[] args) throws InterruptedException {

        final CountDownLatch cdl = new CountDownLatch(1);
        ZkClient client = new ZkClient(ZK_IP_PROT);
        client.subscribeDataChanges(LOCK_NODE, new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("===============aaa===========");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {   
            }
        });
        cdl.await();
    }


    //--------------------不需要写逻辑的方法--------------------

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

}

测试方式3的弊端
方式3不建议投入生产,弊端有两个:(1)会出现死锁。(2)基于zookeeper内部机制,所有产生连接的客户端,当节点LOCK 删除之后,zookeeper回给所有的客户端发送 删除通知,这严重的影响了我们的性能。如果我们有100个客户端,当拿到锁的 线程去释放锁(删除该节点)之后,zookeeper会通过http 告诉99个客户端该节点删除了。

测试步骤1

用命令 去创建一个LOCK节点,然后就会死锁。因为对于这100个人来说,他们创建这个节点时发现已经存在了,它会抛异常,捕获异常之后他们都会阻塞,没有线程会去删除这个节点,此时100个人永久等待。

这里写图片描述

运行主函数
这里写图片描述

启动一堆线程之后,发现所有线程都是在阻塞

这里写图片描述

同理:当一个线程创建这个节点之后,服务器宕机了,网络延迟等导致这个LOCK 节点 没有合理性的释放,其他线程死锁。

步骤二,我们命令删除LOCK来测试第二个弊端

因为,我们在代码里 写了对 LOCK 节点的监听client.subscribeDataChanges(LOCK_NODE, listener);所以命令删除之后,100线程正常的去抢占锁资源,一切程序恢复正常。如图

这里写图片描述

这里写图片描述

上图我们会发现: 肉眼可见的所有线程在抢锁,很慢,而且每次释放锁(删除节点),会有 n - 1次通知,n 为当前最大线程个数。

方式二解决了以上两个弊端

(1)我们用临时节点,这样就不会死锁。
(2)我们每个线程只监听他的上一个节点(排序),这样通知就变为了1 。

这里写图片描述

代码

package com.dongnao.lock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperImproveLock implements Lock {

    private static final String LOCK_PATH = "/LOCK";

    private static final String ZOOKEEPER_IP_PORT = "localhost:2181";

    private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 1000, 1000, new SerializableSerializer());

    private static Logger logger = LoggerFactory.getLogger(ZookeeperImproveLock.class);

    private CountDownLatch cdl;

    private String beforePath;// 当前请求的节点
    private String currentPath;// 当前请求的节点前一个节点

    // 判断有没有LOCK目录,没有则创建
    public ZookeeperImproveLock() {
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    public void lock() {
        if (!tryLock()) {
            waitForLock();
            lock();
        } else {
            logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
        }

    }

    /**
     * 为当前节点添加 监听器
     */
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {

            // 删除的时候去监听
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            // 发生改变的时候去监听
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        // 给之前的节点增加数据删除的watcher
        this.client.subscribeDataChanges(beforePath, listener);

        if (this.client.exists(beforePath)) { // 如果这个节点存在
            cdl = new CountDownLatch(1);
            try {
                cdl.await(); // 线程就给他阻塞,让他等
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);

    }

    public boolean tryLock() {
        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 创建一个临时顺序节点
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }
        // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
        List<String> childrens = this.client.getChildren(LOCK_PATH);
        Collections.sort(childrens);

        if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {// 如果当前节点在所有节点中排名第一则获取锁成功
            return true;
        } else {// 如果当前节点在所有节点中排名中不是排名第一,则获取前面的节点名称,并赋值给beforePath
            int wz = Collections.binarySearch(childrens, currentPath.substring(6));
            beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
        }
        return false;
    }

    public void unlock() {
        // 删除当前临时节点
        client.delete(currentPath);

    }

    // ===================用不到的实现方法=======================

    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub

    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

}

运行效果:

LOKC下创建有序号的 值

这里写图片描述

这里写图片描述

上图发现秒级抢锁, 方式2的代码就不介绍了,主要就是:获取该节点下所有的 值,然后排序,取第一个,100个线程都有自己的编号,然后跟排序完第一个equals() 比较,肯定只有一个能批对上,其他99个去阻塞等~,再就是监听上一个节点来保证通知只会发生一次保证性能。

基于Zookeeper实现分布式锁 已经没有问题了。多说一嘴,我们上面是通过java的发令枪CountDownLatch 来进行阻塞,实现悲观锁,如果我们不阻塞可以实现乐观锁。

补充:

该异常是因为客户端和服务器部署的zookeeper版本不兼容导致,上面介绍过,我们的客户端支持3.4.8 一下版本的 zookeeper, 以上版本就会出现如下异常。

这里写图片描述

版权声明:本文为qq_16681279原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_16681279/article/details/78061526