深入学习Redis-线程安全与分布式锁


1. 秒杀基本场景

1.1 全局唯一性ID

业务背景

当全局的数据量太大,比如说达到了亿级的数据量,这时候考虑到查询效率以及存储的安全性,就要做分表了,如果使用Mysql提供的自增id,分表的话,各自的表都会产生各自的自增id,比如说table1和table2的自增id都是从1开始的,那么在这种情况下,就会产生id的二义性问题

全局性ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具,一般需要满足:唯一性,高可用,高性能(生成ID速度快),递增性(单调的特性,替代数据的自增ID,利于索引生成),确保安全性(不能被用户猜测到具体信息,不能太明显)

这个需求可以使用redis来完成,这是因为redis是独立于数据库存在的
安全性如何实现:为了增加ID的安全性,可以不直接使用redis自增的数值,而是拼接一些其他信息

private static final long BEGIN_TIMESTAMP = 1672531200L;
/**
 * 序列号的位数
 */
private static final int SEQ_BITS = 32;
@Resource
private StringRedisTemplate stringRedisTemplate;

/**
 * 不同的业务生成不的ID
 * @param keyPrefix
 * @return
 */
public long nextId(String keyPrefix){
    //1.生成时间戳
    long timeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
    //2.生成序列号
    //获取当前日期,精确到天
    String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    long seq = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
    //3.拼接并且返回
    return timeStamp << SEQ_BITS | seq;
}

测试代码

@Resource
private ShopServiceImpl shopService;
@Resource
private RedisIDWorker redisIDWorker;

private ExecutorService executorService = Executors.newFixedThreadPool(500);

@Test
public void testIDWorker() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(300);
    long beginSecond = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
    System.out.println(LocalDateTime.now());
    Runnable task = ()->{
        for(int i = 0;i<100;i++){
            long id = redisIDWorker.nextId("order");
            System.out.println("id:"+Long.toBinaryString(id));
        }
        latch.countDown();
    };
    for (int i = 0; i < 300; i++) {
        executorService.submit(task);
    }
    latch.await();
    System.out.println("执行时间:"+(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)-beginSecond)+"秒");
}
  • 全局唯一ID生成策略

UUID(生成缓慢,不利于使用索引)

Redis自增算法

snowflake算法,注意,在分布式系统中,是非常难以确保每个节点具有相同的时钟的,而这个算法使用了时间戳,具有一定的隐患

数据库自增,就是专门开个表来做数据库的唯一性ID

在企业开发中,通常使用一种在内存中提前缓存若干个ID的方法,使用该方法能够在获取ID的时候减免性能均摊,将机器性能集中到业务执行上

1.2 实现优惠券秒杀下单

   @Override
@Transactional
   public Result seckill(Long voucherId) {
       //1.查询优惠
       SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
       //2.判断秒杀是否开始
       LocalDateTime now = LocalDateTime.now();
       LocalDateTime beginTime = seckillVoucher.getBeginTime();
       if(beginTime.isAfter(now)){
           return Result.fail("秒杀未开始!");
       }
       LocalDateTime endTime = seckillVoucher.getEndTime();
       //3.判断秒杀是否结束
       if(endTime.isBefore(now)){
           return Result.fail("秒杀已结束!");
       }
       //TODO:这里在并发的情况下将会产生超卖的现象
       //4.判断库存是否充足
       if(seckillVoucher.getStock()<1){
           return Result.fail("库存不足!");
       }
       //5.创建订单
       boolean success = seckillVoucherService.update()
               .setSql("stock = stock-1")
               .eq("voucher_id", voucherId).update();
       if(!success){
           return Result.fail("库存不足!");
       }
       //6.创建并且返回订单的id
       VoucherOrder voucherOrder = new VoucherOrder();
       //6.1订单id
       long orderID = redisIDWorker.nextId("order");
       voucherOrder.setId(orderID);
       //6.2用户id
       UserDTO user = UserHolder.getUser();
       voucherOrder.setUserId(user.getId());
       //6.3代金券id
       voucherOrder.setVoucherId(voucherId);
       //6.4写入数据库
       save(voucherOrder);
       return Result.ok(orderID);
   }

