1. 项目架构说明
首先这个项目的整体架构如上图所示,用户端向NGINX
发起请求,NGINX
响应静态资源或者动态资源,当需要请求动态资源的时候,NGINX
转发用户的请求到tomcat,由tomcat完成此次请求的响应
注意,当负载过重的时候,这时候可以安排多个tomcat服务器同时运行接收请求,但是这时候就会存在集群内部的机器之间的交互的问题。
2. 基于Session实现登录
2.1 实现发送短信验证码的业务
http://localhost:8080/api/user/code?phone=13250631234
Request Method: POST
Status Code: 200
Remote Address: 127.0.0.1:8080
Referrer Policy: strict-origin-when-cross-origin
首先拿到请求的url,这个请求的url有一个前缀api
,这个api
是用来表示这个请求是发向tomcat
的,因此真正的请求路径是/user/code
对应到项目的controller
/**
* 发送手机验证码
*/
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
// TODO 发送短信验证码并保存验证码
return Result.fail("功能未完成");
}
就在这里补充验证码的处理业务逻辑
- 第一步:首先在
controller
中定义需要的参数,以及执行对参数的校验
首先我们需要用户的phone
,然后拿到这个phone
之后进行校验,由于后期还需要对code
进行校验,那么我们就需要一个当前会话对象的session
,这个对象由spring
容器为我们自动注入
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
// 1. 在controller中对参数进行校验
if (!RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号码不合法!");
}
//2.执行业务逻辑
return userService.sendCode(phone,session);
}
- 第二步:编写关键业务逻辑
@Override
public Result sendCode(String phone, HttpSession session) {
//1. controller已经对phone做过了校验,service不需要多做这一步
//2. 生成一个验证码(6位)
String code = RandomUtil.randomString(6);
//3. 保存验证码到session
session.setAttribute(SystemConstants.CODE_STRING,code);
//4. 发送验证码
log.info("你获得的验证码是:{}",code);
//5. 业务执行完毕,返回
return Result.ok();
}
2.2 实现登录/注册的功能
首先拿到请求的url和post,定义如下接口
/**
* 登录功能
* @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
*/
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
// TODO 实现登录功能
return Result.fail("功能未完成");
}
- 第一步,先做参数的校验
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
//1. 对手机号进行校验
String phone = loginForm.getPhone();
String password = loginForm.getPassword();
String code = loginForm.getCode();
if(phone == null || RegexUtils.isPhoneInvalid(phone)){
return Result.fail("手机号不合法!");
}
//2. 对验证码进行校验
if(password == null){
if(code == null){
return Result.fail("验证码不能为空");
}
}
//4.执行关键逻辑
return userService.login(loginForm,session);
}
- 第二步,编写关键逻辑
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.参数校验在controller中完成了
//2.既然要登录,那么首先从数据库中进行用户的查询
User user = query().eq("phone", loginForm.getPhone()).one();
//3.如果user不存在那么就注册
if(user == null){
user = this.createUserByPhone(loginForm.getPhone());
}
//4.存在的话,就将用户信息保存到session
session.setAttribute("user",user);
//5.业务执行完毕
return Result.ok();
}
private User createUserByPhone(String phone){
User user = new User();
user.setPhone(phone);
user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
save(user);
return user;
}
2.3 实现登录的验证功能
首先,登录的验证逻辑是这样的前端发送请求
@GetMapping("/me")
public Result me(){
// TODO 获取当前登录的用户并返回
return Result.fail("功能未完成");
}
如果前端检测到发送回来的状态码是失败的情况的话,那么就会直接跳转到登录页面,因此,我们需要补充这个接口
那么首先,用户携带的参数是一个SessionId
,那么我们可以取出这个SessionId,然后拿出对应的session
,然后检查session里面有没有对应的user
对象,如果有,就证明用户已经登录了,否则的话就是没有登录,执行相关逻辑即可
这里的话稍微思考一下
检验用户是否登录这个业务,是纯粹的纵向业务还是横向业务?我们知道,要使用我们的服务,必须执行登录,因此这个业务是一个横向交叉业务,那么就是一个切面了,既然是一个切面,我们就可以使用
Spring
提供的AOP
切面编程,为这些切面编写切点和通知,分析性质可知,这个通知肯定是一个前置通知。那么在springmvc中,提供了一种
AOP
技术,拦截器技术,它能够在所有controller
执行之前,都去执行切点表达式中所需要通知的方法,根据方法业务中的结果,判断该请求是否能够放行
同时,我们知道后续的业务逻辑是需要用到
User
的信息,但是这个User
信息是由我们的全局拦截器进行返回的,那么这时候就会产生一个线程安全的问题了,假设用户开了很多个页面,发送了很多次请求,这很多次请求都会导致产生一个线程去访问我们的tomcat服务器,那么如果这多个线程都去同时操作我们同一个user
对象,那么这时候就可能导致线程的不安全问题,基于此,可以使用ThreadLocal
来存储查询得到的user
对象,这样的话就能够保证线程安全问题了,各个线程之间的操作互不干扰。
有没有具体的例子呢?比如说在高并发的场景下,一个会话下有许多个连接,这多个连接可能是不同的用户发起的,这个项目还没有,但是比如说当一台电脑中的一个浏览器,它同时具有两个用户端,一个客户端和一个管理端,这时候就会导致客户端和管理端的用户使用错误了,比如说客户端用的是A账号,然后这时候这个人又去管理端上使用了B账号,这时候你本来在A上用得好好的,突然去登录B,这时候好了,客户端的账号变成了B的了,这时候就会产生逻辑错误,会产生一系列的问题。
而且如果说你一直用的是session提供的对象,实际上是会导致脏读的问题的,就是,你这个对象是存在缓存里面的,实际上只有持久化的设备数据库上的数据才是可信的,比如说请求A修改了user,然后发生异常程序直接crash了,但是这时候已经写入了user,这时候的数据操作进行到一半就crash掉了,最终就会导致数据的不一致
这样有点抽象,可以举个例子
public class User{ private Integet account; private String name; }
假设说请求A发起了转账操作,令
a.acount+=10;b.acount-=10
,但是这时候抛出了异常,使得a.acount+=10
没有执行后面对b的操作,然而这时候已经写入了内存中了,其他请求线程读取session中的user对象的时候也会读取到这个a,那就会产生问题了。但是如果我们用
ThreadLocal
呢,这时候的问题就解决了,每个线程都有自己的副本,只要程序业务注意缓存的加载和刷新即可保证程序的正确性。同样的,因为这是并发场景,幻读和不可重复读的现象肯定还是会出现的,因为我们没有加锁,啥也没干,这两个现象肯定会出现
2.4 集群的Session共享问题
Session共享问题:多台tomcat
并不共享session
存储空间,当请求切换到不同的tomcat
服务器的时候就会导致数据丢失的问题
场景:比如说有三台的
tomcat
,然后用户首次登录,将code存储到了第一台服务器上,然而在后来,再次接收登录请求,登录请求分发到了第一台服务器上,此时就会产生错误。
session的替代方案应该要满足
- 数据共享
- 内存存储
- key、value结构
2.5 如何使用redis代替session
首先,我们之前做的都是session
这种方案来的,那么现在我们将写入session
的过程全部修改为写入redis
code的数据结构的选择
由于存储的是验证码,因此使用String
比较好,这是value的选型
但是key呢?我们在session中,使用的是code
,这是基于每个浏览器对应一个session
的这个前提来做的
但是在redis
中,它是共享的内存空间,无论你是多少个浏览器,只要你是通过tomcat来访问我们的redis
,你都会读取到相同的数据
为了解决这个二义性的问题,我们选用phone
作为key
即可
缓存的创建与加载问题
我们知道,session
对象的创建和加载都是由tomcat
服务器为我们解决的
- 当服务器中没有session对象的时候,tomcat为我们自动生成一个,并设置存活时间和sessionId
- 当服务器中有session对象的时候,tomcat传递这个session对象给我们使用
但是现在使用了redis,那么sessionId要怎么存呢?
对于存储用户对象的key,可以选择使用一种叫做token
的数据类型进行数据的存储,所谓的token就是随机字符串,比如说用UUID
这样的算法来生成
我们可以这样设计,首先查询redis
缓存中有没有对应token
的value
,有的话直接取出,没有的话缓存phone->value
的键值对以便使用
保存登录的用户信息,可以使用String
结构,以JSON
字符串进行保存,这种方式比较直观
也可以使用Hash
结构,这种结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用比较少,其中针对单个字段的修改,hash
结构具有比较大的优势
其中这个流程有变化,因为session
发送到浏览器端的时候,是内置对象自动化的,但是token
是我们设计的,所以这时候就需要将token
写入到请求头中。
设计总结
在设计key的时候,必须要注意key的唯一性以及key的安全性
在设计value的时候,必须要考虑需求,考虑操作的频繁性
2.6 基于redis实现短信登录
修改验证码
//3. 改动:需要修改为redis,加上业务前缀以示区分
//设置有效期
stringRedisTemplate.opsForValue().set("login:code:"+phone,code,5, TimeUnit.MINUTES);
修改登录流程
这个流程比较复杂,下面我将分步进行代码的重构
- 第一步:修改登录流程
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.参数校验在controller中完成了,我们在业务中进行验证码的校验
String codeKey = RedisConstants.LOGIN_CODE_KEY + loginForm.getPhone();
String code = stringRedisTemplate.opsForValue().get(codeKey);
//然后比对
if(!loginForm.getCode().equals(code)){
return Result.fail("验证码错误!");
}
//2.既然要登录,那么首先从数据库中进行用户的查询
User user = query().eq("phone", loginForm.getPhone()).one();
//3.如果user不存在那么就注册
if(user == null){
user = this.createUserByPhone(loginForm.getPhone());
}
//4.存在的话,就将用户信息保存到redis
//存储结构为token->hash
//4.1 随机生成token,作为登录的令牌
String token = UUID.fastUUID().toString(true);
String tokenKey = RedisConstants.LOGIN_USER_KEY+token;
//4.2 将User对象转为Hash存储
//这里有一个小坑,就是用hutools的这个工具的话,它会将User这个对象转化为一个map对象
//但是你要特别注意,这个map对象的value是object,然而你的stringRedisTemplate是处理String的
//因此你必须要把Long类型转换为String类型,转化代码如下:
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
//第一步,忽略空值
//第二步,将value值全部转为string
Map<String, Object> hash = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create()
.ignoreNullValue()
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())
);
//4.3 存储对象
//putAll(),key:就是整个hash的key,然后你再传入一个key即可
stringRedisTemplate.opsForHash().putAll(tokenKey,hash);
stringRedisTemplate.expire(tokenKey,RedisConstants.LOGIN_USER_TTL,TimeUnit.MINUTES);
//5.业务执行完毕,别忘了要给前端返回一个token
return Result.ok(token);
}
- 第二步,修改登录拦截器
登录拦截器需要完成的功能有:
从Redis中取出user
对象,判断是否为空
如果是空,那么拦截
如果为非空,那么就放行
那么要怎么做呢?我们知道,这里分析了前端的代码是通过在请求头中携带了类似于sessionId
的字段
也就是authorization
这个字段,然后后端从request
中取出这个字段,将这个字段进行redis
的查询即可
注意,这里有个问题,就是我们的这个LoginInterceptor能够直接使用直接注入的
stringRedisTemplate
吗?这个的问题,首先我们要搞清楚我们自动注入的容器是谁?是Spring的IoC容器,但是这个Interceptor
是属于SpringMvc容器的,因此我们无法直接注入这个对象,这是因为Spring的IoC
容器是父容器
,而SpringMVC的容器是子容器
,这个子容器可以直接访问父容器,但是父容器不能直接访问子容器而我们知道
LoginInterceptor
的对象是归SpringMvc
的容器管的,是子容器中的对象,Spring
的IoC
容器无法直接访问SpringMvc
的容器,那么怎么办呢?
还记得我们是如何添加拦截器的吗?
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor());
}
我们看到添加拦截器的时候,是直接new出来的,那么注入对象的时机可以在这里自动进行注入
那么我们注入的stringRedisTemplate
从哪里来呢?就是从Spring
的IoC
容器中来,这个配置类是SpringMvc
的配置类,还是归Spring
管的,因此可以在此处注入stringRedisTemplate
,然后传递到SpringMvc
所管理的容器中
那么修改为
@Slf4j
public class LoginInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
}
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {}
}
然后我们开始编写相关代码
//进入controller之前做登录校验
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//从redis中查询用户信息
String token = request.getHeader("authorization");
if(StringUtils.isBlank(token)){
log.info("token为空");
response.setStatus(401);
return false;
}
//组装key
String key = RedisConstants.LOGIN_USER_KEY+token;
//获取用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//判断用户是否存在,也就是判断这个map是否为空
if(userMap.isEmpty()){
log.info("用户未登录");
response.setStatus(401);
return false;//用户未登录
}
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//否则的话就是有这个用户的信息
//存在则保存信息到Local
UserHolder.saveUser(userDTO);
//放行
//刷新token
stringRedisTemplate.expire(key,RedisConstants.LOGIN_USER_TTL, TimeUnit.SECONDS);
log.info("缓存user对象,放行请求");
return true;
}
好了,做到这里基本上功能已经完成了,但是有一点需要注意的是:
想想看,假设用户在使用的过程,已经登录过了,但是一直访问的是不需要的登录的页面,但是这时候用户也在客户端上活跃啊,但是这时候并不会进入到我们的拦截器中,也就会导致用户在一直活跃,但是过了一段时间后,key过期,导致登录失效
解决思路:
第一个拦截器去拦截检查token,如果token在redis中没有过期,那么就延长过期时间,同时将这个用户信息存储到ThreadLocal中,如果token在redis不存在,那么不管,直接放行,交给第二个拦截器
第二个拦截器判断,你只需要查询,在上一次的请求中是否获取到了token,并且将对应的用户缓存到了ThreadLocal
中?注意,这里是同一个线程的,所以两个拦截器都是同一个线程在进行操作,所以可以很好的使用ThreadLocal
这种模式叫做责任链模式,将责任再次细分
@Slf4j
public class RefreshTokenInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//从redis中查询用户信息
String token = request.getHeader("authorization");
if(StringUtils.isBlank(token)){//如果为空,则直接放行
log.info("token为空,放行到下一层拦截器");
return true;
}
//组装key
String key = RedisConstants.LOGIN_USER_KEY+token;
//获取用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//判断用户是否存在,也就是判断这个map是否为空
if(userMap.isEmpty()){
log.info("token不存在(过期/失效/错误的参数),token拦截器放行,交给下层拦截器");
return true;//用户未登录
}
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//否则的话就是有这个用户的信息
//存在则保存信息到Local
UserHolder.saveUser(userDTO);
//放行
//刷新token
stringRedisTemplate.expire(key,RedisConstants.LOGIN_USER_TTL, TimeUnit.SECONDS);
log.info("缓存user对象,刷新登录状态,放行请求");
return true;
}
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//判断有没有token
if(UserHolder.getUser()==null){
log.info("用户未登录,拦截");
response.setStatus(401);
return false;
}
log.info("用户已经登录");
return true;
}
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"blog/hot",
"/shop/**",
"/shop-type/**"
).order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
//控制拦截顺序
}
3. 商户查询缓存
3.1 缓存概述
缓存就是数据交换的缓冲区,是存储数据的临时区域,一般读写性能比较高
- 浏览器缓存:当浏览器的缓存没有命中的时候,就会走tomcat
- 应用层缓存:在web层设置一个缓存,当下一次请求来的话,直接返回数据
- 数据库缓存:比如说索引,可以将索引相关数据进行缓存
- CPU多级缓存
- 磁盘缓存
缓存的作用
- 降低后端的负载
- 提高读写效率,降低响应时间
缓存的成本
- 数据的一致性成本:需要维护数据库与缓存的一致性
- 代码维护的成本
如何添加缓存?
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
//从redis中查询是否有缓存,
String shopKey = RedisConstants.CACHE_SHOP_KEY+id.toString();
String shopJson = stringRedisTemplate.opsForValue().get(shopKey);
//缓存命中
if(StrUtil.isBlank(shopJson)){
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//缓存不命中
//查询数据库
Shop shop = getById(id);
//数据库中没有信息,报错
if(shop == null){
return Result.fail("店铺不存在!");
}
//数据库中有信息,写回redis
stringRedisTemplate.opsForValue().set(shopKey,JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}
3.2 缓存更新策略
内存淘汰 | 超时剔除 | 主动更新 | |
---|---|---|---|
说明 | 不需要自己进行维护,利用Redis内存淘汰机制,当内存不足的时候自动淘汰部分数据,下次查询时更新缓存 | 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时自动更新缓存 | 编写业务逻辑,在修改数据库的时候同时缓存数据,确保一致性 |
一致性 | 差,这是因为并不知道什么时候进行内存的淘汰与缓存,数据库发生改变时,无法知道是否形成一致 | 并非强一致性,是否一致与key的淘汰时间密切相关 | 好,并非完全一致 |
维护成本 | 无 | 低 | 高,需要编码维护一致性 |
- 低一致性的需求:使用内存淘汰机制
- 高一致性需求:主动更新,并以超时剔除作为兜底的方案
Cache Aside Pattern(旁路缓存策略)
由缓存的调用者,在更新数据库的同时更新缓存,它比较适合读请求比较多的情况,该模式的工作流程如下:
- 首先从缓存中去读取数据,如果缓存命中则直接返回
- 如果缓存未命中,则需要到数据库中去读取
- 将数据库中读取的结果的副本加载进缓存中,并且返回
- 写操作
- 更新数据库
- 删除缓存中的数据
这个策略需要考虑以下问题
1.删除缓存还是更新缓存?
- 更新缓存:假设对数据库进行了n次的更新操作,那么也就需要对缓存做n次的更新操作,如果这n次操作并没有任何的线程进行数据的查询(写多读少的情况),那么就会造成许多次无效的写操作。
- 删除缓存:更新数据库的时候让缓存失效,查询时再更新缓存
(延迟加载)
两种模式比较而言,删除缓存的模式更佳,因为它具有一种懒加载的思想,只有当真正的读请求到来的时候,才将数据库一致性数据写入到缓存中,避免不必要的IO操作
2.如何保证缓存与数据库的操作同时成功或者同时失败?(保证原子性)
- 单体系统:将缓存与数据库操作放在一个事务当中
- 分布式系统:使用TCC等分布式事务方案
3.先操作缓存还是先操作数据库?
这个问题本质上是线程安全问题
假设我们先删除cache后更新db,这样可能会造成数据库和缓存的数据不一致
举个例子来说:请求1先写入数据A,请求2随后读取数据A
请求1执行写入缓存的操作,那么就执行写操作,写操作的流程是先删除
cache
,后更新db请求1将cache中的A数据删除,因为删除cache和更新db的操作不是原子性的,因此绝对有可能发生并发问题
请求2请求缓存数据,发现没有,因此直接到db中读取数据,然后缓存数据到
cache
中请求1更新数据库数据
这时候就产生了数据库和cache的不一致了
但是先更新db后删除cache就没有问题吗?
理论上还是会产生数据不一致的问题的,不过概率比较小的,这是因为缓存写入的速度比数据库的写入速度要快很多
举个例子来说:请求1读取数据A,请求2写入数据A,并且数据A在请求1请求之前就不在缓存中的话,就有可能产生数据的不一致的问题
请求1请求读取数据A,发现缓存不命中,于是从数据库中进行查询,此时由于数据库操作耗时高,因此请求2抢占CPU,执行操作
请求2写入数据到db,此时缓存中没有数据A,因此不需要删除cache
但是此时请求1返回执行,将之前查询到的数据A写入到缓存中
此时就产生了数据的不一致性
分析一下缺陷
- 首次请求数据一定不在cache中
这个缺陷可以这样解决,就是手动进行缓存预热
,将热点数据提前在缓存进行载入
- 写操作比较频繁的时候使用旁路缓存策略的话就会导致缓存频繁失效,弊大于利,不断地进行缓存的写入,IO开销大
解决思路:
- 数据库和缓存需要保持强一致性的场景:更新数据库的时候同样更新cache,不过需要加锁,避免线程安全问题
- 可以短暂允许缓存和数据库不一致的场景,更新db的时候同样更新cache,但是给缓存加一个比较短的过期时间,这样的话就可以保证即使数据不一致的话影响也比较小,也就是主动更新为主,然后超时剔除机制进行兜底
Read/Write Through Pattern(读写穿透)
缓存与数据库整合为一个服务,由服务来维护一致性,调用者调用该服务,无需关心缓存的一致性问题
写(Write Through)
- 先查 cache,cache 中不存在,直接更新 db。
- cache 中存在,则先更新 cache,然后 cache 服务自己更新 db(同步更新 cache 和 db)。
读(Read Through)
- 从cache中读取数据,读取到了就直接返回
- 读取不到的话,先从db中进行加载,写入到cache后返回响应
读写穿透策略和旁路缓存策略有什么区别?
他们本质上是一样的,都是在需要更新db的时候同步更新cache,但是,读写穿透策略的cache服务对于客户端来说是透明,但旁路缓存策略需要自己完成cache服务,这是最根本的不同。
Write Behind Caching Pattern(异步缓存写入)
调用者只操作缓存,由其他线程异步地将数据持久化到数据库,保证最终的一致性,这种属于最终一致性
有种感觉,就是异步缓存写入
和读写穿透策略
很相似,两者都是由cache
来负责cache
和db
的数据一致性
但是!最根本的区别在于异步和同步,异步缓存写入是基于一种类似于定时任务的机制,批量将cache中的数据刷回到数据库中,但是读写穿透策略本质上还是同步更新cache和db的,也就是说
异步缓存写入保证了最终一致性,而读写穿透策略能够保证强一致性
这种策略在我们平时开发过程中也非常非常少见,但是不代表它的应用场景少,比如消息队列中消息的异步写入磁盘、MySQL 的 Innodb Buffer Pool 机制都用到了这种策略。
Write Behind Pattern 下 db 的写性能非常高,非常适合一些数据经常变化又对数据一致性要求没那么高的场景,比如浏览量、点赞量。
为查询商铺的缓存添加超时剔除和主动更新的策略
业务需求
- 根据id查询店铺的时候,如果缓存没有命中,那么就查询数据库,将数据库结构写入到缓存中,并且设置超时时间
- 根据id修改店铺的时候,先修改数据库,再删除缓存
@Override
@Transactional
public Result update(Shop shop) {
//1.更新数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY+shop.getId());
return Result.ok();
}
@PutMapping
public Result updateShop(@RequestBody Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("店铺不存在!");
}
// 写入数据库
return shopService.update(shop);
}
3.3 缓存穿透
缓存穿透
:就是指客户端请求的数据在缓存中和在数据库中都不存在,这样缓存永远都不会生效,最终这些请求都会打到数据库
方案1:缓存空对象
就是当数据库也不命中的时候,这时候数据库索性向缓存中缓存一个key->null
,这样的话就不会导致每次都压到数据库上去了
这种方法的优点是简单粗暴,维护方便
缺点:额外的内存消耗(就是当外部人员编造ID,编造各种各样不存在的ID,都会在缓存中存活,浪费空间,影响查询效率,方法也有,设置一个比较短的TTL,尽可能起保护作用)
,可能造成短期内的不一致(当插入前查询,缓存了null,当插入后查询,这时候缓存中依然为null,造成了数据库的不一致,方案是启用缓存更新策略,维护一致性)
方案2:布隆过滤
布隆过滤器
可以非常简单地判断一个给定数据是否存在于海量数据中,我们需要做的就是判断这个key是否合法,因此布隆过滤器适合用于做一个web层到 redis层之间的中间层,对请求中的key进行拦截判断
把所有可能存在的值都放在布隆过滤器中,当用户请求过来的时候,先判断用户发来的请求的值是否存在于布隆过滤器中,不存在的话直接返回错误信息到前端,存在的话才会走下面的流程
下图描述是布隆过滤是如何操作的
具有这样进行描述,首先用户的请求过来,它会先被布隆过滤器拦截
值是否位于布隆过滤器中?
- 不存在,直接报错,返回响应到前端
- 存在,放行走redis
然后后面就是正常的缓存+数据库的查询过程,就不赘述了
关于布隆过滤器的误判情况
布隆过滤器说某个元素存在,小概率会误判,布隆过滤器说某个元素不存在,那么这个元素一定就不存在
布隆过滤器的实现原理
布隆过滤器本质是一个二进制向量(位数组)和一系列的哈希函数
组成的数据结构,它占用空间比较小,具备较高性能但其缺点是返回的结果是概率性的,而不是非常准确的,理论情况下添加到集合中的元素越多,越有可能误报,存放在布隆过滤器中的元素不容易进行删除
当一个元素加入到布隆过滤器时,会进行哪些操作?
- 使用布隆过滤器中内置的哈希函数对元素值进行计算,也就是
H(x) = y
,其中x是原像,y是计算出来的哈希值,注意,布隆过滤器中的哈希函数不止一个,可能有多个,这是为了解决哈希冲突的问题 - 根据得到的哈希值,在位数组中把对应的下标的值设置为1
当一个元素被判断是否存在于布隆过滤器的时候,会进行哪些操作?
- 对给定的元素再次进行相同的哈希运算
- 得到值之后,判断位数组中的每个元素是否都为1,如果值都为1,那么就说明这个值在布隆过滤器中,如果存在一个值不为1,那么就说明这个元素不在布隆过滤器中
如图所示,当一个字符串要执行标记的时候,这时候就经过这个布隆过滤器内置的3个hash函数
,让这个hash
函数去计算对应的位向量下标,并且把这个位向量下标对应的值设置为1,当第二次,相同的数据来的时候,这时候就可以计算出相同的下标
那么,误判是怎么回事呢?
首先,我们要理解哈希碰撞,所谓哈希碰撞就是对于公式
h = H(x)
,对于不同的x1和x2
能够经过H()
运算后,得到相同的h
,那么这时候就产生问题了,如果两个不同的元素,恰巧产生相同的哈希值,就算这个元素没有被存储过,布隆过滤器也会认为它存在过,导致误判,但是这是非常小概率的事件,这是因为目前研发出来的哈希函数具有良好的抗碰撞性,并且在布隆过滤器中还有多个哈希函数这样的手段来降低碰撞的概率,因此不能说没有误判,只能说误判的概率很小误判,有两种情况
- 误判存在可能吗?完全可能,哈希碰撞导致
- 误判不存在可能吗?不可能,如果一个元素之前经过了布隆过滤器,那么它一定会留下mark,这个mark是不会被篡改的
当出现严重的误判现象的时候,我们可以采取以下措施
- 适当增加位数组的大小
- 调整哈希函数
实战:解决缓存穿透
- 使用空对象的思路
@Override
public Result queryById(Long id) {
//从redis中查询是否有缓存,
String shopKey = RedisConstants.CACHE_SHOP_KEY+id.toString();
String shopJson = stringRedisTemplate.opsForValue().get(shopKey);
//缓存命中
if(StrUtil.isNotBlank(shopJson)){//有商品数据才为true
log.info("店铺查询:缓存命中");
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//命中的是否是空值
if(shopJson != null){
//返回错误信息
return Result.fail("店铺不存在");
}
//缓存不命中
//查询数据库
Shop shop = getById(id);
//数据库中没有信息,报错
if(shop == null){
//写入redis
stringRedisTemplate.opsForValue().set(shopKey,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}
//数据库中有信息,写回redis
stringRedisTemplate.opsForValue().set(shopKey,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
3.4 缓存雪崩
缓存雪崩
指同一时段大量的缓存key同时失效或者redis服务器宕机,导致大量请求直接到达数据库,带来巨大压力
给不同的key的TTL设置不同的随机值,避免在同一个时间全部过期
利用Redis集群提高服务的可用性
,通过哨兵机制,监控集群的状态,当主节点失效时,通过选举算法在从属节点上重新选举新节点,避免redis宕机带来的巨大压力,而集群间的数据一致性是通过主从复制来实现的
给缓存业务添加降级限流策略,比如说,当服务器出现重大事故,比如说火灾啥的,导致服务器全部宕机,这时候就应该要启动一个拒绝服务的策略,避免上游请求继续积压到下游服务器,甚至最后导致请求提交失败,导致严重的生产事故
给业务添加多级缓存,比如说在浏览器中添加缓存,在JVM中添加缓存等等
3.5 缓存击穿
缓存击穿
问题也被叫做热点key
问题,就是一个被高并发访问
并且缓存重建业务较复杂
的key突然间失效了,无效的请求访问会突然给数据库造成巨大的压力
常见的解决方案
互斥锁
简述一下互斥锁解决缓存击穿的机制
这种机制本质上是通过减少与数据库的交互次数,假设第一个线程查询缓存发现缓存中的数据过期失效,此时首先它会去获取对数据库操作权限的锁,当获得了这个锁之后,它就去执行对缓存数据的重建,而其他的线程进来之后,发现缓存查询不到的话就也会去尝试获取锁并且完成缓存的重建
但是由于第一个线程已经占用了锁,因此其他线程会以一种
自旋等待
的方式,就算线程获得了时间片,它也不会执行其他代码,而是会忙等锁的解开,不断地尝试获取缓存数据,直到缓存中有数据为止。
这种方式的性能比较差,因为出现了忙等,甚至在极端情况下可能出现死锁,造成严重的生产事故
死锁因为当操作数据库的线程因为某些因素被kill掉了之后,这个锁永远不会被释放,导致所有线程一直等待
或者是多个缓存间的访问加锁获取锁的这些操作,都会导致死锁
逻辑过期
让这些热点的key
永不过期,或者过期时间较长,或者我们人为设置一个逻辑过期时间,注意,逻辑过期就是说缓存中的数据的存在与否,不交给Redis直接操作,而是给到了我们的业务逻辑,当检测到逻辑过期的时候,执行相应的操作,描述如下:
互斥锁的最大弊端是什么?忙等,各个线程之间因为锁而自旋等待,无法响应客户端
那么为了解除忙等,于是提出了这种机制,忙等的根本原因是因为redis中已经没有任何数据可以返回了,那么为什么我不能够保留这个数据呢?在新的数据写入之前,我就一直使用的旧的数据,虽然会短暂造成数据的不一致,但是不会过于影响用户的体验,那么是怎么做的呢?
首先:第一个线程查询缓存,发现数据已逻辑过期,于是给这个数据上一把锁,然后开辟一个新的线程
,让这个线程去更新数据,然后自己干啥呢,就是拿着缓存中旧的数据,不断响应客户端
然后那个新的线程更新数据之后,就会释放锁,这时候缓存中的数据就是最新的了
如果又来第二个线程,获取锁失败了会怎么办?这时候也不忙等,就会马上返回旧的数据,不断响应客户端
互斥锁和逻辑过期两种方式的对比
解决方案 | 优点 | 缺点 |
---|---|---|
互斥锁 | 1. 没有额外的内存消耗,不需要保持逻辑过期时间2.保持一致性3.实现起来简单 | 1.线程需要等待,忙等导致CPU利用率低2.可能在极端情况下发生死锁 |
逻辑过期 | 1.线程无需忙等,性能好,CPU利用率高 | 1.不保证数据的一致性2.有额外的内存消耗3.实现起来比较复杂 |
使用互斥锁解决缓存击穿问题
首先需要解决的问题是,如何设计这个锁?
在redis中,有一个操作指令叫做setnx key value
,这个操作指令是只有当key不存在的时候,才能够修改value的值,如果key已经存在,那么就无法修改这个值
我们在使用互斥锁的这种思路中,一种很重要的观点就是先到者加锁
,setnx key value
能够很好地帮助我们实现这个效果,因为这个key在第一次产生之后就无法修改。
当然这种方法是完全可能导致死锁的,这是因为key
如果没有人去删除,或者操作的线程删除失败之后,key永远无法被删除,因此一个办法就是设置一个过期时间,防止死锁
public Shop queryWithMutex(Long id){
//从redis中查询是否有缓存
String shopKey = RedisConstants.CACHE_SHOP_KEY+id.toString();
String shopJson = stringRedisTemplate.opsForValue().get(shopKey);
//缓存命中
if(StrUtil.isNotBlank(shopJson)){//有商品数据才为true
log.info("店铺查询:缓存命中");
return JSONUtil.toBean(shopJson, Shop.class);
}
//命中的是否是空值
if(shopJson != null){
//返回错误信息
return null;
}
Shop shop = null;
String lockKey = RedisConstants.LOCK_SHOP_KEY+id;
try{
//缓存不命中
//实现缓存重建
String shopCache = stringRedisTemplate.opsForValue().get(shopKey);
if(!tryLock(lockKey)){
//休眠并重试
//查询redis中是否有数据,它重试大概20次
int retryTime = 50;
while ((retryTime--)>0){
shopCache = stringRedisTemplate.opsForValue().get(shopKey);
if(StrUtil.isNotBlank(shopCache)){
//redis中有数据存在,直接
return JSONUtil.toBean(shopCache,Shop.class);
}else{
Thread.sleep(5);
}
}
return null;
}else{
//再次查询redis
shopCache = stringRedisTemplate.opsForValue().get(shopKey);
if(StrUtil.isNotBlank(shopCache)){
//redis中有数据存在,直接
return JSONUtil.toBean(shopCache,Shop.class);
}
//查询数据库
shop = getById(id);
//数据库中没有信息,报错
if(shop == null){
//写入redis
stringRedisTemplate.opsForValue().set(shopKey,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//数据库中有信息,写回redis
stringRedisTemplate.opsForValue().set(shopKey,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
}
}catch (Exception e){
e.printStackTrace();
}finally {
releaseLock(lockKey);
}
return shop;
}
/**
*
* @param key
* @return
*/
private boolean tryLock(String key){
Boolean tryRes = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
//它这里会做拆箱的,因为你的tryRes为包装类,而你的返回值为基本数据类型
return BooleanUtil.isTrue(tryRes);
}
/**
* 释放锁
* @param key
*/
private void releaseLock(String key){
stringRedisTemplate.delete(key);
}
使用逻辑过期解决缓存击穿问题
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogical(Long id){
//提交商铺id
//从redis中查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(id);
if(StrUtil.isBlank(shopJson)){
//没有数据,证明不是热点商品,直接删除
return null;
}
//如果命中,那么证明就是热点商品,需要将json转为bean,并且判断过期时间
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
//未过期,直接返回
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
if(!expireTime.isBefore(LocalDateTime.now())){//未过期
return shop;
}
//过期
//缓存重建
//获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY+id;
//判断是否获取锁成功
boolean isGetLock = tryLock(lockKey);
//获取成功,开启线程执行缓存重建
if(isGetLock){
try{
CACHE_REBUILD_EXECUTOR.submit(()->{
saveShop2Redis(id,30L);
});
}catch (Exception e){
e.printStackTrace();
}finally {
releaseLock(lockKey);
}
}
//获取失败||开启线程后,返回过期的商品信息
return shop;
}
3.6 缓存工具类封装
基于StringRedisTemplate
封装为一个缓存工具类,满足下列需求
- 将任意的Java对象序列化为json并存储在
String
类型的key中,并且可以设置TTL过期时间 - 将任意的Java对象序列化为json并存储在
String
类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿的问题 - 根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决穿透问题
- 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿的问题
package com.hmdp.utils;
@Component
@Slf4j
public class RedisClient {
@Resource
private StringRedisTemplate stringRedisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 实现的是Redis管理过期
* @param key
* @param val
* @param time
* @param timeUnit
*/
public void set(String key, Object val, Long time, TimeUnit timeUnit){
//1.序列化为Json对象
log.info("缓存参数,key:{},val:{},time:{},timeUnit:{}",key,val.toString(),time,timeUnit);
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(val),time,timeUnit);
}
/**
* 实现逻辑过期,解决缓存击穿问题
* @param key:键的名称
* @param val:键值存储的数据对象
* @param time:时间数
* @param timeUnit:时间单位
*/
public void setWithLogicalExpire(String key, Object val, Long time, TimeUnit timeUnit){
//1.序列化为Json对象
log.info("缓存参数,key:{},val:{},time:{},timeUnit:{}",key,val.toString(),time,timeUnit);
//2.存储数据的时候,给对象添加字段
RedisData redisData = new RedisData();
redisData.setData(val);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}
/**
* 解决缓存穿透问题:避免不存在的key导致压力压垮数据库
* @param keyPrefix:key在redis中的业务前缀
* @param id:id值,请注意,如果你的id不是Long或者String,请确保你的这个对象能够在toString()后提供正确的值
* @param type:你的类的对象,因为通过泛型无法推断出来
* @param <R>:返回对象
* @param <ID>:ID类型
* @return
*/
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long notNullTime,Long nullTime,TimeUnit timeUnit) {
log.info("缓存参数:keyPrefix{},id:{},type:{}", keyPrefix, id, type.getSimpleName());
//1.组装key
String key = keyPrefix + id;
//2.在redis中查询key是否存在
String RJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(RJson)) {
//3.如果存在,那么就直接反序列化后返回
return JSONUtil.toBean(RJson, type);
}
//4.否则的话就是不存在
//然后我们判断命中的是不是空值
if (RJson != null) {
//就是空值,是我们设置的空对象
return null;
}
//5.redis中确实没有这个数据,去数据库中查询
//这里的话无法做到,我们需要调用者给我们设定一段查询数据的逻辑,让调用者传入
R result = dbFallback.apply(id);
//6.如果数据库中也没有,那么缓存一个空对象
if (result == null) {
this.set(key, "", nullTime, timeUnit);
return null;
}
//7.否则的话返回,并且将数据写入到redis
this.set(key, result, notNullTime, timeUnit);
return result;
}
public <R,ID> R queryWithLogical(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallBack, Long time,TimeUnit timeUnit){
//提交商铺id
//从redis中查询缓存
String key = keyPrefix+id;
String RJson = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isBlank(RJson)){
//没有数据,证明不是热点商品,直接删除
return null;
}
//如果命中,那么证明就是热点商品,需要将json转为bean,并且判断过期时间
RedisData redisData = JSONUtil.toBean(RJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
R result = JSONUtil.toBean(data, type);
LocalDateTime expireTime = redisData.getExpireTime();
if(!expireTime.isBefore(LocalDateTime.now())){//未过期
return result; //未过期,直接返回
}
//过期
//缓存重建
//获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY+id;
//判断是否获取锁成功
boolean isGetLock = tryLock(lockKey);
//获取成功,开启线程执行缓存重建
if(isGetLock){
try{
CACHE_REBUILD_EXECUTOR.submit(()->{
//先查询数据库
R queryFromDB = dbFallBack.apply(id);
//在写入redis
this.set(key,queryFromDB,time,timeUnit);
});
}catch (Exception e){
e.printStackTrace();
}finally {
releaseLock(lockKey);
}
}
//获取失败||开启线程后,返回过期的商品信息
return result;
}
/**
*
* @param key
* @return
*/
private boolean tryLock(String key){
Boolean tryRes = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
//它这里会做拆箱的,因为你的tryRes为包装类,而你的返回值为基本数据类型
return BooleanUtil.isTrue(tryRes);
}
/**
* 释放锁
* @param key
*/
private void releaseLock(String key){
stringRedisTemplate.delete(key);
}
}