一、引言
由于传统的锁是基于Tomcat服务器内部的,搭建了集群之后,导致锁失效,使用分布式锁来处理。
在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等)。
但是到了分布式系统的时代,这种线程之间的锁机制,就没作用了,系统可能会有多份并且部署在不同的机器上,这些资源已经不是在线程之间共享了,而是属于进程之间共享的资源。
因此,为了解决这个问题,我们就必须引入「分布式锁」。
分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。
分布式锁要满足哪些要求呢?
排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取
避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)
高可用:获取或释放锁的机制必须高可用且性能佳
讲完了背景和理论,那我们接下来再看一下分布式锁的具体分类和实际运用。
分布式事务锁的三种方式
1、基于数据库实现
2、基于Redis实现
redis实现的原理是使用reids的setnx命令,当添加成功时认为拿到锁,逻辑业务执行完毕后删除key,认为是释放锁
# 设置值,如果key已存在不添加且返回0,不存在添加且返回1
setnx key value
3、基于ZooKeeper实现
Zookeeper实现分布式锁的原理就是使用临时有序节点的方式,客户端在指定节点下创建临时有序节点时,如果说序号是最小的就获取了锁资源,如果说当前节点不是最小的,监听比自己小一号的节点,如果这个小一号节点被删除了,当前节点再次判断自己的是否为最小的节点,如果是就拿到锁资源
1、基于数据库实现
基于数据库的乐观锁
乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。
当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行更新后写回数据库,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。
MyBatisPlus乐观锁实现
当要更新一条记录的时候,希望这条记录没有被别人更新
乐观锁实现方式:
- 取出记录时,获取当前version
- 更新时,带上这个version
- 执行更新时, set version = newVersion where version = oldVersion
- 如果version不对,就更新失败
使用方法
字段上加上@Version
注解
@Version
private Integer version;
说明:
- 支持的数据类型只有:int,Integer,long,Long,Date,Timestamp,LocalDateTime
- 整数类型下
newVersion = oldVersion + 1
newVersion
会回写到entity
中- 仅支持
updateById(id)
与update(entity, wrapper)
方法 - 在
update(entity, wrapper)
方法下,wrapper
不能复用!!!
2、基于redis实现
1、选用Redis实现分布式锁原因:
(1)Redis有很高的性能;
(2)Redis命令对此支持较好,实现起来比较方便
2、使用命令介绍:
(1)SETNX
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
(2)expire
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
(3)delete
delete key:删除key
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
(1)编写获取锁和释放锁的工具类
/**
* redis处理分布式锁的原理
* 拿到锁:redis使用setnx命令,如果存储成功就拿到锁
* 释放锁:删除对应的数据
* @param
* @return
*/
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate template;
/**
* 获取资源的方法,其原理是redis的是使用redis的setnx命令,如果存储相同的值会返回一个标识
* @param key 存储的key
* @param val 存储的值
* @param ex 超时时间,谨防死锁的出现
* @return
*/
public boolean getLock(String key,String val,Long ex){
return template.opsForValue().setIfAbsent(key,val,ex, TimeUnit.SECONDS);
}
/**
* 释放锁,其原理是删除redis中的数据,
*/
public void unLock(String key){
template.delete(key);
}
}
(2)编写秒杀业务
/**
* 测试秒杀的业务controller 使用的是zookeeper
* @param
* @return
*/
@RestController
public class SeckillControllerZookeeper {
//模拟数据库中的商品数据
private Map<String,Integer> stockMap = new HashMap<>();
//模拟数据库中的订单数据
private Map<String,Integer> orderMap = new HashMap<>();
//初始化数据
@PostConstruct
public void init(){
stockMap.put("手机",1000);//初始化手机数量数据库中的数据
orderMap.put("手机",0);//初始化数据库中的数据
}
@Autowired
private CuratorFramework cf;
@Autowired
RedisLock redisLock;
//秒杀
@RequestMapping("/seckillGoods")
public String seckillGoods(String gname) throws Exception {
//获取锁
boolean lock = redisLock.getLock(gname, "1", 1L);
if(lock){
//1、根据商品名称获取库存
Integer stock = stockMap.get(gname);
//2、判断
if(stock <= 0){
return "商品已经被抢光了,请等待下一轮秒杀";
}
//3、库存-1
stockMap.put(gname,stock-1);
Thread.sleep(200);//模拟修改数据库,需要耗时
//4、订单需要加1
Integer orderCount = orderMap.get(gname);
orderMap.put(gname,orderCount+1);
Thread.sleep(200);//模拟修改数据库,需要耗时
//释放锁
redisLock.unLock(gname);
//5、响应用户
return "抢购【"+gname+"】成功,商品数量还剩【"+stockMap.get(gname)+"】,订单量为【"+orderMap.get(gname)+"】";
}else {
return "当前访问人数过多~";
}
}
}
3、基于ZooKeeper实现
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:
(1)创建一个目录mylock;
(2)线程A想获取锁就在mylock目录下创建临时顺序节点;
(3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
(1)导入依赖
<!--zookeeper的高级API,内部已经包含了zookeeper依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
(2)将zookeeper客户端注入到spring容器中,当然也可以不注入,自己创建
@Configuration
public class ZookeeperClient {
//将zookeeper的高级客户端注入到容器
@Bean
public CuratorFramework cf() throws Exception{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
CuratorFramework cf = CuratorFrameworkFactory.builder().
connectString("192.168.40.100:2181")
.retryPolicy(retryPolicy)
.build();
cf.start();
return cf;
}
}
(3)编写秒杀业务
/**
* 测试秒杀的业务controller 使用的是zookeeper
* @param
* @return
*/
@RestController
public class SeckillControllerZookeeper {
//模拟数据库中的商品数据
private Map<String,Integer> stockMap = new HashMap<>();
//模拟数据库中的订单数据
private Map<String,Integer> orderMap = new HashMap<>();
//初始化数据
@PostConstruct
public void init(){
stockMap.put("手机",1000);//初始化手机数量数据库中的数据
orderMap.put("手机",0);//初始化数据库中的数据
}
private CuratorFramework cf;
@Autowired
RedisLock redisLock;
//秒杀
@RequestMapping("/seckillGoods")
public String seckillGoods(String gname) throws Exception {
//获取锁
boolean lock = redisLock.getLock(gname, "1", 1L);
if(lock){
//1、根据商品名称获取库存
Integer stock = stockMap.get(gname);
//2、判断
if(stock <= 0){
return "商品已经被抢光了,请等待下一轮秒杀";
}
//3、库存-1
stockMap.put(gname,stock-1);
Thread.sleep(200);//模拟修改数据库,需要耗时
//4、订单需要加1
Integer orderCount = orderMap.get(gname);
orderMap.put(gname,orderCount+1);
Thread.sleep(200);//模拟修改数据库,需要耗时
//释放锁
redisLock.unLock(gname);
//5、响应用户
return "抢购【"+gname+"】成功,商品数量还剩【"+stockMap.get(gname)+"】,订单量为【"+orderMap.get(gname)+"】";
}else {
return "当前访问人数过多~";
}
}
}