1. 秒杀基本场景
1.1 全局唯一性ID
业务背景
当全局的数据量太大,比如说达到了亿级的数据量,这时候考虑到查询效率以及存储的安全性,就要做分表了,如果使用Mysql提供的自增id,分表的话,各自的表都会产生各自的自增id,比如说table1和table2的自增id都是从1开始的,那么在这种情况下,就会产生id的二义性问题
全局性ID生成器:是一种在分布式系统下用来生成全局唯一ID
的工具,一般需要满足:唯一性,高可用,高性能(生成ID速度快),递增性(单调的特性,替代数据的自增ID,利于索引生成),确保安全性(不能被用户猜测到具体信息,不能太明显)
这个需求可以使用redis
来完成,这是因为redis是独立于数据库存在的
安全性如何实现:为了增加ID的安全性,可以不直接使用redis
自增的数值,而是拼接一些其他信息
private static final long BEGIN_TIMESTAMP = 1672531200L;
/**
* 序列号的位数
*/
private static final int SEQ_BITS = 32;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 不同的业务生成不的ID
* @param keyPrefix
* @return
*/
public long nextId(String keyPrefix){
//1.生成时间戳
long timeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
//2.生成序列号
//获取当前日期,精确到天
String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long seq = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接并且返回
return timeStamp << SEQ_BITS | seq;
}
测试代码
@Resource
private ShopServiceImpl shopService;
@Resource
private RedisIDWorker redisIDWorker;
private ExecutorService executorService = Executors.newFixedThreadPool(500);
@Test
public void testIDWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
long beginSecond = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
System.out.println(LocalDateTime.now());
Runnable task = ()->{
for(int i = 0;i<100;i++){
long id = redisIDWorker.nextId("order");
System.out.println("id:"+Long.toBinaryString(id));
}
latch.countDown();
};
for (int i = 0; i < 300; i++) {
executorService.submit(task);
}
latch.await();
System.out.println("执行时间:"+(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)-beginSecond)+"秒");
}
- 全局唯一ID生成策略
UUID(生成缓慢,不利于使用索引)
Redis自增算法
snowflake算法,注意,在分布式系统中,是非常难以确保每个节点具有相同的时钟的,而这个算法使用了时间戳,具有一定的隐患
数据库自增,就是专门开个表来做数据库的唯一性ID
在企业开发中,通常使用一种在内存中提前缓存若干个ID的方法,使用该方法能够在获取ID的时候减免性能均摊,将机器性能集中到业务执行上
1.2 实现优惠券秒杀下单
@Override
@Transactional
public Result seckill(Long voucherId) {
//1.查询优惠
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
LocalDateTime now = LocalDateTime.now();
LocalDateTime beginTime = seckillVoucher.getBeginTime();
if(beginTime.isAfter(now)){
return Result.fail("秒杀未开始!");
}
LocalDateTime endTime = seckillVoucher.getEndTime();
//3.判断秒杀是否结束
if(endTime.isBefore(now)){
return Result.fail("秒杀已结束!");
}
//TODO:这里在并发的情况下将会产生超卖的现象
//4.判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足!");
}
//5.创建订单
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId).update();
if(!success){
return Result.fail("库存不足!");
}
//6.创建并且返回订单的id
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderID = redisIDWorker.nextId("order");
voucherOrder.setId(orderID);
//6.2用户id
UserDTO user = UserHolder.getUser();
voucherOrder.setUserId(user.getId());
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//6.4写入数据库
save(voucherOrder);
return Result.ok(orderID);
}
1.3 超卖问题
上面我们所实现的代码是有缺陷的,在高并发的环境下将会产生超卖问题!
超卖问题产生的原因以及分析
这实际上还是一个数据库数据和缓存不一致的问题,只是此时缓存的不一样了,我们存储这个库存的信息是JVM中的生成对象,这也是一种缓存
首先,在这种情况下,假设到了超卖的临界点,数据库中存储的库存数据是1
,然后高并发下,线程1查询数据,然后在JVM中生成对象,这个对象中的库存记录为1,然而此时产生线程的切换,线程2也去查询数据库,此时依然在会在JVM中生成对象,然后线程各自执行库存的更新操作,最终导致超卖了
那么问题出在哪呢?在于线程1没有把获取库存,更新库存这个操作设置为原子性,如果是原子性,其他线程就无法同时获取这个库存了。
解决方案:加锁
悲观锁和乐观锁实际上并不是一种真正的锁,而是一种锁的设计理念
悲观锁
认为线程安全一定会发生,线程切换时时刻刻都在进行,临界资源一定不安全,因此在操作数据之前必须要获取锁,如果没有获取锁则阻塞等待,确保线程执行序列可串行化
Synchronized
、Lock
都属于悲观锁- 悲观锁采用的是线程串行合法模型,因此性能较差
- 悲观锁的缺点是,它锁的范围太大了,尽管有些代码是不涉及到临界资源的,它也去对这个资源上锁,这是一种非常浪费的行为
乐观锁
认为线程安全不一定会发生,因此不加锁,只是在更新数据的时候去判断有没有其他线程做了修改
- 如果没有修改则认为是安全的,自己才更新数据
- 如果已经被其他线程修改说明发生了线程安全问题,可以重试或者抛出异常
- 可能出现成功率低的问题,概率出现,与线程并发情况有关,无法控制
注意,简单的乐观锁的实现有
CAS(compare and set)
,这种原始的方法将会导致ABA
问题,而解决ABA
问题的关键是加上一个操作版本号,确保在值相同的情况下,这期间只有一个线程在操作
注意,能当做版本号的字段的特征是,只能递增或者递减,在这样的情况下就可以当做版本号使用,否则可能产生ABA问题导致生产事故
首先这是一个基本的CAS的sql写法
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId)
.eq("stock",seckillVoucher.getStock())
.update();
然后还有执行异常或者重试,重试的话可以交给用户或者程序,由程序手工帮助用户进行重试
如果抛出异常的话,肯定能够保证安全问题,但是会导致货物卖不出去,用户体验比较差
我们希望的是,用户的请求先到达的,就先抢到券
//5.创建订单
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId)
.gt("stock",0)//修改为gt
.update();
注意,执行sql的这个过程,可以认为是原子性的,这是因为mysql
中有封锁协议和排它锁,通过这两个机制能够保护事务的并发执行安全
1.4 一人一单
synchronized (user.getId().toString().intern()){//获取字符串常量
int count = this.query()
.eq("user_id", user.getId())
.eq("voucher_id", voucherId)
.count();
if(count > 0 ){
return Result.fail("只能秒杀一张券!");
}
}
问题1:这里不用同步锁可以吗?
首先分析一下并发问题是如何产生的,假设线程123都进来了,因为数据库的I/O数据比较长,因此线程1执行完这个query
,发生线程切换,线程2执行完query
也发生线程切换,线程3执行完query
也发生线程切换,而这些查询结果是不是都是count=0
?如果是这样的话,那么后续的代码都会执行,就会产生一个后果,如果一个用户恶意开辟多个线程来请求接口,那么还会产生一个人下多单的这种效果
我们知道乐观锁解决的问题是更新下的并发安全问题,如果没有更新,那么就没有CAS
这种操作,而我们在这做的是纯查询操作,无法使用CAS
的操作
因此只能上同步锁了
问题2:如何确定锁的粒度?
这个问题需要我们明确需要锁的是谁?我们想锁的是同一个用户,防止一个用户产生多个线程并发
,而不是说所有的用户来,谁来锁谁
那么在JVM
中有没有这样的一个唯一对象可以表示用户
呢?我们知道在JVM
的常量区中有字符串常量,这个对象一旦第一次被使用,就会被缓存下来,并且内存对象地址不会改变,那么在这样的情况下,我们可以使用UserId.toString.intern()
的这个方法,来每次获取用户id所对应的在JVM
中的唯一对象,就能够将这个对象锁住
问题3:这里操作的事务与同步锁是否会产生问题?
我们使用的事务是声明式事务
,而这个声明式事务
是由Spring管理的,它的原理是通过CGLIB或者JDK动态代理,生成动态代理类,添加前置通知和后置通知,在这些通知里面完成事务的提交,这是基本的原理。
然而,如果我们像下面这样编写代码:
@Transactional
public Result seckillImpl(Long voucherId){
//一人一单的判断,也要加锁
UserDTO user = UserHolder.getUser();
synchronized (user.getId().toString().intern()){//获取字符串常量
...略
}
}
那么我们梳理一下最终动态代理会怎样执行这个方法?
底层源码大概长这样
public class TimeInvocationHandler implements InvocationHandler {
//invoke方法执行过程中,使用method来调用目标对象的目标方法
private Object target;//目标对象
public TimeInvocationHandler(Object target){
this.target = target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//这个地方就是为了编写增强的代码的
System.out.println("开启事务");
//dosomething
//调用目标对象上的目标方法
//方法四要素:对象,方法,参数,返回值
Object retVal = method.invoke(target, args);
System.out.println("提交事务");
return null;
}
}
那么我们就知道了,synchronized
锁的是method.invoke()
这个内部里面的方法,然而,开启事务和提交事务的这个过程会锁进去吗?不会!这样也会产生并发问题!
举例:比如说线程1来了,然后执行完method.invoke()
,还没有提交事务,这时候被线程2抢占,它也去执行method.invoke()
,然而你这时候事务还没有提交呀,最终两个事务一起提交,也会导致一个人可以下多个单
修改思路:既然直接锁这个方法内部的代码不行,那么我就直接锁这个方法,修改如下:
UserDTO user = UserHolder.getUser();
synchronized (user.getId().toString().intern()){
return this.seckillImpl(voucherId,user);
}
这就完了吗?
还是事务问题!我们使用的是声明式事务,我们必须时刻记得声明式事务的底层实现,它底层的实现是通过生成一个代理对象proxy
,通过这个proxy对象来执行invoke()
方法才能执行对应的方法
但是我们这里是怎么写的
return this.seckillImpl(voucherId,user);
我们直接将执行方法的对象写死了!理论上实现动态代理,是通过调用接口的方法与获取具体实现类,两者配合实现的,然而我们这里是直接使用了实现类,因此就会导致动态代理失效,没有执行事务的相关操作,事务失效了
解决办法:在Spring中暴露这个代理对象,让代理对象执行方法就好了
步骤1:添加暴露代理对象的方法
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
步骤2:让代理对象执行方法
UserDTO user = UserHolder.getUser();
synchronized (user.getId().toString().intern()){
IVoucherOrderService iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();
return iVoucherOrderService.seckillImpl(voucherId,user);
}
至此,一人一单的功能就实现了(单机)
一人一单的集群模式
我们知道,在集群下,有多台的tomcat
服务器接收请求并且响应请求,那么请求就会被均摊到不同的节点
那么我们的锁是锁同一个JVM
中的对象的,如果按照上面的思路,就会出问题了,这是因为不同的tomcat是不同的虚拟机,这时候锁的对象自然而然的就发生了变化了
首先这是单机的情况
首先在单机环境下,JVM只有一个,锁监视器只有一个,当线程1执行到synchronized
的时候,这时候JVM中的userId所对应的锁监视器就会记录下线程1,然后当线程2执行到同步代码的时候,锁监视器中已经有登记的线程,因此线程2阻塞,这样的话就可以实现一人一单
集群环境如上图所示
2. 分布式锁详解
2.1 分布式锁的基本原理
通过1.4的分析,我们发现在集群环境下的锁的控制,主要是因为锁监视器有两个,它们将会监视不同的线程,从而导致不同的线程获取锁
,那么解决思路就是产生一个锁监视器
,让这个集群内的机器都去访问这个锁监视器即可,让这些JVM
进程都能够访问这个锁
线程-锁监视器访问模型如下:
分布式锁:
满足分布式系统或者集群模式下多进程可见并且互斥的锁
- 多进程可见,多个JVM都能访问同一资源
- 实现互斥:当有进程获取了锁,其他进程获取锁失败
- 高可用:大多数时候获取锁的都能够正常使用
- 高性能:获取锁的速度要快
- 安全性:要考虑获取锁的时候,产生异常而不发生死锁
- 可重入性、公平锁等额外特性
2.2 分布式锁的三种实现方式
目前来说,可以使用MySQL、Redis、Zookeeper实现这个分布式锁
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 使用事务机制,或者使用内置的锁(排他锁),在业务执行先申请锁 | 利用setnx这样的互斥命令,利用setnx首次设置成功其余失败的特性 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 主从模式,可以达到较高的可用性 | 主从模式,集群模式,哨兵模式,高可用 | 支持集群,可用性很好 |
高性能 | 性能一般 | Redis本身由于C语言编写,单线程,内存运行等机制,性能比较好 | 维护强一致性,做数据同步需要消耗时间 |
安全性 | 事务机制可以确保锁被自动释放,安全性很好 | 利用redis的过期机制,设置过期时间,防止死锁 | 节点是临时节点,像有一个过期时间一样,将会自动删除 |
2.3 分布式锁的实现基本思路
获取锁
- 互斥:确保只能有一个线程获取到锁
释放锁
- 手动释放
- 利用缓存时间过期机制自动删除锁
------这一段指令应该要保持原子性
setnx lock thread1
expire lock 10
------
del lock
假设一种场景,如果当设置了锁,但是当这个锁的过期时间还没有设置的时候,服务器就宕机了,这时候就会产生非常严重的问题,因此应该要这样写来保证原子性
set lock thread1 EX 10 NX
问题:如果线程获取锁失败了,我们要给它们执行什么样的逻辑呢?
- 获取锁失败的线程阻塞等待,等到释放锁为止,可以是自旋等待,将会导致忙等,或者是将线程直接设置为阻塞,但是会浪费线程资源,阻塞式
- 获取锁失败,立即结束该task,执行其他task,非阻塞式
public interface ILock {
/**
* 尝试获取锁
* @param timeOutSec:锁持有的超时时间,过期后自动释放
* @return true:代表获取锁成功,false:代表获取锁失败
*/
boolean tryLock(long timeOutSec);
/**
* 释放锁
*/
void unlock();
}
private String name;
private static final String KEY_PREFIX="lock:";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private StringRedisTemplate stringRedisTemplate;
@Override
public boolean tryLock(long timeOutSec) {
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, "thread" + Thread.currentThread().getId(), timeOutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
UserDTO user = UserHolder.getUser();
ILock lock = new SimpleRedisLock("order:"+user.getId(),stringRedisTemplate);
boolean isLock = lock.tryLock(5);
try{
if(!isLock){
return Result.fail("一个人只允许下一单!");
}
IVoucherOrderService iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();
return iVoucherOrderService.seckillImpl(voucherId,user);
}finally {
lock.unlock();
}
注意:这里有一个误删的问题
这是一种比较极端的情况,描述如下:
首先线程1先来先服务,于是获取锁成功,然后给对应的资源上锁,然后进入了业务执行,此时需要阻塞,但是由于此时服务缓慢,阻塞时间较长,这个阻塞时间甚至比设定的超时时间还要长,于是在业务还没有执行完成之前,这个锁就被自动释放掉了
此时由于处于高并发的环境,线程2在线程1没有执行完毕业务之前,先创建了锁,而后在线程2没有执行完业务之前,线程1执行完了业务,然后它就线程2上的锁给删掉了
然后这时候线程3来了,发现资源没有上锁,于是执行之前线程2和线程1的那一套操作
最终,资源乱套,程序发生严重错误
产生的原因?
这是因为业务阻塞,导致锁提前释放
锁唯一,无法区分锁是谁上的,导致线程之间乱删锁
解决方案
- 在获取锁的时候存入线程的标识,比如说使用UUID
- 在释放锁的时候先获取锁中的标识,判断与当前线程标识是否一致
- 如果一致则释放锁
- 如果不一致则不释放锁
@Override
public boolean tryLock(long timeOutSec) {
String threadId = ID_PREFIX+Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)){
stringRedisTemplate.delete(KEY_PREFIX + name);
}
return;
}
2.4 分布式锁的原子性问题
问题的分析
首先我们明确一点
if(threadId.equals(id)){
stringRedisTemplate.delete(KEY_PREFIX + name);
}
我们看这段代码,线程在执行这段代码的时候,判断id
和threadId
是否相等和删除key
的这两步不是原子性的,因此有可能产生线程抢占的问题
假设现在这个场景,线程1获取锁,然后执行一段业务,执行完毕之后准备释放锁,然后这时候向redis发请求验证锁,验证锁的正确性之后,准备释放锁
然而这时候,JVM触发了full gc
,我们知道full gc
是会产生STW(Stop The World)
的副作用的,因此此时所有的线程陷入阻塞,这时候锁过期,第二个JVM
的线程2到来,它获取到锁,执行业务,执行到一半
线程1所对应的JVM的GC完成,恢复执行,这时候由于之前已经判断过了锁是自己的,因此二话不说直接删除,这时候就产生问题了,线程1删除了线程2的锁
那么我们怎么解决呢?必须确保判断锁的操作和释放锁的操作的原子性
2.5 Java调用Lua脚本解决命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条的redis
命令,确保多条命令执行时的原子性,Lua
是一种编程语言,用Lua脚本写出的业务逻辑大致如下
-- 得到锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call('get',key)
-- 比较线程标识与锁中的表示
if(id == threadId) then
-- 释放锁
return redis.call('del',key)
end
return 0
加载Lua脚本
//提前定义锁的脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
//加载脚本
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
改造unlock代码
@Override
public void unlock() {
//调用Lua脚本保证原子性
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
面试概括
请你讲讲基于Redis要怎么实现分布式锁?
利用
setnx
指令获取锁,并且设置过期时间,保存线程与JVM的标识,为了原子性执行,通常使用set nx
释放锁的时候先判断线程与JVM形成的组合标识是否一致,一致则删除锁
- 利用setnx满足互斥
- 利用sexnx保证故障的时候锁依然能够释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发的特性
2.6 Redission概述
在上面,我们基于setnx
实现一个基本上生产可用的分布式锁,但依然有个问题没有解决,就是锁的续期问题
这个问题描述的是当业务流程还没有执行完毕的时候,锁就过期了的问题,如果这个问题无法解决,依然会有隐患存在
下面具体来分析一下上面的锁实现的问题
- 锁无法重入:同一个线程无法多次获取同一把锁
- 不可重试锁:获取锁一次就返回false,没有设定重试的机制
- 超时释放:锁超时释放虽然可以避免死锁,但是如果遇到业务执行阻塞等问题,此时业务的执行是正常的,但是却会引起锁的提前释放。
- 主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,当主节点宕机的时候,这时候将会执行选举算法,然而此时从节点并没有复制到主节点中关于锁的数据,这时候就会导致问题的产生
Redisson
是一个基于Redis
实现的Java
驻内存数据网络In-Meomery Data Grid
。提供了一系列的分布式的Java
常用对象,还提供了许多的分布式服务,其中就包含了各种分布式锁的实现
2.7 Redisson快速入门
<!--引入依赖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置RedisClient
//redisson的工厂类,可以拿到各种各样的工具,下面是这个bean的初始化逻辑
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
return Redisson.create(config);
}
RLock lock = redissonClient.getLock("order:" + user.getId());
boolean isLock = lock.tryLock(10,2, TimeUnit.SECONDS);
2.8 Redisson可重入的原理
重入
:一个线程里,多次去获取同一把锁,那么在我们所实现的锁中,是无法实现这个可重入的
参考JDK中的可重入锁的实现
可重入锁的实现机制非常简单,就是实现一个计数器,这个计数器在锁在被获取的时候+1,锁在被释放的时候-1,当这个计数器=0的时候,表示这个锁可以被新的线程获取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//state变量是用来指代,这个锁被重入了多少次
if (c == 0) {//当重入了0次的时候,这时候准备获取锁
if (compareAndSetState(0, acquires)) {//CAS
setExclusiveOwnerThread(current);
return true;
}
}
//如果已经获取了锁,线程还是当前线程,表明发生了锁的重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {//从Sync继承过来的方法
int c = getState() - releases;//states--
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
那么我们实现这个可重入锁的机制就是这样的:
- 第一步:判断锁有没有被占用
- 如果锁被占用了,那么就判断锁监视的线程是否是自己,如果是自己,计数器+1,如果不是自己,取消操作
- 如果锁没有被占用,那么就登记当前线程,设置锁的有效期,并且计数器+1
- 第二步:释放锁
- 如果释放的锁是属于自己的,那么就让计数器-1,同时,重置有效期,如果计数器减到了0,那么就删除这个锁
- 如果释放的锁不是属于自己的,那么就直接取消操作
local key = KEYS[1]
local threadId = ARGV[1]
local releaseTime = ARGV[2]
-- 判断锁是否存在
if(redis.call('exist',key) == 0)then
-- 不存在,获取锁
redis.call('hset',key,threadId,'1')
-- 设置有效期
redis.call('expire',key,releaseTime)
return 1
end;
-- 锁存在了,判断是不是自己
if(redis.call('hexists',key,threadId) == 1)then
-- 存在,获取锁,重入次数+1
redis.call('hincrby',key,threadId,'1')
-- 设置有效期
redis.call('expire',key,releaseTime)
return 1
end;
return 0;
2.9 Redisson的锁重试机制和看门狗机制
锁重试机制与看门狗机制是Redisson
理解中的一个重点和难点,面试如果能答上来会很加分,下面我将对这个流程进行梳理,尽量清晰
第一个问题:为什么要有锁的自动重试机制?
这是因为通常情况下,锁的占用通常不会太长,如果每一次业务的执行都需要用户重新发起请求,而每一次的请求都需要一部分的网络IO,一定程度上,这种情况会更加加剧服务器资源的消耗
第二个问题:为什么要实现看门狗机制?
首先要理解看门狗(Watch Dog)
机制是什么东西,看门狗提供了一种自动续期的机制,它出现的目的是为了尽量让业务流程执行完毕后才释放锁,而不是锁超时自动释放锁,看门狗则是基于定时任务机制,实现了不断给锁延期的机制。
首先我们从整体上来理解一下它的流程,这个图是Redisson
中关于锁重试机制的具体实现流程,其中的核心是发布订阅模式+定时任务(看门狗)+信号量机制
如何启动看门狗机制?
通过阅读源码,我们发现当releaseTime=1
的时候就会启动看门狗机制,以下情况可以触发:
- 不传入
releaseTime
,其默认值就是-1 - 主动传入
releaseTime=-1
看门狗机制会引发死锁吗?
我们说看门狗机制是为线程自动延期,那么看门狗就是一种为线程的执行无限延长时间的机制,然而,我们设置过期时间就是为了防止线程无法正常释放锁,而看门狗这种机制就可能使得线程一直持有锁,这会导致死锁吗?
不会
,首先,当线程持有锁并且看门狗在不断监控的情况下,此时的JVM执行线程的情况都是正常的,那么只要正常执行,那么业务逻辑一定会执行完并且释放锁,不会导致死锁,会出现死锁的情况是当JVM
宕机掉了,这时候线程就被kill掉了,无法释放锁,然而这时候进行思考,JVM
都挂掉了,你看门狗又是基于定时任务所开启的线程进行监控的,那么看门狗是不是也挂掉了呢?如果看门狗挂掉了的话,那么就不会再延期了,redis将会为我们释放掉锁
看门狗机制原理
看门狗的实现原理可以总结为发布订阅模式+信号量+定时任务
//可以等待的时间
long time = unit.toMillis(waitTime);
//当前时间,其作用是标志当前代码的执行耗时
long current = System.currentTimeMillis();
//threadId:当前执行的线程标识
long threadId = Thread.currentThread().getId();
//ttl:就是锁的过期时间
//如果获取到的ttl就null,就是获取到了锁
//否则的话就会获得关于锁的过期时间的一个number
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
...
接下来看tryAcquire
看代码其实一头雾水,从整体上理解就是返回一个异步任务的结果,这个异步任务就是获取一个锁的获取情况,这里到后面再理解代码的含义
然后我们顺着tryLock
继续往下面看
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
//这个订阅方法,是订阅在Lua脚本中别人释放锁的信号,而释放锁的信号是在Lua脚本中完成的
//"redis.call('del', KEYS[1]); "
//"redis.call('publish', KEYS[2], ARGV[1]); "
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
//await方法解析:这个方法是等待time的时间,如果这个futrue在time的时间内返回了,那么就为true
//否则就会false
//这里的话就等不到锁了,于是直接不等了,超时
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);//在这里取消订阅关系
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;//获取锁失败
}
//以下是有可能获取到锁的流程
try {
//执行上述代码需要耗时,于是先检验
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//正式开始锁的重试
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
//开始之重试之前先检查时间
//尝试获取锁
// lock acquired
if (ttl == null) {//获取到了
return true;
}
//检查
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
// 流程分支解读
/*信号量机制:*/
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//ttl是锁的过期时间,time是我要等的时间
//如果锁的过期时间在我要等的时间之内,那么就会尝试去获取这个future的中的信号量
//可以看到这个信号量并没有得到返回值,那么是什么个执行机制呢,理论对程序逻辑没有任何影响
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
首先await()
会使得线程进入阻塞状态,一直等待发布方返回一个锁空闲的消息
这是为了解决资源浪费的问题,如果在time(最大等待时间内)
都等不到有锁空闲,那么就直接返回并且取消订阅
如果在time
内有时机是锁空闲的,那么就会尝试获取锁,那么是如何尝试获取锁的呢?
if (ttl >= 0 && ttl < time) {
//ttl是锁的过期时间,time是我要等的时间
//如果锁的过期时间在我要等的时间之内,那么就会尝试去获取这个future的中的信号量
//可以看到这个信号量并没有得到返回值,那么是什么个执行机制呢,理论对程序逻辑没有任何影响
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
首先,它利用了CountDownLatch
中的tryAcquire()
方法实现线程的等待阻塞,其含义是在time
的时间内,如果获取到(或者没有)许可,那么将会解除阻塞,并且继续执行,简单来说就是一个计时器的功能,这个计时器能够使得线程在合理的时间内被唤醒并且尝试获取锁。
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit,
threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//走默认时间30*1000
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {//如果没有异常,那么就直接返回
return;
}
// lock acquired
if (ttlRemaining) {//获取锁成功了,解决有效期的问题,重置锁的有效期
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
自动续期的功能
this.id = commandExecutor.getConnectionManager().getId();
this.entryName = id + ":" + name;
ExpirationEntry entry = new ExpirationEntry();
//锁的名称->entry
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//只有当第一次来的时候才放入对象
//如果是重入的话,就不执行
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();//添加看门狗监控线程
}
public synchronized void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);//使用thread变量,使用可重入
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//延时任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();//递归执行,注意,这里有栈溢出的隐患
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//内部锁释放时间10s
ee.setTimeout(task);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//刷新有效期
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//锁是不是这个线程拿的
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//重置锁的有效期
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);//取消更新任务
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
2.10 Redisson是怎么解决主从一致性问题的?
Redis对于主从一致性问题的解决思路非常简单,如果主从复制会产生数据不一致的情况,那么我就让这些节点不再有这些角色的分配,而是让这些节点具有一样的权重
这也就意味着,当设置锁的时候,这些锁在每个redis节点都存在,当解锁的时候,只有当每个节点返回正确的解锁信息才能真正的解开这个锁
实战
- 步骤1:在Java应用配置集群
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6380");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6381");
return Redisson.create(config);
}
- 步骤二:创建联锁
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@BeforeEach
void setUp(){
RLock lock1 = redissonClient.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");
//创建联锁,被创建为集合
lock = redissonClient.getMultiLock(lock1,lock2,lock3);
//然后正常使用即可
}
联锁的使用方法与普通的分布式锁的使用方法是一致的