1.3 超卖问题

上面我们所实现的代码是有缺陷的,在高并发的环境下将会产生超卖问题!

超卖问题产生的原因以及分析

这实际上还是一个数据库数据和缓存不一致的问题,只是此时缓存的不一样了,我们存储这个库存的信息是JVM中的生成对象,这也是一种缓存

首先,在这种情况下,假设到了超卖的临界点,数据库中存储的库存数据是1,然后高并发下,线程1查询数据,然后在JVM中生成对象,这个对象中的库存记录为1,然而此时产生线程的切换,线程2也去查询数据库,此时依然在会在JVM中生成对象,然后线程各自执行库存的更新操作,最终导致超卖了

那么问题出在哪呢?在于线程1没有把获取库存,更新库存这个操作设置为原子性,如果是原子性,其他线程就无法同时获取这个库存了。

解决方案:加锁

悲观锁和乐观锁实际上并不是一种真正的锁,而是一种锁的设计理念

悲观锁

认为线程安全一定会发生,线程切换时时刻刻都在进行,临界资源一定不安全,因此在操作数据之前必须要获取锁,如果没有获取锁则阻塞等待,确保线程执行序列可串行化

  • SynchronizedLock都属于悲观锁
  • 悲观锁采用的是线程串行合法模型,因此性能较差
  • 悲观锁的缺点是,它锁的范围太大了,尽管有些代码是不涉及到临界资源的,它也去对这个资源上锁,这是一种非常浪费的行为

乐观锁

认为线程安全不一定会发生,因此不加锁,只是在更新数据的时候去判断有没有其他线程做了修改

  • 如果没有修改则认为是安全的,自己才更新数据
  • 如果已经被其他线程修改说明发生了线程安全问题,可以重试或者抛出异常
  • 可能出现成功率低的问题,概率出现,与线程并发情况有关,无法控制

注意,简单的乐观锁的实现有CAS(compare and set),这种原始的方法将会导致ABA问题,而解决ABA问题的关键是加上一个操作版本号,确保在值相同的情况下,这期间只有一个线程在操作

注意,能当做版本号的字段的特征是,只能递增或者递减,在这样的情况下就可以当做版本号使用,否则可能产生ABA问题导致生产事故

首先这是一个基本的CAS的sql写法

boolean success = seckillVoucherService.update()
        .setSql("stock = stock-1")
        .eq("voucher_id", voucherId)
        .eq("stock",seckillVoucher.getStock())
        .update();

然后还有执行异常或者重试,重试的话可以交给用户或者程序,由程序手工帮助用户进行重试

如果抛出异常的话,肯定能够保证安全问题,但是会导致货物卖不出去,用户体验比较差

我们希望的是,用户的请求先到达的,就先抢到券

//5.创建订单
boolean success = seckillVoucherService.update()
        .setSql("stock = stock-1")
        .eq("voucher_id", voucherId)
        .gt("stock",0)//修改为gt
        .update();

注意,执行sql的这个过程,可以认为是原子性的,这是因为mysql中有封锁协议和排它锁,通过这两个机制能够保护事务的并发执行安全

1.4 一人一单

synchronized (user.getId().toString().intern()){//获取字符串常量
    int count = this.query()
            .eq("user_id", user.getId())
            .eq("voucher_id", voucherId)
            .count();
    if(count > 0 ){
        return Result.fail("只能秒杀一张券!");
    }
}

问题1:这里不用同步锁可以吗?

首先分析一下并发问题是如何产生的,假设线程123都进来了,因为数据库的I/O数据比较长,因此线程1执行完这个query,发生线程切换,线程2执行完query也发生线程切换,线程3执行完query也发生线程切换,而这些查询结果是不是都是count=0?如果是这样的话,那么后续的代码都会执行,就会产生一个后果,如果一个用户恶意开辟多个线程来请求接口,那么还会产生一个人下多单的这种效果

我们知道乐观锁解决的问题是更新下的并发安全问题,如果没有更新,那么就没有CAS这种操作,而我们在这做的是纯查询操作,无法使用CAS的操作

