深入学习Redis-异步与消息队列


1. 异步秒杀

1.1 异步秒杀的实现思路

首先回顾关于秒杀的流程,秒杀为了防止超卖以及实现一人一单,必须实现判断秒杀库存,校验一人一单后,才能进行数据库的写入,然而在原业务流程中,这些流程都是串行执行的。但是实际上,这种业务是完全可以分开做的,也就是说,在判断秒杀库存,校验一人一单的信息后,服务端就可以返回信息给用户了,然后我们后端保证在后台可以创建对应的订单,就可以了。

最终达到的效果就是用户感知到的业务执行时间就是判断秒杀库存和校验一人一单的信息的这个时间,而与数据库交互的时间对用户是透明的

实现思路
第一个问题:既然要在Redis中做这个秒杀库存和校验一人一单的信息,那么必然就要在Redis中存储数据,那么以什么样的形式存储数据呢?

首先关于库存的问题,库存原本我们的实现是通过CAS+version机制实现乐观锁+解决ABA问题的,那么在这里我们的实现的数据结构就是

KEY VALUE
stock:vid:7 100

关于超卖问题,这个超卖问题可以通过记录下购买过这个商品的用户id来实现,当用户进行购买的时候,就能够通过查询这个用户的id是否存在,如果存在,那么就不卖给他,那么实现的数据结构具有以下的特点

  • 一个key能够对应很多value
  • value具有唯一性

set集合符合这样的特质

第二个问题:业务流程如何执行?

1.2 基于Redis完成秒杀资格判断

编写基于Lua脚本的判断

--- 秒杀脚本编写
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by 12743.
--- DateTime: 2023/1/18 15:00
---
-- 1.参数列表
-- 优惠券id
local voucherId = ARGV[1]
-- 在set集合中是否存在,用户id
local userId = ARGV[2]
-- 库存的key
local storeKey ='seckill:stock:' .. voucherId;
-- 订单的key
local orderKey = 'seckill:order:' .. voucherId;

-- 脚本业务
-- 判断库存是否充足,getStoreKey
if (tonumber(redis.call('get', storeKey))<=0) then
    -- 库存不足
    return 1
end
-- 判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then
    -- 存在,无法下单,不允许
    return 2;
end
-- 证明库存充足而且第一次来,那么就可以下单,然后执行stock -1
redis.call('incrby',storeKey,-1)
-- 同时记录下这个用户下了单
redis.call('sadd',orderKey,userId)
return 0

在Java中调用判断脚本

首先先写出对应的框架

public Result seckill(Long voucherId) throws InterruptedException {
    UserDTO user = UserHolder.getUser();
    //1. 执行Lua脚本
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), user.getId().toString()
    );
    //2. 通过Lua脚本判断是否有购买资格
    //3. 没有购买资格,返回错误信息
    assert result != null;
    int retValue = result.intValue();
    if(retValue != 0){
        return Result.fail("没有购买资格!");
    }
    //4. 有购买资格,将下单信息保存到阻塞队列
    long orderId = redisIDWorker.nextId("order:");
    //创建阻塞队列

    //异步执行数据库写入

    //5. 返回订单ID
    return Result.ok(orderId);
}

调用阻塞队列

首先阻塞队列是JUC包下提供的一种的并发编程的数据结构,这种数据结构的特点是:当一个线程尝试从阻塞队列中获取元素的时候,如果这个阻塞队列是空的话,那么线程就会被阻塞,当队列中有元素被push()的时候,就会唤醒线程,并且提交task

private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
//创建阻塞队列,并且写入
orderTasks.add(voucherOrder);

异步下单

异步下单写入数据库的操作需要借助于开辟的新的线程,对于新的线程,最好使用线程池进行管理

private class voucherOrderHandler implements Runnable{
    @Override
    public void run() {
        while (true){
            //获取队列中订单信息
            VoucherOrder head = null;
            try {
                head = orderTasks.take();
            } catch (InterruptedException e) {
                log.info("处理订单异常:{}",e.getMessage());
                e.printStackTrace();
            }
            //创建订单
            handleVoucherOrder(head);
        }
    }
}
private void handleVoucherOrder(VoucherOrder voucherOrder){
    //获取用户
    Long userId = voucherOrder.getUserId();
    //创建锁对象
    RLock lock = redissonClient.getLock("lock:order:" + userId);
    //获取锁
    boolean isLock = lock.tryLock();
    if(!isLock){
        log.error("redis服务产生问题,此处实现分布锁的流程");
    }
    try{
        //执行创建订单的操作
        proxy.seckillImpl(voucherOrder.getVoucherId(),userId);
    }finally {
        lock.unlock();
    }
}

