分布式事务锁


一、引言

由于传统的锁是基于Tomcat服务器内部的,搭建了集群之后,导致锁失效,使用分布式锁来处理。

在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等)。

但是到了分布式系统的时代,这种线程之间的锁机制,就没作用了,系统可能会有多份并且部署在不同的机器上,这些资源已经不是在线程之间共享了,而是属于进程之间共享的资源。

因此,为了解决这个问题,我们就必须引入「分布式锁」。

分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。

分布式锁要满足哪些要求呢?

排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取

避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)

高可用:获取或释放锁的机制必须高可用且性能佳

讲完了背景和理论,那我们接下来再看一下分布式锁的具体分类和实际运用。

分布式事务锁的三种方式

1、基于数据库实现

2、基于Redis实现

redis实现的原理是使用reids的setnx命令,当添加成功时认为拿到锁,逻辑业务执行完毕后删除key,认为是释放锁

# 设置值,如果key已存在不添加且返回0,不存在添加且返回1 
setnx key value

3、基于ZooKeeper实现

Zookeeper实现分布式锁的原理就是使用临时有序节点的方式,客户端在指定节点下创建临时有序节点时,如果说序号是最小的就获取了锁资源,如果说当前节点不是最小的,监听比自己小一号的节点,如果这个小一号节点被删除了,当前节点再次判断自己的是否为最小的节点,如果是就拿到锁资源

1、基于数据库实现

基于数据库的乐观锁

乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。

当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行更新后写回数据库,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。

MyBatisPlus乐观锁实现

当要更新一条记录的时候,希望这条记录没有被别人更新
乐观锁实现方式:

  • 取出记录时,获取当前version
  • 更新时,带上这个version
  • 执行更新时, set version = newVersion where version = oldVersion
  • 如果version不对,就更新失败

使用方法

字段上加上@Version注解

@Version
private Integer version;

说明:

  • 支持的数据类型只有:int,Integer,long,Long,Date,Timestamp,LocalDateTime
  • 整数类型下 newVersion = oldVersion + 1
  • newVersion 会回写到 entity
  • 仅支持 updateById(id)update(entity, wrapper) 方法
  • update(entity, wrapper) 方法下, wrapper 不能复用!!!

2、基于redis实现

1、选用Redis实现分布式锁原因:

(1)Redis有很高的性能;
(2)Redis命令对此支持较好,实现起来比较方便

2、使用命令介绍:

(1)SETNX

SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。

(2)expire

expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。

(3)delete

delete key:删除key

在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

(1)编写获取锁和释放锁的工具类

/**
 * redis处理分布式锁的原理
 *  拿到锁:redis使用setnx命令,如果存储成功就拿到锁
 *  释放锁:删除对应的数据
 * @param
 * @return
 */
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate template;

    /**
     * 获取资源的方法,其原理是redis的是使用redis的setnx命令,如果存储相同的值会返回一个标识
     * @param key 存储的key
     * @param val 存储的值
     * @param ex 超时时间,谨防死锁的出现
     * @return
     */
    public boolean getLock(String key,String val,Long ex){
       return template.opsForValue().setIfAbsent(key,val,ex, TimeUnit.SECONDS);
    }
    /**
     * 释放锁,其原理是删除redis中的数据,
     */
    public void unLock(String key){
        template.delete(key);
    }
}

(2)编写秒杀业务

/**
 * 测试秒杀的业务controller  使用的是zookeeper
 * @param
 * @return
 */
@RestController
public class SeckillControllerZookeeper {
    //模拟数据库中的商品数据
    private Map<String,Integer> stockMap = new HashMap<>();
    //模拟数据库中的订单数据
    private Map<String,Integer> orderMap = new HashMap<>();
    //初始化数据
    @PostConstruct
    public void init(){
        stockMap.put("手机",1000);//初始化手机数量数据库中的数据
        orderMap.put("手机",0);//初始化数据库中的数据
    }
    @Autowired
    private CuratorFramework cf;
    @Autowired
    RedisLock redisLock;
    //秒杀
    @RequestMapping("/seckillGoods")
    public String seckillGoods(String gname) throws Exception {
        //获取锁
        boolean lock = redisLock.getLock(gname, "1", 1L);
        if(lock){
            //1、根据商品名称获取库存
            Integer stock = stockMap.get(gname);
            //2、判断
            if(stock <= 0){
                return "商品已经被抢光了,请等待下一轮秒杀";
            }
            //3、库存-1
            stockMap.put(gname,stock-1);
            Thread.sleep(200);//模拟修改数据库,需要耗时
            //4、订单需要加1
            Integer orderCount = orderMap.get(gname);
            orderMap.put(gname,orderCount+1);
            Thread.sleep(200);//模拟修改数据库,需要耗时
            //释放锁
            redisLock.unLock(gname);
            //5、响应用户
            return "抢购【"+gname+"】成功,商品数量还剩【"+stockMap.get(gname)+"】,订单量为【"+orderMap.get(gname)+"】";
        }else {
            return "当前访问人数过多~";
        }
    }

}

3、基于ZooKeeper实现

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

(1)创建一个目录mylock;
(2)线程A想获取锁就在mylock目录下创建临时顺序节点;
(3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

(1)导入依赖

<!--zookeeper的高级API,内部已经包含了zookeeper依赖-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

(2)将zookeeper客户端注入到spring容器中,当然也可以不注入,自己创建

@Configuration
public class ZookeeperClient {
    //将zookeeper的高级客户端注入到容器
    @Bean
    public CuratorFramework cf() throws Exception{
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
        CuratorFramework cf = CuratorFrameworkFactory.builder().
                connectString("192.168.40.100:2181")
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
        return cf;
    }
}

(3)编写秒杀业务

/**
 * 测试秒杀的业务controller  使用的是zookeeper
 * @param
 * @return
 */
@RestController
public class SeckillControllerZookeeper {

    //模拟数据库中的商品数据
    private Map<String,Integer> stockMap = new HashMap<>();
    //模拟数据库中的订单数据
    private Map<String,Integer> orderMap = new HashMap<>();

    //初始化数据
    @PostConstruct
    public void init(){
        stockMap.put("手机",1000);//初始化手机数量数据库中的数据
        orderMap.put("手机",0);//初始化数据库中的数据
    }
    private CuratorFramework cf;
    @Autowired
    RedisLock redisLock;
    //秒杀
    @RequestMapping("/seckillGoods")
    public String seckillGoods(String gname) throws Exception {
        //获取锁
        boolean lock = redisLock.getLock(gname, "1", 1L);
        if(lock){
            //1、根据商品名称获取库存
            Integer stock = stockMap.get(gname);
            //2、判断
            if(stock <= 0){
                return "商品已经被抢光了,请等待下一轮秒杀";
            }
            //3、库存-1
            stockMap.put(gname,stock-1);
            Thread.sleep(200);//模拟修改数据库,需要耗时

            //4、订单需要加1
            Integer orderCount = orderMap.get(gname);
            orderMap.put(gname,orderCount+1);
            Thread.sleep(200);//模拟修改数据库,需要耗时
            //释放锁
            redisLock.unLock(gname);
            //5、响应用户
            return "抢购【"+gname+"】成功,商品数量还剩【"+stockMap.get(gname)+"】,订单量为【"+orderMap.get(gname)+"】";
        }else {
            return "当前访问人数过多~";
        }
    }
}

(4)使用高并发测压工具进行测试


  目录