因此只能上同步锁了

问题2:如何确定锁的粒度?

这个问题需要我们明确需要锁的是谁?我们想锁的是同一个用户,防止一个用户产生多个线程并发,而不是说所有的用户来,谁来锁谁

那么在JVM中有没有这样的一个唯一对象可以表示用户呢?我们知道在JVM的常量区中有字符串常量,这个对象一旦第一次被使用,就会被缓存下来,并且内存对象地址不会改变,那么在这样的情况下,我们可以使用UserId.toString.intern()的这个方法,来每次获取用户id所对应的在JVM中的唯一对象,就能够将这个对象锁住

问题3:这里操作的事务与同步锁是否会产生问题?

我们使用的事务是声明式事务,而这个声明式事务是由Spring管理的,它的原理是通过CGLIB或者JDK动态代理,生成动态代理类,添加前置通知和后置通知,在这些通知里面完成事务的提交,这是基本的原理。

然而,如果我们像下面这样编写代码:

@Transactional
   public Result seckillImpl(Long voucherId){
       //一人一单的判断,也要加锁
       UserDTO user = UserHolder.getUser();
       synchronized (user.getId().toString().intern()){//获取字符串常量
         ...}
   }

那么我们梳理一下最终动态代理会怎样执行这个方法?

底层源码大概长这样

public class TimeInvocationHandler implements InvocationHandler {
    //invoke方法执行过程中,使用method来调用目标对象的目标方法
    private Object target;//目标对象
    public TimeInvocationHandler(Object target){
        this.target = target;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //这个地方就是为了编写增强的代码的
        System.out.println("开启事务");
        //dosomething
        //调用目标对象上的目标方法
        //方法四要素:对象,方法,参数,返回值
        Object retVal = method.invoke(target, args);
        System.out.println("提交事务");
        return null;
    }
}

那么我们就知道了,synchronized锁的是method.invoke()这个内部里面的方法,然而,开启事务和提交事务的这个过程会锁进去吗?不会!这样也会产生并发问题!

举例:比如说线程1来了,然后执行完method.invoke(),还没有提交事务,这时候被线程2抢占,它也去执行method.invoke(),然而你这时候事务还没有提交呀,最终两个事务一起提交,也会导致一个人可以下多个单

修改思路:既然直接锁这个方法内部的代码不行,那么我就直接锁这个方法,修改如下:

UserDTO user = UserHolder.getUser();
synchronized (user.getId().toString().intern()){
    return this.seckillImpl(voucherId,user);
}

这就完了吗?

还是事务问题!我们使用的是声明式事务,我们必须时刻记得声明式事务的底层实现,它底层的实现是通过生成一个代理对象proxy,通过这个proxy对象来执行invoke()方法才能执行对应的方法

但是我们这里是怎么写的

return this.seckillImpl(voucherId,user);

我们直接将执行方法的对象写死了!理论上实现动态代理,是通过调用接口的方法与获取具体实现类,两者配合实现的,然而我们这里是直接使用了实现类,因此就会导致动态代理失效,没有执行事务的相关操作,事务失效了

解决办法:在Spring中暴露这个代理对象,让代理对象执行方法就好了

步骤1:添加暴露代理对象的方法

@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
    public static void main(String[] args) {
        SpringApplication.run(HmDianPingApplication.class, args);
    }
}

步骤2:让代理对象执行方法

UserDTO user = UserHolder.getUser();
synchronized (user.getId().toString().intern()){
    IVoucherOrderService iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();
    return iVoucherOrderService.seckillImpl(voucherId,user);
}

至此,一人一单的功能就实现了(单机)

一人一单的集群模式

我们知道,在集群下,有多台的tomcat服务器接收请求并且响应请求,那么请求就会被均摊到不同的节点

那么我们的锁是锁同一个JVM中的对象的,如果按照上面的思路,就会出问题了,这是因为不同的tomcat是不同的虚拟机,这时候锁的对象自然而然的就发生了变化了

首先这是单机的情况