注意这里有一个ThreadLocal的问题,由于是开辟了子线程,所以在父线程中的ThreadLocal数据是没有共享的,因此在这样的情况下,我们之前基于ThreadLocal的操作通通不能做了,只能通过别的共享方式进行操作

就比如说,第一个是关于保存订单中,关于代理对象的问题,代理对象的保存也是基于ThreadLocal进行实现的,因此这个代理对象的获取就会失效,那么为了保证获取同一个代理对象,办法就是提升作用域,提前保存对象到对象实例中。

第二个就是关于UserHolder中实现的getUser(),在这里的话必须要使用vocherId

上述的实现有什么问题?

  • 所使用的阻塞队列是放在JVM的内存中的,如果不加以选择,不断地向队列中put()元素,最终就会OOM
  • 假设JVM宕机了,内存中的东西全部丢失了就会导致异步任务全部丢失,从而产生错误

2. 消息队列

2.1 认识消息队列

消息队列Message Queue:字面意思就是存储消息的队列,最简单的消息队列包括三个角色

  • 消息队列:存储和管理消息,被称为消息代理(Messge Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列中获取消息并且处理消息

Redis中提供了三种不同的方式来实现消息队列

  • List结构:基于List结构模拟消息队列
  • Pubsub结构:基本的点对点消息模型
  • Stream:比较完善的队列模型

2.2 基于List实现消息队列

我们在学习数据结构的时候,本质上队列、栈这些数据结构都是通过链表演化出来的,其功能其实是链表的阉割版。而在Redis中的List,它本质上是一个双向链表,因此非常容易地就能够模拟出一个消息队列

如果要实现一个具有阻塞性的阻塞队列,那么应该要使用如下的API

  • BRPOP
  • BLPOP

优点

  • 利用redis进行存储,不会受限于JVM的内存上限
  • 基于redis的持久化机制,数据安全性有保证
  • 可以保证消息的有序性

缺点

  • 无法避免消息丢失,比如说当消费者pop出了一个数据,但是当这个数据还没有处理完,redis就宕机了,就会导致消息的丢失
  • 只支持单消费者

2.3 基于Pubsub实现消息队列

Pubsub(发布订阅模型)中,消费者可以订阅一个或者多个channel,生产者向对应的channel发送消息后,所有订阅者都能够收到相关的消息

  • SUBSCRIBE channel [channel]:订阅一个或者多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

优点

  • 支持多生产生消费

缺点

  • 不支持数据的持久化
  • 无法避免消息的丢失(阅后即焚)
  • 消息堆积有上限,超出的时候将丢弃消息请求,消息堆积不是指在channel中堆积,而是指在消费者的缓冲区中进行堆积

2.4 基于Stream实现消息队列

Stream是一种数据类型,可以实现一种功能完善的消息队列

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]]
*|ID]
field value [field value...]

key:消息队列的名称

NOMKSTREAM:如果队列不存在,是否自动创建队列,默认是自动创建

MAXLEN:最大长度,设置消息队列的最大消息数量,相当于设置一个有界的阻塞队列

*|ID:指定消息的唯一ID,*代表说由Redis自动生成,格式为时间戳-递增数字

fidld value:发送到消息队列中的消息,格式就是key-value键值对

在业务开发中,通常可以循环调用XREAD 阻塞方式来查询最新的消息,从而实现监听队列的效果

当我们指定起始ID为$的时候,代表读取最新的消息,如果我们处理一条信息的过程中,又有超过一条的消息到达队列,则下次获取的时候也只能够获取到最新的一条,会出现消息漏读的问题

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

2.5 消费者组模式

所谓消费者组就是将多个消费者划分到一个组中,监听同一个队列,具备有以下的特点

  • 消息的分流:队列中的消息会分流给不同的消费者,而不是重复的消费,从而加快消息处理的速度
  • 消息的标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取数据,确保每一个消息都会被消费
  • 消息的确认机制:消费者在获取消息之后,消息会处于一个pending的状态,并且存入一个pending-list中,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出

