Redis分布式事务锁


一、单机锁

/**
* 单机版锁,JVM 级别
* 缺点:仅仅使用于单机,当部署多个服务时人会出现并发问题
* @return
*/
@GetMapping("/sell1")
public String sell1(){

    synchronized (this){
        // 获取库存
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
        if (stock > 0){
            // 扣减库存
            int realStock = stock - 1;
            redisTemplate.opsForValue().set("stock",realStock+"");
            System.out.println("扣减成功,库存剩余:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足!!!");
        }
    }
    return "结束";
}

当需要对同一个用户加锁时,比如秒杀业务的一人限购一个的场景,为了防止用户多次点击,或者有人用软件执行批量操作时

synchronized (userId.toString().intern()){
    ...
}

二、redis分布式锁

@Autowired
private StringRedisTemplate redisTemplate;

/**
* redis 实现分布式锁
* @return
*/
@GetMapping("/sell2")
public String sell2(){
    String uuid = UUID.randomUUID().toString();
    // 尝试获取锁
    // 需设置超时时间,防止当执行到加锁代码时,服务器宕机,锁无法释放,导致其他服务无法获取到锁
    // 用 uuid 作为值,在解锁时判断是否当前线程的锁,不用线程id的原因是多服务下,线程id可能重复
    Boolean b = redisTemplate.opsForValue().setIfAbsent("LOCK:PHONE", uuid,10, TimeUnit.SECONDS);
    // 没有获取到锁
    if (!b){
        return "抢购失败";
    }

    // 获取到锁的情况
    try{
        // 获取库存
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
        if (stock > 0){
            // 扣减库存
            int realStock = stock - 1;
            redisTemplate.opsForValue().set("stock",realStock+"");
            System.out.println("扣减成功,库存剩余:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足!!!");
        }
    }finally {
        // 释放锁,在finally中,防止加锁代码出现异常导致锁无法释放

        /*
            * 问题:当因某些原因,线程一执行时间超过锁的过期时间时,锁会被删除,
            * 其他线程可与在此时获取到锁,此时线程一执行完毕删除锁,删除的锁实际上是其他线程加的锁,
            * 从此锁会一直处于失效状态。
            * 解决:redis锁的value使用uuid确保删除的锁是当前线程添加的。
            */
        if (uuid.equals(redisTemplate.opsForValue().get("LOCK:PHONE"))){
            /**
            * 在这里还会有问题,那就是获取锁的值和释放锁是两个动作,中间如果发生阻塞(jvm垃圾回收等),
            * 可能在获取锁之后,锁过期了,然后其他线程又加上了锁,此时释放的就是其他线程的锁
            * 解决方法见:三、lua脚本实现原子性
            */
            redisTemplate.delete("LOCK:PHONE");
        }
    }
    return "结束";
}

1、看门狗

三、Lua 脚本实现原子性

1、redis 调用函数

redis 提供的调用函数,语法如下:

# 执行 redis 命令
redis.call('命令名称','key','其他参数',...)

例如:执行 set name jack,则脚本如下

redis.call('set','name','jack')

函数的调用方式,语法如下:

EVAL script numkeys key [key...] arg [arg...]
EVAL "脚本内容" 脚本需要的键的个数 key [key...] arg [arg...]

例如:执行 set name jack,则脚本如下

# 无参
EVAL "return redis.call('set','name','jack')" 0
# 有参,数组索引下标从 1 开始,默认key存放到 KEYS 数组,其他参数存放到 ARGV 数组
EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name jack

2、java 调用 lua 脚本

java 调用 lua 靠 RedisTemplateexcute() 方法

public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {}

script 就是脚本,keys 是key参数集合,args 其他参数。

// 泛型是返回值
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    // 从 classpath ,也就是 resource 中获取脚本
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    // 设置返回值类型
    UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public void unlock() {
    // 调用lua脚本
    stringRedisTemplate.execute(
        UNLOCK_SCRIPT,
        Collections.singletonList(KEY_PREFIX + name),
        ID_PREFIX + Thread.currentThread().getId());
}

unlock.lua

-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) ==  ARGV[1]) then
    -- 释放锁 del key
    return redis.call('del', KEYS[1])
end
return 0

3、完整分布式锁

基本生产可用

(1)lua 脚本

/resource/unlock.lua

-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) ==  ARGV[1]) then
    -- 释放锁 del key
    return redis.call('del', KEYS[1])
end
return 0

(2)工具类

public class SimpleRedisLock {

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

    // 在静态代码块中加载脚本
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    public void unlock() {
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }
}

(3)业务代码

@Autowired
private StringRedisTemplate stringRedisTemplate;

private void createVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();
    Long voucherId = voucherOrder.getVoucherId();
    // 创建锁对象
    SimpleRedisLock redisLock = new SimpleRedisLock("lock:order:" + userId,stringTemplate);
    // 尝试获取锁,并设置锁的过期时间
    boolean isLock = redisLock.tryLock("5");
    // 判断
    if (!isLock) {
        // 获取锁失败,直接返回失败或者重试
        log.error("不允许重复下单!");
        return;
    }

    try {
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("不允许重复下单!");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
            .setSql("stock = stock - 1") // set stock = stock - 1
            .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
            .update();
        if (!success) {
            // 扣减失败
            log.error("库存不足!");
            return;
        }

        // 7.创建订单
        save(voucherOrder);
    } finally {
        // 释放锁
        redisLock.unlock();
    }

4、仍然存在的问题

其实还是存在以下问题,但是以下问题基本发生的概率极低

  • 不可重入

同一线程无法多次获取同一把锁

  • 不可重试

获取锁只尝试一次就返回 false ,没有重试机制

  • 超时释放

锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患

  • 主从一致性

如果 Redis 提供了主从集群,主从同步存在延迟,当主 redis 出现宕机时,主节点中的锁还没同步到从节点中,此时其他线程向从节点获取锁时,是没有的,此时就出现多个线程获取到锁

四、redisson分布式锁

1、依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.1</version>
</dependency>

2、配置

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

3、使用

@Autowired
private RedissonClient redissonClient;

@GetMapping("/test")
public String test(){
    RLock lock = redissonClient.getLock("lock:order");

    try{
        // 尝试获取锁
        // 参数1:获取锁最大等待时间,期间会重试
        // 参数2:锁的超时自动释放时间
        // 参数3:时间单位
        // 返回值:是否获取到锁
        boolean isLock = lock.tryLock(1, 5, TimeUnit.SECONDS);

        if (!isLock){
            return "没获取到锁";
        }

        ...
        return "下单成功";
    }catch (Exception e){
        e.printStackTrace();
    } finally {

        // 解锁
        lock.unlock();
    }
    return "结束";
}

3、使用

@Autowired
private Redisson redisson;

@Autowired
private StringRedisTemplate redisTemplate;

/**
* redisson 实现分布式锁
* @return
*/
@GetMapping("/sell3")
public String sell3(){

    // 1、获取锁,可重入锁
    RLock lock = redisson.getLock("LOCK:PHONE");

    try{
        // 2、加锁
        lock.lock();
        // 获取库存
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
        if (stock > 0){
            // 扣减库存
            int realStock = stock - 1;
            redisTemplate.opsForValue().set("stock",realStock+"");
            System.out.println("扣减成功,库存剩余:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足!!!");
        }
    }finally {
        // 3、解锁
        lock.unlock();
    }
    return "结束";
}

  目录