首先在单机环境下,JVM只有一个,锁监视器只有一个,当线程1执行到synchronized的时候,这时候JVM中的userId所对应的锁监视器就会记录下线程1,然后当线程2执行到同步代码的时候,锁监视器中已经有登记的线程,因此线程2阻塞,这样的话就可以实现一人一单

集群环境如上图所示

2. 分布式锁详解

2.1 分布式锁的基本原理

通过1.4的分析,我们发现在集群环境下的锁的控制,主要是因为锁监视器有两个,它们将会监视不同的线程,从而导致不同的线程获取锁,那么解决思路就是产生一个锁监视器,让这个集群内的机器都去访问这个锁监视器即可,让这些JVM进程都能够访问这个锁

线程-锁监视器访问模型如下:

分布式锁:满足分布式系统或者集群模式下多进程可见并且互斥的锁

  • 多进程可见,多个JVM都能访问同一资源
  • 实现互斥:当有进程获取了锁,其他进程获取锁失败
  • 高可用:大多数时候获取锁的都能够正常使用
  • 高性能:获取锁的速度要快
  • 安全性:要考虑获取锁的时候,产生异常而不发生死锁
  • 可重入性、公平锁等额外特性

2.2 分布式锁的三种实现方式

目前来说,可以使用MySQL、Redis、Zookeeper实现这个分布式锁

MySQL Redis Zookeeper
互斥 使用事务机制,或者使用内置的锁(排他锁),在业务执行先申请锁 利用setnx这样的互斥命令,利用setnx首次设置成功其余失败的特性 利用节点的唯一性和有序性实现互斥
高可用 主从模式,可以达到较高的可用性 主从模式,集群模式,哨兵模式,高可用 支持集群,可用性很好
高性能 性能一般 Redis本身由于C语言编写,单线程,内存运行等机制,性能比较好 维护强一致性,做数据同步需要消耗时间
安全性 事务机制可以确保锁被自动释放,安全性很好 利用redis的过期机制,设置过期时间,防止死锁 节点是临时节点,像有一个过期时间一样,将会自动删除

2.3 分布式锁的实现基本思路

  • 获取锁

    • 互斥:确保只能有一个线程获取到锁
  • 释放锁

    • 手动释放
    • 利用缓存时间过期机制自动删除锁
------这一段指令应该要保持原子性
setnx lock thread1
expire lock 10
------
del lock

假设一种场景,如果当设置了锁,但是当这个锁的过期时间还没有设置的时候,服务器就宕机了,这时候就会产生非常严重的问题,因此应该要这样写来保证原子性

set lock thread1 EX 10 NX

问题:如果线程获取锁失败了,我们要给它们执行什么样的逻辑呢?

  • 获取锁失败的线程阻塞等待,等到释放锁为止,可以是自旋等待,将会导致忙等,或者是将线程直接设置为阻塞,但是会浪费线程资源,阻塞式
  • 获取锁失败,立即结束该task,执行其他task,非阻塞式
public interface ILock {
    /**
     * 尝试获取锁
     * @param timeOutSec:锁持有的超时时间,过期后自动释放
     * @return true:代表获取锁成功,false:代表获取锁失败
     */
    boolean tryLock(long timeOutSec);

    /**
     * 释放锁
     */
    void unlock();
}
private String name;
private static final String KEY_PREFIX="lock:";

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
    this.name = name;
    this.stringRedisTemplate = stringRedisTemplate;
}