创建消费者组

XGROUP CREATE KEY groupName ID [MKSTREAM] 
  • key:队列名称
  • groupName:消费者组的名称
  • ID:起始ID标识,$标识队列中的最后一个消息,0代表队列中的第一个消息(从头开始)
  • MKSTREAM:队列不存在的时候自动创建队列
#删除指定的消费者组
XGROUP DESTORY key groupName
#给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
#删除消费者组中的指定消费者
XGORUP DELCONSUMER key groupname consumername

从消费者中读取消息

XREADGROUP GROUP group consumer [COUNT count]
[BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息的时候的最长等待时间

  • NOACK:不会启用pending-list机制,当消息一旦投递,就会自动确认

  • STREAMSk key:指定队列的名称

  • ID:获取消息的起始ID,>表示从下一个未消费的消息开始,而其他表示根据id从pending-list中获取已消费但未确认的消息,例如0,表示是从pending-list中的第一个消息开始

消费者监听消息的基本思路

实战模拟

  • 步骤1:创建消费者组以及消息队列
XGROUP CREATE stream.orders g1 MKSTREAM
  • 步骤2:修改Lua脚本
-- 订单id
local orderId = ARGV[3]
-- 有资格后,发送信息到队列中 XADD streams.orders * k1 v1 k2 v2 ..
redis.call('xadd','streams.order','*','userId',voucherId,'voucherId',id,'orderId')
  • 步骤3:修改业务流程
public Result seckill(Long voucherId) throws InterruptedException {
    UserDTO user = UserHolder.getUser();
    //1. 执行Lua脚本
    long orderId = redisIDWorker.nextId("order:");
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), user.getId().toString(),Long.toString(orderId)
    );
    //2. 通过Lua脚本判断是否有购买资格
    //3. 没有购买资格,返回错误信息
    assert result != null;
    int retValue = result.intValue();
    if(retValue != 0){
        return Result.fail("没有购买资格!");
    }
    //获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    //异步执行数据库写入
    //5. 返回订单ID
    return Result.ok(orderId);
}
public void run() {
    while (true){
        //获取消息队列中的订单信息
        //XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 200 STREAMS stream.order >
        //lastConsumed是最后消费的那个
        //latest是消费最新的,没有标记
        List<MapRecord<String, Object, Object>> messages = stringRedisTemplate.opsForStream().read(
            Consumer.from(groupName, consumerName),
            StreamReadOptions.empty().count(1).block(Duration.ofMillis(200)),
            StreamOffset.create(queueName, ReadOffset.lastConsumed())

        );
        //判断消息获取是否成功
        //如果不存在,说明没有消息
        if(messages == null || messages.isEmpty()){
            continue;
        }
        //如果存在,则进行处理的业务,解析消息
        for (MapRecord<String, Object, Object> record : messages) {
            Map<Object, Object> map = record.getValue();
            VoucherOrder voucherOrder =
                BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
            //catch到异常,循环处理,并且在异常的时候记录日志(比如说重试次数达到了1000次)
            //XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 200 STREAMS stream.order 0
            try{
                handleVoucherOrder(voucherOrder);
            }catch (Exception e){
                e.printStackTrace();
                int cnt = 0;
                while (true){
                    cnt++;
                    List<MapRecord<String, Object, Object>> tryMessage
                        = stringRedisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        StreamReadOptions.empty().count(1).block(Duration.ofMillis(200)),
                        StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    if(tryMessage == null || tryMessage.isEmpty()){
                        break;//这里是break;
                    }
                    if(cnt  == 1000){
                        log.error("重试超过1000次,订单号:{}",voucherOrder.getId());
                    }
                    for (MapRecord<String, Object, Object> msg : tryMessage) {
                        Map<Object, Object> m = msg.getValue();
                        VoucherOrder errDeal = 
                            BeanUtil.fillBeanWithMap(m, new VoucherOrder(), true);
                        try{
                            handleVoucherOrder(errDeal);
                        }catch (Exception ee){
                            ee.printStackTrace();
                            continue;
                        }
                    }
                }
            }
            //ACK SACK streams.orders g1 record.getId()
            //处理成功,则ACK
            stringRedisTemplate.opsForStream().
                acknowledge(queueName,groupName,record.getId());
        }
    }

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录