private StringRedisTemplate stringRedisTemplate;
@Override
public boolean tryLock(long timeOutSec) {
    Boolean success = stringRedisTemplate.opsForValue()
            .setIfAbsent(KEY_PREFIX + name, "thread" + Thread.currentThread().getId(), timeOutSec, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(success);
}

@Override
public void unlock() {
    stringRedisTemplate.delete(KEY_PREFIX + name);
}
UserDTO user = UserHolder.getUser();
ILock lock = new SimpleRedisLock("order:"+user.getId(),stringRedisTemplate);
boolean isLock = lock.tryLock(5);
try{
    if(!isLock){
        return Result.fail("一个人只允许下一单!");
    }
    IVoucherOrderService iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();
    return iVoucherOrderService.seckillImpl(voucherId,user);
}finally {
    lock.unlock();
}

注意:这里有一个误删的问题

这是一种比较极端的情况,描述如下:

首先线程1先来先服务,于是获取锁成功,然后给对应的资源上锁,然后进入了业务执行,此时需要阻塞,但是由于此时服务缓慢,阻塞时间较长,这个阻塞时间甚至比设定的超时时间还要长,于是在业务还没有执行完成之前,这个锁就被自动释放掉了

此时由于处于高并发的环境,线程2在线程1没有执行完毕业务之前,先创建了锁,而后在线程2没有执行完业务之前,线程1执行完了业务,然后它就线程2上的锁给删掉了

然后这时候线程3来了,发现资源没有上锁,于是执行之前线程2和线程1的那一套操作

最终,资源乱套,程序发生严重错误

产生的原因?

这是因为业务阻塞,导致锁提前释放

锁唯一,无法区分锁是谁上的,导致线程之间乱删锁

解决方案

  • 在获取锁的时候存入线程的标识,比如说使用UUID
  • 在释放锁的时候先获取锁中的标识,判断与当前线程标识是否一致
    • 如果一致则释放锁
    • 如果不一致则不释放锁
@Override
public boolean tryLock(long timeOutSec) {
    String threadId = ID_PREFIX+Thread.currentThread().getId();
    Boolean success = stringRedisTemplate.opsForValue()
            .setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(success);
}

@Override
public void unlock() {
    //获取线程标识
    String threadId = ID_PREFIX+Thread.currentThread().getId();
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    if(threadId.equals(id)){
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
    return;
}

2.4 分布式锁的原子性问题

问题的分析

首先我们明确一点

if(threadId.equals(id)){
    stringRedisTemplate.delete(KEY_PREFIX + name);
}

我们看这段代码,线程在执行这段代码的时候,判断idthreadId是否相等和删除key的这两步不是原子性的,因此有可能产生线程抢占的问题

假设现在这个场景,线程1获取锁,然后执行一段业务,执行完毕之后准备释放锁,然后这时候向redis发请求验证锁,验证锁的正确性之后,准备释放锁

然而这时候,JVM触发了full gc,我们知道full gc是会产生STW(Stop The World)的副作用的,因此此时所有的线程陷入阻塞,这时候锁过期,第二个JVM的线程2到来,它获取到锁,执行业务,执行到一半

线程1所对应的JVM的GC完成,恢复执行,这时候由于之前已经判断过了锁是自己的,因此二话不说直接删除,这时候就产生问题了,线程1删除了线程2的锁

那么我们怎么解决呢?必须确保判断锁的操作和释放锁的操作的原子性

2.5 Java调用Lua脚本解决命令原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条的redis命令,确保多条命令执行时的原子性,Lua是一种编程语言,用Lua脚本写出的业务逻辑大致如下

-- 得到锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call('get',key)
-- 比较线程标识与锁中的表示
if(id == threadId) then
    -- 释放锁
    return redis.call('del',key)
end
return 0

加载Lua脚本

//提前定义锁的脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
    //加载脚本
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);
}

改造unlock代码

@Override
public void unlock() {
    //调用Lua脚本保证原子性
    stringRedisTemplate.execute(
            UNLOCK_SCRIPT,
            Collections.singletonList(KEY_PREFIX+name),
            ID_PREFIX+Thread.currentThread().getId());
}

面试概括

请你讲讲基于Redis要怎么实现分布式锁?

利用setnx指令获取锁,并且设置过期时间,保存线程与JVM的标识,为了原子性执行,通常使用set nx

释放锁的时候先判断线程与JVM形成的组合标识是否一致,一致则删除锁

  • 利用setnx满足互斥
  • 利用sexnx保证故障的时候锁依然能够释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发的特性

2.6 Redission概述

在上面,我们基于setnx实现一个基本上生产可用的分布式锁,但依然有个问题没有解决,就是锁的续期问题

这个问题描述的是当业务流程还没有执行完毕的时候,锁就过期了的问题,如果这个问题无法解决,依然会有隐患存在

下面具体来分析一下上面的锁实现的问题

  • 锁无法重入:同一个线程无法多次获取同一把锁
  • 不可重试锁:获取锁一次就返回false,没有设定重试的机制
  • 超时释放:锁超时释放虽然可以避免死锁,但是如果遇到业务执行阻塞等问题,此时业务的执行是正常的,但是却会引起锁的提前释放。
  • 主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,当主节点宕机的时候,这时候将会执行选举算法,然而此时从节点并没有复制到主节点中关于锁的数据,这时候就会导致问题的产生

Redisson是一个基于Redis实现的Java驻内存数据网络In-Meomery Data Grid。提供了一系列的分布式的Java常用对象,还提供了许多的分布式服务,其中就包含了各种分布式锁的实现

2.7 Redisson快速入门

<!--引入依赖-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

配置RedisClient

//redisson的工厂类,可以拿到各种各样的工具,下面是这个bean的初始化逻辑
@Bean
public RedissonClient redissonClient(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379");
    return Redisson.create(config);
}
RLock lock = redissonClient.getLock("order:" + user.getId());
boolean isLock = lock.tryLock(10,2, TimeUnit.SECONDS);

2.8 Redisson可重入的原理

重入:一个线程里,多次去获取同一把锁,那么在我们所实现的锁中,是无法实现这个可重入的

参考JDK中的可重入锁的实现

可重入锁的实现机制非常简单,就是实现一个计数器,这个计数器在锁在被获取的时候+1,锁在被释放的时候-1,当这个计数器=0的时候,表示这个锁可以被新的线程获取

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();//state变量是用来指代,这个锁被重入了多少次
    if (c == 0) {//当重入了0次的时候,这时候准备获取锁
        if (compareAndSetState(0, acquires)) {//CAS
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果已经获取了锁,线程还是当前线程,表明发生了锁的重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {//从Sync继承过来的方法
    int c = getState() - releases;//states--
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

那么我们实现这个可重入锁的机制就是这样的:

  • 第一步:判断锁有没有被占用
  • 如果锁被占用了,那么就判断锁监视的线程是否是自己,如果是自己,计数器+1,如果不是自己,取消操作
  • 如果锁没有被占用,那么就登记当前线程,设置锁的有效期,并且计数器+1
  • 第二步:释放锁
  • 如果释放的锁是属于自己的,那么就让计数器-1,同时,重置有效期,如果计数器减到了0,那么就删除这个锁
  • 如果释放的锁不是属于自己的,那么就直接取消操作

local key = KEYS[1]
local threadId = ARGV[1]
local releaseTime = ARGV[2]
-- 判断锁是否存在
if(redis.call('exist',key) == 0)then
    -- 不存在,获取锁
    redis.call('hset',key,threadId,'1')
    -- 设置有效期
    redis.call('expire',key,releaseTime)
    return 1
end;
-- 锁存在了,判断是不是自己
if(redis.call('hexists',key,threadId) == 1)then
    -- 存在,获取锁,重入次数+1
    redis.call('hincrby',key,threadId,'1')
    -- 设置有效期
    redis.call('expire',key,releaseTime)
    return 1
end;
return 0;

2.9 Redisson的锁重试机制和看门狗机制

锁重试机制与看门狗机制是Redisson理解中的一个重点和难点,面试如果能答上来会很加分,下面我将对这个流程进行梳理,尽量清晰

第一个问题:为什么要有锁的自动重试机制?

这是因为通常情况下,锁的占用通常不会太长,如果每一次业务的执行都需要用户重新发起请求,而每一次的请求都需要一部分的网络IO,一定程度上,这种情况会更加加剧服务器资源的消耗

第二个问题:为什么要实现看门狗机制?

首先要理解看门狗(Watch Dog)机制是什么东西,看门狗提供了一种自动续期的机制,它出现的目的是为了尽量让业务流程执行完毕后才释放锁,而不是锁超时自动释放锁,看门狗则是基于定时任务机制,实现了不断给锁延期的机制。

首先我们从整体上来理解一下它的流程,这个图是Redisson中关于锁重试机制的具体实现流程,其中的核心是发布订阅模式+定时任务(看门狗)+信号量机制

如何启动看门狗机制?

通过阅读源码,我们发现当releaseTime=1的时候就会启动看门狗机制,以下情况可以触发:

  • 不传入releaseTime,其默认值就是-1
  • 主动传入releaseTime=-1

看门狗机制会引发死锁吗?

我们说看门狗机制是为线程自动延期,那么看门狗就是一种为线程的执行无限延长时间的机制,然而,我们设置过期时间就是为了防止线程无法正常释放锁,而看门狗这种机制就可能使得线程一直持有锁,这会导致死锁吗?

不会,首先,当线程持有锁并且看门狗在不断监控的情况下,此时的JVM执行线程的情况都是正常的,那么只要正常执行,那么业务逻辑一定会执行完并且释放锁,不会导致死锁,会出现死锁的情况是当JVM宕机掉了,这时候线程就被kill掉了,无法释放锁,然而这时候进行思考,JVM都挂掉了,你看门狗又是基于定时任务所开启的线程进行监控的,那么看门狗是不是也挂掉了呢?如果看门狗挂掉了的话,那么就不会再延期了,redis将会为我们释放掉锁

看门狗机制原理

看门狗的实现原理可以总结为发布订阅模式+信号量+定时任务

//可以等待的时间
long time = unit.toMillis(waitTime);
//当前时间,其作用是标志当前代码的执行耗时
long current = System.currentTimeMillis();
//threadId:当前执行的线程标识
long threadId = Thread.currentThread().getId();
//ttl:就是锁的过期时间
//如果获取到的ttl就null,就是获取到了锁
//否则的话就会获得关于锁的过期时间的一个number
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
...

接下来看tryAcquire

看代码其实一头雾水,从整体上理解就是返回一个异步任务的结果,这个异步任务就是获取一个锁的获取情况,这里到后面再理解代码的含义

然后我们顺着tryLock继续往下面看

if (ttl == null) {
    return true;
}

time -= System.currentTimeMillis() - current;
if (time <= 0) {
    acquireFailed(waitTime, unit, threadId);
    return false;
}

current = System.currentTimeMillis();
//这个订阅方法,是订阅在Lua脚本中别人释放锁的信号,而释放锁的信号是在Lua脚本中完成的
//"redis.call('del', KEYS[1]); " 
//"redis.call('publish', KEYS[2], ARGV[1]); " 
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    //await方法解析:这个方法是等待time的时间,如果这个futrue在time的时间内返回了,那么就为true
    //否则就会false
    //这里的话就等不到锁了,于是直接不等了,超时
    if (!subscribeFuture.cancel(false)) {
        subscribeFuture.onComplete((res, e) -> {
            if (e == null) {
                unsubscribe(subscribeFuture, threadId);//在这里取消订阅关系
            }
        });
    }
    acquireFailed(waitTime, unit, threadId);
    return false;//获取锁失败
}
//以下是有可能获取到锁的流程
try {
    //执行上述代码需要耗时,于是先检验
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    //正式开始锁的重试
    while (true) {
        long currentTime = System.currentTimeMillis();
        ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        //开始之重试之前先检查时间
        //尝试获取锁
        // lock acquired
        if (ttl == null) {//获取到了
            return true;
        }
        //检查
        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        // waiting for message
        // 流程分支解读
        /*信号量机制:*/
        currentTime = System.currentTimeMillis();
        if (ttl >= 0 && ttl < time) {
            //ttl是锁的过期时间,time是我要等的时间
            //如果锁的过期时间在我要等的时间之内,那么就会尝试去获取这个future的中的信号量
            //可以看到这个信号量并没有得到返回值,那么是什么个执行机制呢,理论对程序逻辑没有任何影响
            subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } else {
            subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
        }

        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
    }
} finally {
    unsubscribe(subscribeFuture, threadId);
}

首先await()会使得线程进入阻塞状态,一直等待发布方返回一个锁空闲的消息

这是为了解决资源浪费的问题,如果在time(最大等待时间内)都等不到有锁空闲,那么就直接返回并且取消订阅

如果在time内有时机是锁空闲的,那么就会尝试获取锁,那么是如何尝试获取锁的呢?

if (ttl >= 0 && ttl < time) {
    //ttl是锁的过期时间,time是我要等的时间
    //如果锁的过期时间在我要等的时间之内,那么就会尝试去获取这个future的中的信号量
    //可以看到这个信号量并没有得到返回值,那么是什么个执行机制呢,理论对程序逻辑没有任何影响
    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

首先,它利用了CountDownLatch中的tryAcquire()方法实现线程的等待阻塞,其含义是在time的时间内,如果获取到(或者没有)许可,那么将会解除阻塞,并且继续执行,简单来说就是一个计时器的功能,这个计时器能够使得线程在合理的时间内被唤醒并且尝试获取锁。

if (leaseTime != -1) {
    return tryLockInnerAsync(waitTime, leaseTime, unit, 
                             threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//走默认时间30*1000
                                                        
 TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e != null) {//如果没有异常,那么就直接返回
        return;
    }

    // lock acquired
    if (ttlRemaining) {//获取锁成功了,解决有效期的问题,重置锁的有效期
        scheduleExpirationRenewal(threadId);
    }
});
return ttlRemainingFuture;

自动续期的功能

this.id = commandExecutor.getConnectionManager().getId();
this.entryName = id + ":" + name;
ExpirationEntry entry = new ExpirationEntry();
//锁的名称->entry
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//只有当第一次来的时候才放入对象
//如果是重入的话,就不执行
if (oldEntry != null) {
    oldEntry.addThreadId(threadId);
} else {
    entry.addThreadId(threadId);
    renewExpiration();//添加看门狗监控线程
}
public synchronized void addThreadId(long threadId) {
    Integer counter = threadIds.get(threadId);//使用thread变量,使用可重入
    if (counter == null) {
        counter = 1;
    } else {
        counter++;
    }
    threadIds.put(threadId, counter);
}
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
    return;
}
//延时任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
        ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ent == null) {
            return;
        }
        Long threadId = ent.getFirstThreadId();
        if (threadId == null) {
            return;
        }

        RFuture<Boolean> future = renewExpirationAsync(threadId);
        future.onComplete((res, e) -> {
            if (e != null) {
                log.error("Can't update lock " + getName() + " expiration", e);
                return;
            }

            if (res) {
                // reschedule itself
                renewExpiration();//递归执行,注意,这里有栈溢出的隐患
            }
        });
    }
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//内部锁释放时间10s

ee.setTimeout(task);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                      //刷新有效期
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//锁是不是这个线程拿的
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +//重置锁的有效期
                      "return 1; " +
                      "end; " +
                      "return 0;",
                      Collections.singletonList(getName()),
                      internalLockLeaseTime, getLockName(threadId));
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);//取消更新任务

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

2.10 Redisson是怎么解决主从一致性问题的?

Redis对于主从一致性问题的解决思路非常简单,如果主从复制会产生数据不一致的情况,那么我就让这些节点不再有这些角色的分配,而是让这些节点具有一样的权重

这也就意味着,当设置锁的时候,这些锁在每个redis节点都存在,当解锁的时候,只有当每个节点返回正确的解锁信息才能真正的解开这个锁

实战

  • 步骤1:在Java应用配置集群
@Bean
public RedissonClient redissonClient(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379");
    return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6380");
    return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6381");
    return Redisson.create(config);
}
  • 步骤二:创建联锁
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@BeforeEach
void setUp(){
    RLock lock1 = redissonClient.getLock("order");
    RLock lock2 = redissonClient2.getLock("order");
    RLock lock3 = redissonClient3.getLock("order");
    //创建联锁,被创建为集合
    lock = redissonClient.getMultiLock(lock1,lock2,lock3);
    //然后正常使用即可
}

联锁的使用方法与普通的分布式锁的使用方法是一致的


文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录