Redis应用实战
(登录凭证:网页请求服务器后,服务器会将SESSIONID自动写入到浏览器当中,每一个session都有唯一的一个sessionID,而cookie携带session。而后每次网页的请求都会携带一个Cookie,Cookie中都会携带唯一的SESSIONID,然后去获取对应的用户数据)
后端服务器视图层接收请求(@RestController("/user"))
调用业务层处理参数: userService.sendCode(phone,session);
业务层:
视图层调用业务层
业务层根据手机号查询用户,若存在直接登录,不存在创新新用户(这里密码可以置空,用户在没有设置密码前可以使用手机短信登录,昵称默认随机字符串,CSDN手机注册便是如此)
问题:每次访问xxxController都要进行一次登录验证,那么就要进行很多次登录验证,因此需要使用拦截器。将拦截器拦截到的用户传到Controller当中去,而且还需要考虑线程问题。
拦截器
WebMVCConfig
ThreadLocal使用一个类包装,然后用静态修饰
在查看我的资料时,请求/user/me返回的数据包含密码
其实查看的时候,只需要返回手机号,账号等信息即可,密码不需要;而且还有一点,将从数据库查询到user对象存储的session中,数据越完整,使用是会更方便,但同时内存压力也越大。
解决办法,在UserServiceImplements处理login时,将User对象的一部分信息复制到一个UserDTO当中去(其他相应部分也作类型转换)
问题一:将验证码存储到session改成存储到redis中,redis采用哪种数据结构存储,采用String类型即可;而key不能为SESSIONID,因为SESSIONID是每一台Tomcat服务器独有,当请求切换到不同的Tomcat服务器时,SESSIONID就会改变,也就说明访问redis数据库时不能取出相同的验证码。因此,用手机号Phone作为key。
问题二:将用户信息存储到session改成存储到redis中,redis采用哪种数据结构,采用Hash结构,因为Hash结构可以将对象中的每个字段独立存储,也就是说可以针对单个字段做CRUD,比较灵活,并且内存占用更少(如果用String类型,value是可以采用JSON格式保存,比较直观,但是对象数据一旦很长,就会有产生很多符号,给内存造成额外的消耗);
问题三:存放对象的key应该设置为什么以保证唯一性?不建议采用手机号,建议采用随机token作为key存储用户数据。
问题四:基于Session短信登录的校检登录状态,登录凭证是网页请求中携带的Cookie中的SESSIONID。那使用Redis存储数据后,那么token作为登录凭证,但是Tomcat不会自动将token自动写入到浏览器中,因此我们要将token手动返回给前端,然后客户端(浏览器)将token保存下来,于是每次请求网页都会携带一个token。
简单介绍浏览器中的token,和每次网页请求携带的token
这是登录页面的登录函数,请求收到的数据(axios.post....then(({data}) =>))中的data就是上面请求login后,访问redis数据库返回的token,然后前端(浏览器的一种存储方式)sessionStorage.setItem("token",data)保存token。
然后每次发送异步axios请求,拦截器都会将token放进请求头'authorization'中,至此,请求中也就携带了token。
从上面的简介token中,也就可以解释为什么不能将手机号作为token了,这样将手机号作为token保存在浏览器会使得个人信息不太安全,容易泄露。
(登录凭证:网页请求中的请求头中携带的token)
发送短信验证码((添加业务前缀来区分,设置key的有效期,减少数据库内存消耗) set key value ex 120)
短信验证码登录、注册
从redis中获取手机验证码,与请求中formData中的验证码进行对比,判断
生成一个token,作为用户数据的key,使用UUID生成
基于formData.phone查询数据库,判断用户是否存在,不存在则创建用户,初始化一些基本信息
利用hutool工具类,将获取到的用户数据user转换成UserDTO类型,然后再将UserDTO对象转换成HashMap对象存放至Redis数据库中
将UserDTO的HashMap对象以Hash类型存入redis中时,需要设置有效期
因为StringRedisTemplate将HashMap对象存放入数据库时,需要value值都是String,当出现不是value值的字段时会报错
解决办法:第一种是自己手写将用户类型数据转换成HashMap,第二种是如下:
需要注意一点:很重要,就是拦截器中的StringRedisTemplate对象不能使用注解@Autowire或者@Resource,因为这个拦截器类不是Spring容器自己创建的(比如视图类Controller都有@RestController等注解),而是手动创建的,不带注解。因此我们可以使用构造函数,将StringRedisTemplate作为参数传进去,而这个StringRedisTemplate就由谁用了这个拦截器,就由谁创建,这里是(WebMVCConfig,带注解的,帮我们使用依赖注入StringRedis)。
获取请求中的请求头token,判断请求头是否存在
基于token获取redis中的用户数据HashMap,判断用户数据是否存在
利用hutool工具类,将HashMap用户数据转换成UserDTO
将UserDTO保存至ThreadLocal当中
有效期需要动态更新,可以通过拦截器来实现,拦截器每次截取到,利用请求中的请求头所带的token,重新设置用户数据的有效期
放行
拦截登录器的优化
因为上一步是在拦截器中刷新了token的有效期,但是否访问每个页面,也就是每个请求都能刷新token有效期呢,答案是否定的。因此不是每个请求都在拦截器拦截的路径范围中!
解决办法:在已有的拦截器情况下,再添加一个拦截器,拦截路径为全部。
添加ReflashTokenInterceptor
修改LoginInterceptor
WebMVCConfig修改,并设置拦截器的优先级
拦截器在被添加到mvcconfig后,会被注册为一个InterceptorRegisteration,它有一个默认属性order为0,在不设置order情况下,多个拦截器执行的顺序就是拦截器添加先后的顺序。为了严谨,可以设置order属性。order值越大,执行优先级越低。
什么是缓存:数据交换的缓冲区(cache),是存贮数据的临时地方,一般读写性能比较高。
缓存模型
缓存步骤流程
代码实现缓存流程
视图层ShopController
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return shopService.queryById(id);
}
业务层ShopServiceImpl
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
// 1.从redis查询商铺缓存(这里采用String类型,然后要确保店铺id的唯一)
String shopJson = stringRedisTemplate.opsForValue().get(SystemConstants.SHOP_KEY_PRE + id);
// 2.判断商铺是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,将从redis去到的json字符串转换成shop对象返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 4.不存在,根据id查询数据库
Shop shop = getById(id);// 这是MP中继承了ServiceImpl中有的方法
if (shop == null) {
// 5.数据库不存在商铺信息,返回错误
return Result.fail("店铺不存在!");
}
// 6.存在,将商铺信息写入到redis(要将shop对象转成json字符串)
String jsonStr = JSONUtil.toJsonStr(shop);
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, jsonStr);
// 7.返回
return Result.ok(shop);
}
}
主动更新策略
解释:
第一种发生极端情况发生的概率较大。因为redis写入数据是微秒级别的,相较于数据库写入是非常快的。在更新数据库时,用时较长,这时其他线程有可能抢到CPU使用权,进行查询缓存,但是线程1已经删除了缓存,所以线程2也去查询数据库,但是线程1 更新的数据还没有提交,因此线程2 查到的数据还是旧数据,再次将旧数据写入缓存。
第二种情况发生的概率较小。因为线程2先更新数据库,然后删除缓存(redis中读写速度快,所以在删除缓存时线程1争夺CPU使用权的概率较小)。所以即使在查询数据库时,数据读取到了,但是还没写入到缓存,线程2这时抢到了CPU使用权更新数据库,删除缓存,所以线程1读到的是旧数据,但是我在写入缓存的时候设置TTL过期兜底,这样就能有效减少旧数据的影响。
作业
@RestController
@RequestMapping("/shop-type")
public class ShopTypeController {
@Resource
private IShopTypeService typeService;
@GetMapping("list")
public Result queryTypeList() {
// 相当于select * from tb_shop_type order by sort asc
// TODO 修改代码,实现缓存店铺
List<ShopType> typeList = typeService
.query().orderByAsc("sort").list();
return Result.ok(typeList);
}
}
视图层修改
业务层修改
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public List<ShopType> queryForList() {
// 1.从redis中查询商铺种类缓存(这里商铺种类信息的值采用String类型,分层结构,通过模糊查询寻找所有相关key)
// 1.1先获取keySet(Redis: KEYS SHOP_TYEP_KEY_PRE*....)
Set<String> keys = stringRedisTemplate.keys(SystemConstants.SHOP_TYEP_KEY_PRE + "*");
List<ShopType> shopTypeList = null;
if(keys != null){
shopTypeList = new ArrayList<>();
// 1.2根据keySet来逐个寻找商铺种类信息
for(String key : keys){
String shopTypeJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断商铺种类是否存在
// 将String转换成ShopType
if(StrUtil.isNotBlank(shopTypeJson)){
// 3.redis存在商铺种类信息,将从redis取到的HashMap转为ShopType类型,借助hutool工具插件
ShopType shopType = JSONUtil.toBean(shopTypeJson, ShopType.class);
shopTypeList.add(shopType);
}
}
} else{
// 4.redis不存在商铺种类信息,查询数据库select * from tb_shop_type order by sort asc
shopTypeList = query().orderByAsc("sort").list();
// 5.数据库不存在商铺种类信息,返回错误信息
// return Result.fail("商铺种类暂未存储在数据库中!");
// 6.数据库存在商铺种类信息,将商铺种类信息写入到redis中(从集合中取出ShopType对象,然后转成字符串,逐条存放)
if(shopTypeList != null){
for(int i = 0; i < shopTypeList.size(); i++){
ShopType st = shopTypeList.get(i);
String json_st = JSONUtil.toJsonStr(st);
//尾编号为shopType中的sort属性值
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_TYEP_KEY_PRE + st.getSort(), json_st);
}
}
}
// 7.返回
return shopTypeList;
}
修改ShopController中的业务逻辑,满足下面需求
根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
// 1.从redis查询商铺缓存(这里采用String类型,然后要确保店铺id的唯一)
String shopJson = stringRedisTemplate.opsForValue().get(SystemConstants.SHOP_KEY_PRE + id);
// 2.判断商铺是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,将从redis去到的json字符串转换成shop对象返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 4.不存在,根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
// 5.数据库不存在商铺信息,返回错误
return Result.fail("店铺不存在!");
}
// 6.存在,将商铺信息写入到redis(要将shop对象转成json字符串)
String jsonStr = JSONUtil.toJsonStr(shop);
// 6.1设置TTL超时剔除
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, jsonStr, SystemConstants.SHOP_TTL, TimeUnit.MINUTES);
// 7.返回
return Result.ok(shop);
}
}
根据id修改店铺时,先修改数据库,再删除缓存(更新缓存有可能造成太多无效写入)
@Override
@Transactional //添加事务,确保数据库与缓存操作原子性
public Result update(Shop shop) {
// 判断shop是否存在
Long shopId = shop.getId();
if(shopId == null){
// 返回错误
return Result.fail("店铺id不能为空");
}
System.out.println("Test, ShopId: " + shopId);
// 1.更新数据库
updateById(shop); // 需要注意不能直接使用MP内部的update(T t),因为其可能没有排除null的字段,而传入的t可能有部分数据为null
// 2.删除缓存
stringRedisTemplate.delete(SystemConstants.SHOP_KEY_PRE + shopId);
return Result.ok();
}
缓存穿透:是指客户端请求的数据在缓存中额数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。所以有些不怀好意的人可能会利用该穿透,并发向服务器发送一些不存在的数据,这样可能会搞垮数据库。
缓存空对象缺点详细描述:首先有可能造成大量额外的内存消耗,因为可能会有人恶意大量请求不存在的数据;第二点,有可能刚请求完返回了Redis的空数据,而这时数据库插入了该条数据,导致数据库与Redis数据的不一致。可以设置TTL,减少这种额外内存消耗和错误的影响。
布隆过滤器:使用二进制形式存放数据库中对数据哈希后的哈希值,但这是概率算法,如果不用过滤器拒绝,则表示数据库真的不存在该数据,但过滤器放行了,数据库不一定存在该数据,所以还是存在穿透风险。
编码解决商铺查询的缓存穿透问题
将空值写入Redis
StrUtil。isNotBlank(String str)
判断命中的是否是空值
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
// 1.从redis查询商铺缓存(这里采用String类型,然后要确保店铺id的唯一)
String shopJson = stringRedisTemplate.opsForValue().get(SystemConstants.SHOP_KEY_PRE + id);
// 2.判断商铺是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,将从redis去到的json字符串转换成shop对象返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// --穿透解决,判断命中的是否是空值
if(shopJson != null){
//返回错误信息
return Result.fail("店铺信息不存在!");
}
// 4.不存在,根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
// 将空值写入缓存
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, "", SystemConstants.SHOP_NULL_TTL, TimeUnit.MINUTES);
// 5.数据库不存在商铺信息,返回错误
return Result.fail("店铺不存在!");
}
// 6.存在,将商铺信息写入到redis(要将shop对象转成json字符串)
String jsonStr = JSONUtil.toJsonStr(shop);
// 6.1设置TTL超时剔除
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, jsonStr, SystemConstants.SHOP_TTL, TimeUnit.MINUTES);
// 7.返回
return Result.ok(shop);
}
缓存击穿和缓存雪崩的区别就在于:缓存雪崩是同一时间段大量key同时失效或redis服务宕机,导致给数据库造成巨大压力;而缓存击穿是某些热点(一个可以被高并发访问并且缓存重建业务较复杂,耗时较长)的key在同一时间段突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。比如上述图片,缓存查询(读操作)未命中,所需时间较短,但是查询数据库(其中需要的准备)的时间较长,可能其他线程也在查询未命中的情况下查询数据库并重建缓存,导致数据库垮掉。
解决击穿的解决方案
添加互斥锁,缓存未命中的线程只有获取到互斥锁才可以查询数据库,重建缓存数据。但需要等待线程结束后另一个线程才能获得锁查询,效率较低。
业务层代码实现
在Redis中,有一条指令跟互斥锁十分相似,那就是SETNX key value,通过SETNX lock 1来设置一把互斥锁,然后获取锁的指令不是GET lock,而是SETNX lock 1 (1000),有返回值表明获得锁(该值一开始不存在,因此能创建,转换为该锁没有人占用,因此能获得),释放锁的指令就是删除锁DEL lock,然后其他线程就能创建该锁(获得该锁)。为防止意外情况导致锁没有释放,可以在设置锁的时候添加一个有效期。
//使用redis中的SETNX key value digital timeUnit来模拟获得锁
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
//不能直接返回,有可能null的时候自动装箱,然后就变成false
return BooleanUtil.isTrue(flag);
}
//使用redis中的DEL key来模拟释放锁
private void unlock(String key){
stringRedisTemplate.delete(key);
}
//封装缓存击穿的方法
public Shop queryWithMutex(Long id){
// 1.从redis查询商铺缓存(这里采用String类型,然后要确保店铺id的唯一)
String shopJson = stringRedisTemplate.opsForValue().get(SystemConstants.SHOP_KEY_PRE + id);
// 2.判断商铺是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,将从redis去到的json字符串转换成shop对象返回
return JSONUtil.toBean(shopJson, Shop.class);
}
// --穿透解决,判断命中的是否是空值
if(shopJson != null){ // shopJson = "";
//返回错误信息
System.out.println("=====设置空值解决缓存穿透=====");
return null;
}
// 4 实现缓存重建
String lockKey = SystemConstants.REDIS_LOCK_PRE + id;
Shop shop = null;
try {
// 4.1 获取互斥锁
boolean lock = tryLock(lockKey);
// 4.2 判断获取互斥锁是否成功
if(!lock){
// 4.3 失败,休眠并重试
Thread.sleep(50);
// 递归重试
return queryWithMutex(id);
}
// 4.4 成功,查询数据库,重建缓存
shop = getById(id);
// 模拟重建延时,实现模拟重建数据库需要较长时间,容易引发线程并发安全问题
Thread.sleep(200);
if (shop == null) {
// 将空值写入缓存,解决缓存穿透问题
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, "", SystemConstants.SHOP_NULL_TTL, TimeUnit.MINUTES);
// 5.数据库不存在商铺信息,返回错误
System.out.println("=====不存在该商品信息======");
return null;
}
// 6 存在,将商铺信息写入到redis(要将shop对象转成json字符串)
String jsonStr = JSONUtil.toJsonStr(shop);
// 6.1 设置TTL超时剔除
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, jsonStr, SystemConstants.SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e){
//这异常直接抛出,因为是打断的异常,不需要管
throw new RuntimeException(e);
} finally {
// 7 释放互斥锁
unlock(lockKey);
}
// 8 返回
return shop;
}
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
// 缓存穿透
// Shop shop = queryWithPassThrough(id);
// 互斥锁解决缓存击穿
Shop shop = queryWithMutex(id);
if(shop == null){
return Result.fail("该商铺不存在!");
}
// 7.返回
return Result.ok(shop);
}
逻辑过期,在设置缓存的时候,不设置TTL,但是在key对应的value上添加expire字段值,就是在查询到该缓存时,查看expire值,通过判断expire值来判断该缓存是否过期,如果过期表明这是旧的数据,需要更新,但是更新操作通知另一个线程操作,也需要获得互斥锁,当前线程就先返回旧数据。这样客户端用户就不会因为查询数据库,重建缓存数据这步操作而等待了。
关于在value(Shop对象)中添加过期时间如果解决
关于在逻辑流程图中,从Redis查询商铺缓存,判断缓存未命中直接返回空,是因为热点数据需要提前导入进Redis。因此未命中就意味着该商铺不是热点商铺(活动中),直接返回空。
业务层代码实现
先创建一个方法,将商铺保存至Redis中
private void saveShopPre(Long id, Long expireSeconds) throws InterruptedException {
//根据id查询商铺
Shop shop = getById(id);
//模拟重建时间,增大线程并发安全问题概率
Thread.sleep(200);
//封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//存入Redis
stringRedisTemplate.opsForValue().set(SystemConstants.SHOP_KEY_PRE + id, JSONUtil.toJsonStr(redisData));
}
然后开启一个线程池,里面有10个线程
// 线程池,存入10个线程
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
逻辑过期方法
//封装缓存击穿的方法,设置逻辑过期
public Shop queryWithLogicalExpire(Long id){
// 1.从redis查询商铺缓存(这里采用String类型,然后要确保店铺id的唯一)
String shopJson = stringRedisTemplate.opsForValue().get(SystemConstants.SHOP_KEY_PRE + id);
// 2.判断商铺是否存在
if (StrUtil.isBlank(shopJson)) {
// 3.不存在,直接返回null
return null;
}
// 4.命中,需要把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
// 店铺信息
// Shop shop = (Shop) redisData.getData();
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
// 过期时间
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
// 5.1未过期,直接返回店铺信息
return shop;
}
// 5.2已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = SystemConstants.REDIS_LOCK_PRE + id;
boolean lock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if(lock){
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try{
// 调用预设热点商铺重建缓存
this.saveShopPre(id, 20L);
} catch (Exception e){
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4.没有获得锁,返回过期的商铺信息
return shop;
}
主方法
@Override
public Result queryById(Long id) {
// 设置空缓存解决缓存穿透
// Shop shop = queryWithPassThrough(id);
// 互斥锁解决缓存击穿
// Shop shop = queryWithMutex(id);
// 逻辑过期解决缓存击穿
Shop shop = queryWithLogicalExpire(id);
if(shop == null){
return Result.fail("该商铺不存在!");
}
// 7.返回
return Result.ok(shop);
}
解决击穿的两种方法优势与缺点的对比:
涉及新的知识点:函数式编程Function<T,R> function, function可以用lambda式子来表达:(id2) -> this.getById(id2)或者this::getByid
温故而知新,泛型的使用:修饰符 <T, T1, ...> 返回类型T 方法名(Class type, T1 id, T2 data, ...)
缓存穿透工具类
@Slf4j
@Component
public class CacheClient {
// @Resource
// private StringRedisTemplate stringRedisTemplate;
//为什么不用注入,CacheClient里面的四个方法,都是用在解决缓存穿透、击穿的Service层上,Service层
//上已经注入了一次,因此这里使用构造方法
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
//将任意Java对象序列化为json并存储在redis的String类型的key中,并且可以设置TTL过期时间
public void set(String key, Object value, Long time, TimeUnit unit){
//将java对象序列化为json
String valueStr = JSONUtil.toJsonStr(value);
//写入redis
stringRedisTemplate.opsForValue().set(key, valueStr, time, unit);
}
//将任意Java对象序列化为json并存储在redis的String类型的key中,并且可以设置逻辑过期时间
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
//给对象加上过期日期字段
RedisData redisData = new RedisData();
//利用TimeUnit的toSeconds(time),将time*unit转换成seconds,然后加上LocalDateTime.now()
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
redisData.setData(value);
//将java对象序列化为json
String valueStr = JSONUtil.toJsonStr(redisData);
//写入redis
stringRedisTemplate.opsForValue().set(key, valueStr);
}
//解决缓存穿透,返回类型不确定,因此用泛型,首先在public后加上<R>,返回类型为R,然后参数里面加上推断的类型
//因为传入的id不确定类型,因此泛型也应该加上一个ID,参数里面id的类型也设置为ID
//在java中,有参数和返回值的函数叫做Function(函数式编程)
public <R,ID> R queryWithPassThrough(
String keyPreFix, ID id, Class<R> type, Function<ID,R> dbFallBack, Long time, TimeUnit unit){
String key = keyPreFix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if(StrUtil.isNotBlank(json)){//不是以下的内容""、"\t\n"、null
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值,json != null&&json.length() == 0
if(json != null){
// 是空值,证明无效参数,返回错误信息
return null;
}
// 4.缓存未命中,根据id查询数据库
R r = dbFallBack.apply(id);
// 5.不存在,返回错误
if(r == null){
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",SystemConstants.SHOP_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key,r,time,unit);
// 7.返回
return r;
}
}
缓存击穿工具类
// 线程池,存入10个线程
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
//使用redis中的SETNX key value digital timeUnit来模拟获得锁
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
//不能直接返回,有可能null的时候自动装箱,然后就变成false
return BooleanUtil.isTrue(flag);
}
//使用redis中的DEL key来模拟释放锁
private void unlock(String key){
stringRedisTemplate.delete(key);
}
//解决缓存击穿
public <R,ID> R queryWithLogicalExpire(
String keyPreFix, ID id, Class<R> type, Function<ID,R> dbFallBack, Long time, TimeUnit unit){
String key = keyPreFix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断商铺是否存在
if(StrUtil.isBlank(json)){
// 3.不存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
// 5.1未过期,直接返回店铺信息
return r;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = SystemConstants.REDIS_LOCK_PRE + id;
boolean isLock = tryLock(lockKey);
// R r1 = null;
// 6.2.判断是否获取锁成功
if(isLock){
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查询数据库
R r1 = dbFallBack.apply(id);
//写入redis
this.setWithLogicalExpire(key,r1,time,unit);
} catch(Exception e){
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 7.没有获得锁,返回过期的商铺信息
return r; //千万要搞清楚,这里返回的是从缓存取出来的,而不是查询数据库得到的!
}
![image-20220330105719075](C:\Users\luzhipeng\AppData\Roaming\Typora\typora-user-images\image-20220330105719075.png)
为什么不使用表的自增,因为表的自增在多表业务下,自增可能会导致ID的重复,无法唯一识别,因此不能使用自增。
全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:唯一性、高可用、高性能、递增性、安全性。
唯一性:可以利用Redis数据库来实现全局ID生成,因为Redis中String类型有一个方法INCR,让一个整型自增1
高可用:Redis中的集群方案、主从方案、哨兵方案
高性能:Redis读写性能比MySQL好太多
递增性:可以使用Redis中的INCRBY、INCRBYFLOAT来自增长,减少规律性
安全性:为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他的信息,让规律性不那么明显。为了提高数据库性能,ID会使用数值类型(转为Bit,编码使用二进制,可以存储更多的信息)
全局唯一ID生成策略:
例如Redis自增ID策略:时间戳 + 自增ID,使用java的long类型,8字节-64bit,Redis自增的上限值是(-2^54~2^64-1)
在Util工具包下定义一个RedisIDWorker类,加上@Component标签使他成为Spring容器中管理的Bean
开始时间戳的秒数生成及定义
定义一个nextId方法,参数为String keyPreFix,表示redis中不同业务的key对应的唯一Id,自增长可对对应的key进行操作
@Component
public class RedisIdWorker {
//初始时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;
//序列号的位数
private static final int COUNT_BITS = 32;
@Resource
private StringRedisTemplate stringRedisTemplate;
public Long nextId(String keyPrefix){
// 完整全局Id: 符号位1位,时间戳31位,序列号32位
// 1.生成当前时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
Long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2.生成序列号(注意更新key,不能永远使用同一个key,虽然Redis的自增上限值为2^64,但实际用于记录序列号的只有32位,可以在key后面拼上时间戳)
// 2.1.获取当前日期,精确到天,根据年月日分层,可以便于统计每天、月、年的销量
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长,如果key不存在,则会自动创建
Long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + date);
// 3.拼接并返回
return timestamp << COUNT_BITS | count;
}
//如何生成时间戳的秒数,利用LocalDateTime
public static void main(String[] args) {
//利用LocalDateTime的静态方法,设置日期时间
LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0);
//获取日期时间对应的时间戳
long second = time.toEpochSecond(ZoneOffset.UTC);
//输出秒数
System.out.println("second: " + second);
}
}
多线程测试,使用线程池
@Autowired
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
//多线程测试
@Test
void testIdWorker() throws InterruptedException {
// 新建一个线程结束计数器,传入需要线程的个数
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
Long id = redisIdWorker.nextId("order");
System.out.println(id);
}
//表示结束一个线程
latch.countDown();
};
long start = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
//等待所有线程结束
latch.await();
long end = System.currentTimeMillis();
System.out.println("生成900个id总共耗时: " + (long)(end - start) + "ms");
}
数据库两张表的定义:
数据库添加优惠券
voucher:除了优惠券的字段,还包括秒杀券的字段
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher")
public class Voucher implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商铺id
*/
private Long shopId;
/**
* 代金券标题
*/
private String title;
/**
* 副标题
*/
private String subTitle;
/**
* 使用规则
*/
private String rules;
/**
* 支付金额
*/
private Long payValue;
/**
* 抵扣金额
*/
private Long actualValue;
/**
* 优惠券类型
*/
private Integer type;
/**
* 优惠券类型
*/
private Integer status;
/**
* 库存
*/
@TableField(exist = false)
private Integer stock;
/**
* 生效时间
*/
@TableField(exist = false)
private LocalDateTime beginTime;
/**
* 失效时间
*/
@TableField(exist = false)
private LocalDateTime endTime;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
VoucherService:添加秒杀券的步骤,先将Voucher对象保存至数据库,然后再根据Voucher对象的秒杀基本信息新建秒杀对象存放至数据库
/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
// Voucher对象里面还包含秒杀券的基本信息字段
voucherService.addSeckillVoucher(voucher);
// 返回秒杀券(优惠券)的id,传到前端页面,方便后续抢券操作
return Result.ok(voucher.getId());
}
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}
因为IDEA中没有管理端,只能通过APIPOST来添加秒杀券
用户在商铺页面抢购优惠券
优惠券秒杀下单(购买秒杀券)代码实现
在实现用户下单(购买秒杀券)时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
视图层
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private IVoucherOrderService voucherOrderService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.secKillVoucher(voucherId);
}
// @PostMapping("seckill/{id}")
// public Result seckillVoucher(@PathVariable("id") Long voucherId) {
// return Result.fail("功能未完成");
// }
}
业务层
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
//注入Id生成器
@Resource
private RedisIdWorker redisIdWorker;
//注入秒杀券服务
@Resource
private ISeckillVoucherService seckillVoucherService;
@Override
@Transactional //添加事务注解,保证原子性(涉及两张表的改变)
public Result secKillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
// 秒杀未开始,返回
return Result.fail("秒杀活动尚未开始!");
}
// 3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
// 秒杀已经结束,返回
return Result.fail("秒杀活动已经结束!");
}
// 4.判断库存是否充足
Integer stock = voucher.getStock();
if(stock <= 0){
// 库存不足
return Result.fail("秒杀券已经被抢光!");
}
// 5.扣减库存
// voucher.setStock(stock - 1);
// seckillVoucherService.updateById(voucher);
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();
if(!flag){
// 扣减失败
return Result.fail("秒杀券已经被抢光!");
}
// 6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.订单id
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2.用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
// 7.将订单写入数据库
this.save(voucherOrder);
// 8.返回订单id
return Result.ok(orderId);
}
}
库存超卖问题分析
多线程并发问题,解决方法,加锁(悲观锁、乐观锁)
锁的选择技巧
悲观锁:成本较高,将并行转换为串行,客户端响应时间延长,用户体验不好
,乐观锁:仍是并行,在更新时判断其他线程是在进行修改,但是成功率低,因为假如100个线程同时查到stock = 100,但是在第一个线程做完 更新后,stock = 99,那么其他线程就不会成功了。因此可以将stock = oldStock改成stock > 0即可。
// 5.扣减库存(对比版本号前后是否相同)
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock",0) // where voucher_id = ? and stock > 0
.update();
一人一单
##### 封装方法,添加synchronize,锁住的对象是this,也就是当前方法。但是那就意味着,每个用户进来执行创建订单就变成了串行,其实只需要锁住相同用户,就可以了,使得其他用户不会变成串行,而是并发。将synchronize修饰方法改成锁关键字(关键字转字符串)。为什么要将关键字userId转String().intern()呢,因为每次请求,即使是user的id相同,但是对象会不同,也就是内存地址不同,但装的id是相同的,可比较的是对象。因此需要toString().intern(0),当用户id的值一样时,锁就一样。
JMeter中的配置
实现代码
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
//注入Id生成器
@Resource
private RedisIdWorker redisIdWorker;
//注入秒杀券服务
@Resource
private ISeckillVoucherService seckillVoucherService;
@Override
public Result secKillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
// 秒杀未开始,返回
return Result.fail("秒杀活动尚未开始!");
}
// 3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
// 秒杀已经结束,返回
return Result.fail("秒杀活动已经结束!");
}
// 4.判断库存是否充足
Integer stock = voucher.getStock();
// 采用乐观锁,将stock看作是"版本号"
if(stock <= 0){
// 库存不足
return Result.fail("秒杀券已经被抢光!");
}
//解决判断同一用户是否存在订单重复,采用悲观锁来实现一人一单
return createVoucherOrder(voucherId);
}
@Transactional //添加事务注解,保证原子性(涉及两张表的改变)
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查询订单,不需要查出具体数据,只需要查出有无这条数据,用count()
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断数据用户是否已经下过单
if(count > 0){
return Result.fail("不能重复抢购秒杀券!");
}
// 6.扣减库存(对比版本号前后是否相同)
// voucher.setStock(stock - 1);
// seckillVoucherService.updateById(voucher);
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock",0) // where voucher_id = ? and stock > 0
.update();
if(!flag){
// 扣减失败
return Result.fail("秒杀券已经被抢光!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
// 8.将订单写入数据库
this.save(voucherOrder);
// 8.返回订单id
return Result.ok(orderId);
}
}
}
一人一单的并发安全问题
服务器轮流接收请求时,synchronize锁住的关键字情况如下:
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的功能性:可重入性(也就是获取锁是阻塞还是非阻塞的)
分布式锁的核心:
分布式锁的初级实现:
分布式锁的实现:
定义一个类,实现下面接口,利用Redis实现分布式锁功能
public class SimpleRedisLock implements ILock {
// 锁的key
private String lockKey;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String lockKey, StringRedisTemplate stringRedisTemplate){
this.lockKey = lockKey;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
long threadId = Thread.currentThread().getId();
String value = String.valueOf(threadId);
// 尝试获取锁, value表示那个线程获取了锁
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY_PRE + lockKey, value, timeoutSec, TimeUnit.SECONDS);
// 为防止Unboxing of 'flag' may produce 'NullPointerException' ,也就是flag有可能是null,然后return flag自动拆箱时就会报错
return Boolean.TRUE.equals(flag);
}
@Override
public void unLock() {
//释放锁
stringRedisTemplate.delete(LOCK_KEY_PRE + lockKey);
}
}
业务层
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
//注入Id生成器
@Resource
private RedisIdWorker redisIdWorker;
//注入秒杀券服务
@Resource
private ISeckillVoucherService seckillVoucherService;
//注入RedisStringTemplate
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result secKillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
// 秒杀未开始,返回
return Result.fail("秒杀活动尚未开始!");
}
// 3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
// 秒杀已经结束,返回
return Result.fail("秒杀活动已经结束!");
}
// 4.判断库存是否充足
Integer stock = voucher.getStock();
// 采用乐观锁,将stock看作是"版本号"
if(stock <= 0){
// 库存不足
return Result.fail("秒杀券已经被抢光!");
}
//解决判断同一用户是否存在订单重复,采用悲观锁来实现一人一单
//解决判断同一用户是否存在订单重复,采用分布式锁简单版来实现一人一单
return createVoucherOrder(voucherId);
}
/**
* 一人一单,采用分布式锁简单版,用Redis中的锁机制,能在多服务器上生效
* @param voucherId
* @return
*/
@Transactional //添加事务注解,保证原子性(涉及两张表的改变)
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
// 获取本地线程中的用户Id
Long userId = UserHolder.getUser().getId();
//创建Redis锁对象
SimpleRedisLock simpleLock = new SimpleRedisLock("seckill" + userId, stringRedisTemplate);
//尝试获取锁
boolean lock = simpleLock.tryLock(10L);
//判断是否获取锁
if(!lock){
//获取锁失败,返回错误信息或重试(递归)
return Result.fail("不能重复抢购秒杀券!");
}
try {
// 5.1.查询订单,不需要查出具体数据,只需要查出有无这条数据,用count()
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断数据用户是否已经下过单
if (count > 0) {
return Result.fail("不能重复抢购秒杀券!");
}
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where voucher_id = ? and stock > 0
.update();
if (!flag) {
// 扣减失败
return Result.fail("秒杀券已经被抢光!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
// 8.将订单写入数据库
this.save(voucherOrder);
// 9.返回订单id
return Result.ok(orderId);
} finally {
//释放锁
simpleLock.unLock();
}
}
}
Redis分布式锁误删问题
线程并发安全问题:在线程一处理业务时,锁因为超时而被释放,线程2拿到锁,在执行业务的时候,线程1业务完成,释放了线程2的锁。线程3因此也拿到了锁。以此类推.....解决办法,在业务完成时,获取锁的标识(value: 线程Id),是否与获得锁时的线程Id前后一致,一致,则表示业务没有超时,正常删除时;不一致,证明业务超时,锁已经被自动释放。
解决分布式锁被误删而引发的并发线程安全问题业务流程图
改进Redis分布式锁
步骤一:在获取锁时存入线程标识(可用UUID表示),为什么不使用当前线程Id,因为当前线程Id是一个自增的数字,每个JVM内部都有一个,当出现集群时,多个服务器意味着存在有多个线程Id自增的数字冲突的风险,有可能两个自增的数字会一样而导致误删锁。
步骤二:在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一样,一样则释放锁,不一样则不释放锁
//UUID默认会带有横线,参数isSimple表示是否把横线去除,true为去除
private static final String ID_PRE = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId =ID_PRE + Thread.currentThread().getId();
// 尝试获取锁, value表示那个线程获取了锁
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY_PRE + lockKey, threadId, timeoutSec, TimeUnit.SECONDS);
// 为防止Unboxing of 'flag' may produce 'NullPointerException' ,也就是flag有可能是null,然后return flag自动拆箱时就会报错
return Boolean.TRUE.equals(flag);
}
@Override
public void unLock() {
//获取当前线程标识
String threadIdCur = ID_PRE + Thread.currentThread().getId();
//获取当前锁的线程标识
String threadId = stringRedisTemplate.opsForValue().get(LOCK_KEY_PRE + lockKey);
//如果标识一致则释放锁
if(threadIdCur.equals(threadId)){
//释放锁
stringRedisTemplate.delete(LOCK_KEY_PRE + lockKey);
}
}
分布式锁的原子性问题
在上一个流程图判断锁标识是否是自己线程的时候,这个GC垃圾回收机制发生,阻塞了线程,当阻塞时间足够长时,锁就会超时自动释放,线程2因此获得锁,而当线程1删除锁时,就会发生误删删除了线程2的锁,产生并发线程安全问题。因此必须保证判断锁标识和释放锁必须保证原子性。
Lua脚本解决多条命令原子性问题
用Lua语言编写脚本去调用Redis,在Lua脚本里面可以使判断锁标识和释放锁保证原子性,要么都执行,要么都不执行。
Lua脚本语言
Java代码修改
RedisTemplate调用Lua脚本的API如下:
如何在IDEA中创建Lua脚本文件
选择File->setting->plugins->EmmyLua
-- 比较线程标识与锁中的标识是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁
return redis.call('del', KEYS[1])
end
return 0
代码实现
@Override
public void unLock() {
//调用lua
stringRedisTemplate.execute(
UNLOCK_SCRIPT,// Lua脚本文件的加载
Collections.singletonList(LOCK_KEY_PRE + lockKey),// 快捷创建单个元素的集合List
ID_PRE + Thread.currentThread().getId() // ThreadId
);
}
关于SETNX实现分布式锁存在下面的问题:
Redisson入门
引入依赖
配置Redisson客户端
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.0.101:6379").setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
使用Redisson的分布式锁
//创建Redis锁对象
RLock redisson_lock = redissonClient.getLock(LOCK_KEY_PRE + "seckill:" + userId);
//尝试获取锁
boolean lock = redisson_lock.tryLock(); //如果选择空参,也就是非阻塞队列,等待时间默认为-1就是不等待,释放时间默认是30s
//释放锁
redisson_lock.unlock();
Redisson的可重入锁原理
Redisson的锁重试和WatchDog机制
Redisson的MultiLock原理
测试秒杀业务的性能:提前在数据库创建好1000个用户,然后登陆1000个用户(将redis中用户的TTL设置为-1永久化),然后在JMeter中的头信息中authorization添加redis中各用户的token(${token})
异步秒杀思路(采用阻塞队列)
版本1:两个线程分别对MySQL数据进行查询
版本2:因为Redis读写的效率高于MySQL,因此可以将判断秒杀库存和校检一人一单放入Redis中缓存,线程1做判断,然后线程2减库存和创建订单写入数据库。
关于判断秒杀库存和校检一人一单在redis中采用的数据结构
关于秒杀库存和校检一人一单两个步骤保证原子性
需求分析
新增秒杀券的同时,首先将优惠券、秒杀券存入数据库,也要将秒杀券库存信息存入Redis
修改VoucherServiceImpl代码
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 将秒杀券库存信息存入Redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), String.valueOf(voucher.getStock()));
}
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
创建Lua脚本
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = "seckill:stock:" .. voucherId
-- 2.2.订单key 形成购买了该优惠券的用户列表,Set数据类型
local orderKey = "seckill:order:" .. voucherId
-- 3.脚本业务
-- 3.1判断库存是否充足 get stockKey,因为取出来的数据是字符串,因此要将他转为数字
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1)then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(不存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.返回0
return 0
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
开启线程任务,不断从阻塞队列中获取信息,实现异步下单
//阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
//开启单个线程的线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//PostConstruct注解是使该方法在当前类初始化完后就执行
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
//死循环,一直从阻塞队列中取订单信息
while(true){
try{
//有订单信息就取出,没有则阻塞(当secKillVoucher(voucherId)执行完即可以取到)
VoucherOrder voucherOrder = orderTasks.take();
//创建订单
createVoucherOrder(voucherOrder);
} catch (Exception e){
//如果有异常,则记录日志
log.error("处理订单异常: ", e);
}
}
}
}
//创建订单
private void createVoucherOrder(VoucherOrder voucherOrder) {
//从订单信息中获取用户id、优惠券id
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//创建Redis锁对象
RLock redisson_lock = redissonClient.getLock(LOCK_KEY_PRE + "seckill:" + userId);
//尝试获取锁
boolean lock = redisson_lock.tryLock(); //如果选择空参,也就是非阻塞队列,等待时间默认为-1就是不等待,释放时间默认是30s
//判断是否获取锁
if(!lock){
//获取锁失败,返回错误信息或重试(递归)
log.error("获取锁失败,不允许重复下单!");
return ;
}
//一下是兜底,防止出现未预料到的线程并发安全问题,此前一人一单,库存判断已经在秒杀业务中从Redis中判断过了,redis读写速度快,一般不会出现问题
try {
// 5.1.查询订单,不需要查出具体数据,只需要查出有无这条数据,用count()
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断数据用户是否已经下过单
if (count > 0) {
log.error("不允许重复下单!");
return ;
}
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where voucher_id = ? and stock > 0
.update();
if (!flag) {
// 扣减失败
log.error("秒杀券库存不足!");
return ;
}
// 8.将订单写入数据库
this.save(voucherOrder);
} finally {
//释放锁
redisson_lock.unlock();
}
}
// 设置Lua脚本的...
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static{
// 初始化
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 加载lua文件资源
SECKILL_SCRIPT.setLocation(new ClassPathResource("script\\seckill.lua"));
// 设置返回值类型
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 基于Lua脚本
* @param voucherId
* @return
*/
@Override
public Result secKillVoucher(Long voucherId) {
// 获取当前用户id
Long userId = UserHolder.getUser().getId();
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
// 将lua脚本返回的结果拆箱
int res = result.intValue();
// 2.判断结果是否为0
if(res != 0){
// 2.1.补位0, 代表没有购买资格
return Result.fail(res == 1 ? "秒杀券库存不足!" : "不能重复抢购秒杀券!");
}
// 2.2.为0,有购买资格,把下单信息保存到阻塞队列
// 2.3.生成订单id
long orderId = redisIdWorker.nextId("order");
// 2.4.设置订单信息
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId); //设置订单id
voucherOrder.setUserId(userId); //设置用户id
voucherOrder.setVoucherId(voucherId); //设置优惠券id
// TODO 保存订单信息到阻塞队列
// 2.5.创建阻塞队列
orderTasks.add(voucherOrder);
// 3.返回订单id
return Result.ok(orderId);
}
阻塞队列异步秒杀虽然提升了秒杀业务的性能,但也存在一些问题:
内存限制问题:我们使用的是JDK里面的阻塞队列,它使用的是JVM的内存,如果不加以限制,在高并发的环境下可能会导致堆栈溢出,也有可能当阻塞队列中空间已经存满了,后面的订单信息就存放不进去了
数据安全问题:如果服务突然宕机,则订单信息有可能全部丢失,用户已经付款了,但是订单信息没有生成;又或者在执行中间发生了一些异常,导致订单生成没有完成,即使修复异常,但是业务也不会再执行了,相当于任务丢失,导致Redis数据库和MySQL数据库数据前后不一致
认识消息队列
消息队列与阻塞队列的不同:消息队列是在JVM以外的独立服务,不受JVM内存的限制;消息队列不仅仅是做存储,还需要确保数据的安全,对数据做持久化,不管服务重启还是宕机,数据都不会丢失。而且消息队列的数据传到"消费者"那里会进行数据确认。如果确认失败,消息队列会再次发送消息,直到确认成功。
消息队列:MQ--RabbitMQ、SpringAMQP
基于Redis的消息队列:
基于List实现消息队列
redis中list数据结构是使用双向链表实现的。但是其与java中JVM的阻塞队列BlockingQueue不同的是,redis中list的LPUSH、RPOP在队列中没有消息时并不像JVM的阻塞队列那样阻塞并等待消息。如果要实现阻塞效果,需要用BLPOP或者BRPOP。
测试:
优点:
缺点:
基于PubSub实现消息队列
原理
优点和缺点
基于Stream实现消息队列
Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
命令介绍
读取消息XREAD
读取最新消息,只能读取一次:XREAD COUNT 1 STREAMS stream1 $
增加阻塞,如果阻塞设置为0,则是永久阻塞,时间单位是毫秒ms
总结:
Stream的消费者组模式
创建消费者组:
其它常见命令:
XACK key group ID [ID...] 确认消费者组中的消息
key: 消费者组中的key
group: 消费者组的名称
ID:消费者组消费的消息ID
XPENDING:判断pending-list中的消息
工作流程:
总结:
基于Stream消息队列实现异步秒杀
需求:
创建消费者组(使用XGROUP,最后加上MKSTREAM,若是队列不存在就创建,因为队列和消费者组两个都可以创建成功)
command:XGROUP CREATE stream.orders g1 0 MKSTREAM
修改之前的秒杀下单Lua脚本,在认定有抢够资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = "seckill:stock:" .. voucherId
-- 2.2.订单key 形成购买了该优惠券的用户列表,Set数据类型
local orderKey = "seckill:order:" .. voucherId
-- 3.脚本业务
-- 3.1判断库存是否充足 get stockKey,因为取出来的数据是字符串,因此要将他转为数字
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1)then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(不存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息至Stream队列中,XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId', userId, 'voucherId',voucherId, 'id', orderId)
return 0
@Override
public Result secKillVoucher(Long voucherId) {
// 获取当前用户id
Long userId = UserHolder.getUser().getId();
// 生成订单id
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 将lua脚本返回的结果拆箱
int res = result.intValue();
// 2.判断结果是否为0
if(res != 0){
// 2.1.补位0, 代表没有购买资格
return Result.fail(res == 1 ? "秒杀券库存不足!" : "不能重复抢购秒杀券!");
}
// 3.返回订单id
return Result.ok(orderId);
}
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
//开启单个线程的线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//PostConstruct注解是使该方法在当前类初始化完后就执行
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
String stream = "stream.orders";
//死循环,一直从阻塞队列中取订单信息
while(true){
try{
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1","c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(stream, ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if(list == null || list.isEmpty()){ //StringUtils.isEmpty(null) == true
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);//获取list中从消息队列中取到的暂时排第一个的消息
Map<Object, Object> value = record.getValue();
// 将value转为VoucherOrder,利用hutool工具类,fillBeanWithMap
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK stream.order g1 id
stringRedisTemplate.opsForStream().acknowledge(stream,"g1",record.getId());
} catch (Exception e){
//如果有异常,则记录日志
log.error("处理订单异常: ", e);
handlePendingList();
}
}
}
private void handlePendingList() {
String stream = "stream.orders";
//死循环,一直从阻塞队列中取订单信息
while(true){
try{
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1","c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(stream, ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if(list == null || list.isEmpty()){ //StringUtils.isEmpty(null) == true
// 如果为null,说明没有消息,结束
break;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);//获取list中从消息队列中取到的暂时排第一个的消息
Map<Object, Object> value = record.getValue();
// 将value转为VoucherOrder,利用hutool工具类,fillBeanWithMap
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK stream.order g1 id
stringRedisTemplate.opsForStream().acknowledge(stream,"g1",record.getId());
} catch (Exception e){
//如果有异常,则记录日志
log.error("(处理pending-list消息队列)处理订单异常: ", e);
try {
Thread.sleep(200);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}
上传图片(上传图片和发布笔记分开两步,先上传图片)
修改图片上传的路径,这里先存放至硬盘上(后面学了微服务后存储到云服务器),然后返回图片的路径,前端服务器作保存
@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {
@PostMapping("blog")//当请求的参数名称(页面表单上的name属性)与Controller的业务方法参数名称不一致时,就需要通过@RequestParam注解显示的绑定
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}
...
}
public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = "E:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";
......
}
发布笔记(图片来自步骤1的上传图片后返回的图片地址,地址记录在前端页面猜测是JS的ES6对象属性当中)
@RestController
@RequestMapping("/blog")
public class BlogController {
@Resource
private IBlogService blogService;
@Resource
private IUserService userService;
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
...
}
需求:点击博客,请求 URL: http://localhost:8080/api/blog/id,请求方法GET,返回博客文章信息以及用户id、icon、姓名信息。可以在Blog类里面增加userId(是数据库字段)、icon和username(非数据库字段)
设计:当查询某个id的博客文章时,首先根据博客id查询博客,得到博客对象,再根据博客对象中里面的userId来查询user的icon和username,再对博客对象中非数据库表中的字段值username和icon进行赋值,返回博客对象。(参考热点文章的代码,里面附有分页代码)
Controller
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){
return blogService.queryBlogById(id);
}
Service
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
IPage<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog ->{
queryBlogUser(blog);
});
return Result.ok(records);
}
@Override
public Result queryBlogById(Long id) {
// 1.查询blog
Blog blog = getById(id);
if(blog == null){
return Result.fail("博客文章不存在!");
}
// 2.查询blog有关的用户
queryBlogUser(blog);
return Result.ok(blog);
}
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
}
需求:
实现步骤
思考:但是存在一个问题,如果直接使用上面的private void isBlogLiked(Blog blog)方法,如果用户没有登录,则会报空指针的异常,导致页面的博客文章显示不出来。
个人想到的解决方法:在private void isBlogLiked(Blog blog)方法当中,判断用户是否登录,则什么都不做,而且在未登录显示的页面当中,热点文章的点赞需要先登录。
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private StringRedisTemplate stringRedisTemplate;
//根据分页查询热点文章
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog ->{
queryBlogUser(blog);
this.isBlogLiked(blog);
});
return Result.ok(records);
}
//根据id查询文章
@Override
public Result queryBlogById(Long id) {
// 1.查询blog
Blog blog = getById(id);
if(blog == null){
return Result.fail("博客文章不存在!");
}
// 2.查询blog有关的用户
queryBlogUser(blog);
// 3.查询blog是否被点赞,前端页面的判断来自与这里blog对象中的isLike字段值
isBlogLiked(blog);
return Result.ok(blog);
}
//判断文章是否被当前用户点赞,设置blog的isLike字段值
private void isBlogLiked(Blog blog) {
try{
UserDTO userDTO = UserHolder.getUser();
if(userDTO != null){
// 1.获取登录用户
Long userId = userDTO.getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
//点赞文章
@Override
public Result likeBlog(Long id) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//因为isMember是包装类,所以不能直接判断,使用Boolean包装类
if(BooleanUtil.isFalse(isMember)){
// 3.如果未点赞,可以点赞
// 3.1数据库点赞数 + 1
boolean isSuccess = update()
.setSql("liked = liked + 1")
.eq("id", id).update();
// 3.2保存用户到Redis的set集合
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else{
// 4.如果已点赞,取消点赞
// 4.1.数据库点赞数 - 1
boolean isSuccess = update()
.setSql("liked = liked - 1")
.eq("id", id).update();
// 4.2.把用户从Redis的set集合移除
if(isSuccess){
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}
}
return Result.ok();
}
//设置文章的用户相关信息
private void queryBlogUser(Blog blog) {
try{
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}catch (Exception e){
throw new RuntimeException(e);
}
}
}
需求:根据时间顺序,展示出最早点赞的五个用户的信息。请求URL:.../blog/likes/{id},请求方式GET,请求参数为blog的id,返回值List给这个笔记点赞的TopN用户集合
Redis数据类型对比
根据需求决定使用SortedSort,而增添元素的命令是ZADD,但SortedSet没有isMember这个命令,所以用ZSCORE这个命令输入key 和 member来获取member的score来判断是否存在该元素。另一条指令就是ZRANGE key start end,注意redis下标也是从0开始,返回排序从0-4的5个元素member。
修改代码
修改点赞代码,将点赞的用户id从原来存放的set集合改成sortedset集合
//点赞文章
@Override
public Result likeBlog(Long id) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
// Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//因为isMember是包装类,所以不能直接判断,使用Boolean包装类
if(score == null){
//BooleanUtil.isFalse(isMember
// 3.如果未点赞,可以点赞
// 3.1数据库点赞数 + 1
boolean isSuccess = update()
.setSql("liked = liked + 1")
.eq("id", id).update();
// 3.2保存用户到Redis的set集合
if(isSuccess){
// stringRedisTemplate.opsForSet().add(key,userId.toString());
//采用时间戳作为member的score,ZADD key score1 member1 [score value ...]
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
}else{
// 4.如果已点赞,取消点赞
// 4.1.数据库点赞数 - 1
boolean isSuccess = update()
.setSql("liked = liked - 1")
.eq("id", id).update();
// 4.2.把用户从Redis的set集合移除
if(isSuccess){
// stringRedisTemplate.opsForSet().remove(key,userId.toString());
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
}
}
return Result.ok();
}
修改用户是否点赞的代码
//判断文章是否被当前用户点赞,设置blog的isLike字段值
private void isBlogLiked(Blog blog) {
try{
UserDTO userDTO = UserHolder.getUser();
if(userDTO != null){
// 1.获取登录用户
Long userId = userDTO.getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + blog.getId();
// Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
// blog.setIsLike(BooleanUtil.isTrue(isMember));
blog.setIsLike(score != null);
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
需要关注的一个异常报错:就是测试以上修改代码时,报错
这里说key值的数据类型与实际不相符,证明之前用的set数据没有删除导致出错。
增加查询点赞该id笔记的前五名用户信息
尽管redis中ZRANGE key 0 4是按score排序获得userId,但是对数据库进行读操作时,SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE id IN ( ? , ? )结果却是反过来的顺序。
解决办法:对field中的id进行排序:
SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE id IN ( 1010 , 1 ) ORDER BY FIELD(id,1010,1);
//查询点赞该id笔记的前五名用户信息
@Override
public Result queryBlogLikes(Long id) {
//redis中存放该笔记点赞用户id的集合中的key
String key = BLOG_LIKED_KEY + id;
// 1.查询top5的点赞用户 ZRANGE key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5 == null || top5.isEmpty()){
//返回空集合
return Result.ok(Collections.emptyList());
}
// 2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
// 将ids转成以","拼接而成的字符串
String idsStr = StrUtil.join(",", ids);
// 3.根据用户id查询用户,不能直接返回User,涉及敏感信息,要返回UserDTO
// SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE id IN ( ? , ? ) ORDER BY FIELD(id , ?, ?...)
List<UserDTO> userDTOs = userService.query()
.in("id", ids).last("ORDER BY FIELD(id," + idsStr + ")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
/**
* userService.listByIds(ids) ==>SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE id IN ( ? , ? )
*/
// 4.返回
return Result.ok(userDTOs);
}
达人探店流程如下:
热点笔记分页查询、根据id查询笔记 ->(数据库:根据blog表中的liked值降序排序,分页第1页,页面size为5) -> 从IPage(Page)中获取records,然后从records中取出blog,设置blog的用户相关信息 -> 传入blog,根据用户id查询redis,判断该笔记是否被当前用户点赞;
传入blog,根据用户id查询redis判断该笔记是否被当前用户点赞,然后设置blog中的isLike属性,传到前端判断是否设置高亮。
点赞,根据传入的blog的id,读取redis数据判断是否被当前用户点赞,如果没有,则点赞,将用户id存入redis;否则,删除redis中用户id
需求分析:基于该表数据结构,实现两个接口。因为关注是user之间的关系,是博主与粉丝之间的关系,因此是多对多关系,需要建立一张表来表示:
关注和取关接口
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取登录用户
Long userId = UserHolder.getUser().getId();
String key = FOLLOW_KEY_PRE + userId;
// 1.判断到底是关注还是取关
if(isFollow){
// 2.关注,新增数据
// 2.1.数据库数据新增
Follow follow = new Follow(); //关注关系对象
follow.setFollowUserId(followUserId); //设置博主id
follow.setUserId(userId); //设置关注用户的id
follow.setCreateTime(LocalDateTime.now());
boolean isSuccess = save(follow);
// 2.2.Redis新增用户关注数据
if(isSuccess){
// SADD ..:userId followUserId
stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
}else{
// 3.取关,删除数据 delete from tb_follow where user_id = ? and follow_user_id = ?
QueryWrapper<Follow> qw = new QueryWrapper<>();
qw
.eq("user_id", userId)
.eq("follow_user_id", followUserId);
boolean isSuccess = remove(qw);
// 在Redis中把当前用户关注的followUserId删除
if(isSuccess){
stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
}
}
return Result.ok();
}
判断是否已经关注接口
@Override
public Result isFollow(Long followUserId) {
// 获取登录用户
Long userId = UserHolder.getUser().getId();
// 1.查询是否关注 select count(*) from tb_follow where user_id = ? and follow_user_id = ?
QueryChainWrapper<Follow> eq = query()
.eq("user_id", userId)
.eq("follow_user_id", followUserId);
Integer count = eq.count();
return Result.ok(count > 0);
}
点击头像进入用户简介页面
自己完善规范下面以下两个代码:
点击共同关注
分析:Redis中能求交集的数据结构是Set或者SortedSet。因此,当前用户关注了谁,保存到Redis中的set中,然后用SINTER key_userId key_follow_userId得出共同关注的用户基本信息UserDTO
/**
* 根据用户id查询与当前用户共同关注的用户基本信息
* @param userId 用户id
* @return
*/
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long userId){
return followService.followCommons(userId);
}
@Override
public Result followCommons(Long userId) {
// 1.获取当前用户
Long curId = UserHolder.getUser().getId();
String key1 = FOLLOW_KEY_PRE + curId;
// 2.求交集
String key2 = FOLLOW_KEY_PRE + userId;
// 2.1.得到两个用户交集的用户id字符串集合
Set<String> userIdList = stringRedisTemplate.opsForSet().intersect(key1, key2);
// 判断集合是否为空
if(userIdList == null || userIdList.isEmpty()){
//无交集,返回空集
return Result.ok(Collections.emptyList());
}
// 3.解析id集合,利用stream流将字符串集合转换成Long集合
List<Long> userIds = userIdList.stream().map(Long::valueOf).collect(Collectors.toList());
// 4.集合,将User集合转换成UserDTO 集合
List<UserDTO> userDTOs = userServiceImpl.listByIds(userIds)
.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
// 5.返回
return Result.ok(userDTOs);
}
请求:/follow/count、/follow/info;返回值:粉丝数量和粉丝用户的简单信息UserDTO
代码实现
Controller
/**
* 返回当前用户的粉丝数量
* @return
*/
@GetMapping("/count")
public Result countOfFollows(){
return followService.countOfFollows();
}
/**
* 返回当前用户的粉丝信息
* @return
*/
@GetMapping("/info")
public Result infoOfFollows(){
return followService.infoOfFollows();
}
Service
@Override
public Result countOfFollows() {
// 1.获取当前用户信息
Long userId = UserHolder.getUser().getId();
// 2.查询当前用户的关注者人数
Long size = stringRedisTemplate.opsForSet().size(FOLLOW_KEY_PRE + userId);
if (size == null || size == 0){
return Result.ok(0);
}
return Result.ok(size);
}
@Override
public Result infoOfFollows() {
// 1.获取当前用户信息
Long userId = UserHolder.getUser().getId();
// 2.查询当前用户的关注者,返回一个userId集合
Set<String> ids_str = stringRedisTemplate.opsForSet().members(FOLLOW_KEY_PRE + userId);
if(ids_str == null || ids_str.isEmpty()){
//如果没有关注者,返回空集合
return Result.ok(Collections.emptyList());
}
List<Long> ids = ids_str.stream().map(Long::valueOf).collect(Collectors.toList());
// 3.根据userId集合查询数据库,将UserDTO存入集合
List<UserDTO> list = new ArrayList<>(ids.size());
List<User> users = userServiceImpl.listByIds(ids);
for (User user: users){
UserDTO userDTO = new UserDTO();
userDTO = BeanUtil.copyProperties(user, UserDTO.class);
list.add(userDTO);
}
// 4.返回关注者集合
return Result.ok(list);
}
关注推送也称做Feed流,直译为投喂。为用户持续提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
Feed流产品有两种常见模式:
Timeline:
智能排序:
本例的个人页面是基于关注的好友来做Feed流,因此使用的是Timeliness的模式。该模式有三种实现方案:
拉模式
推模式
推拉模式
总结:
基于推模式实现关注推送功能
需求:
Redis数据的选择:
关于Feed流的分页问题
Feed流中的数据会不断更新,所以数据的角标也会发生变化,因此不能采用传统的分页模式。(传统的分业模式:page = 1,size = 5,那么角标start = (page - 1) * size,limit = 5)
Feed的滚动分页模式
因为List中只能按照角标来查找,因此不太符合这个滚动分页。所以只能用SortedSet。因为SortedSet可以根据时间戳排名, 当第一次分页记录最后一个数据的时间戳,下次查询下一页的时候,只要查询比记录的时间戳小即可。(以后做排行榜也是这样考虑,考虑是否是动态数据更新)
代码实现需求:
需求①:每个粉丝都有自己的一个收件箱SortedSet
@Override
public Result saveBlog(Blog blog) {
// 1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 2.保存探店笔记
boolean isSuccess = save(blog);
if(!isSuccess){
Result.fail("新增笔记失败!");
}
// 3.查询笔记作者的所有粉丝,查询set集合或者查询数据库(select * from tb_follow where follow_user_id = ?)
List<Follow> fans = followService.query()
.eq("follow_user_id", user.getId()).list();
// 4.推送笔记id给所有粉丝
for(Follow fan : fans){
// 4.1.获取粉丝id
Long fanId = fan.getUserId();
// 4.2.推送笔记id给粉丝的收件箱
String key = FEED_KEY_PRE + fanId;
Boolean feed = stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
}
// 5.返回id
return Result.ok(blog.getId());
}
需求②:以满足。
需求③:实现滚动分页查询
问题分析:
一、容易发生的错误,在redis中输入ZADD z1 6 m6 5 m5 4 m4 3 m3 2 m2 1 m1,然后ZREVRANGE z1 0 2 WITHSCORES(按角标查询),出来的是m6 到 m4以及对应的分数,但如果此时插入了一条数据,ZADD z1 7 m7,当第二页的三个数据ZREVRANGE z1 3 5出来的却是 m4到 没m2以及对应的分数,而不是期望的m3~m1,因此使用角标就会出现这种问题。
二、若存在分数相同的key(删删了m8),先查询第一页ZREVRANGEBYSCORE z1 1000 0 WITHSCORES LIMIT 0 3,找到最小值为6,进行下一页的查询ZREVRANGEBYSCORE z1 6 0 WITHSCORES LIMIT 1 3,发现第一个数据重复了。因为SortedSet在进行该执行检索的时候,从大到小寻找,寻找到第一个score为6的key是m65,后面score也是6的边跳过不检索,于是便输出m65后面的三个key以及score,导致重复。
思路:从分数最高的开始取,取n个,记录第n个数据的score,然后下次开始取就从上次记录的第n个数据开始取。指令:ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count],LIMIT offset count表示从最大值的第几个偏移量开始,需要先找出上次记录的第n个数据的score有多少个,offset就等于score值为第n个数据的score的个数,然后取count个 因为在这里分数代表时间戳,所以分数最大值为当前时间戳。因为这里只需要知道开始的最大值(上次记录的第n个时间戳,如果是第一次则是当前时间戳),而且有取的限制数,因此最小值不需要在意。
代码实现:
前端的请求有上一次查询的最小时间戳(第一页是当前时间戳)和偏移量(默认值为0)
返回值有小于指定时间戳的笔记集合、本次查询的推送的最小时间戳和偏移量
//分页查询关注用户的博客文章笔记
@Override
public Result queryBlogOfFollow(Long minTime, Integer offset) {
// 1.获取当前用户
Long userId = UserHolder.getUser().getId();
// 2.查询当前用户的收件箱 ZREVRANGEBYSCORE key max min LIMIT offset count
String key = FEED_KEY_PRE + userId;
/**
* public interface TypedTuple<V> extends Comparable<ZSetOperations.TypedTuple<V>> {
* @Nullable
* V getValue();
*
* @Nullable
* Double getScore();
* }
*/
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, minTime, offset, 2); // key min max offset count
// 3.非空判断
if(typedTuples == null || typedTuples.isEmpty()){
return Result.ok();
}
// 4.解析数据:包含blogId、minTime时间戳、offset
List<Long> blogIds = new ArrayList<>(typedTuples.size()); //当前页数的笔记id
long min = 0L; //当前页数的笔记最小时间戳
Integer os = 1; //计算score最小时间戳的元素个数
for (ZSetOperations.TypedTuple<String> tuple : typedTuples){ // 5 4 4 2 2
// 4.1.获取Blog的id
String blogId = tuple.getValue();
blogIds.add(Long.valueOf(blogId));
// 4.2.获取时间戳
long time = tuple.getScore().longValue();
// 4.3.判断当前时间戳是否跟min上一个时间戳相同
if(time == min){
//最小时间戳不变,os自增1
os++;
}else{
min = time;
os = 1; //重置是因为最小时间戳重复的元素在分页的中间,因此需要重置
}
}
// 4.根据id查询blog,利用MP封装的函数
//List<Blog> blogList = listByIds(blogIds); //但这种是基于IN(?,?,?)来查的,不能保证顺序,跟点赞排行榜一样
String idStr = StrUtil.join(",", blogIds);
List<Blog> blogList = query().
in("id", blogIds).last("ORDER BY FIELD(id," + idStr + ")").list();
// 4.1.设置blog的相关信息
for (Blog blog : blogList) {
// 4.1.1查询blog相关的用户
queryBlogUser(blog);
// 4.1.2
isBlogLiked(blog);
}
// 5.封装并返回
ScrollResult result = new ScrollResult();
result.setList(blogList);
result.setMinTime(min);
result.setOffset(os);
return Result.ok(result);
}
GEO数据结构(底层存放地理空间信息的数据使用SortedSet)
添加三个火车站的地理坐标
计算北京西站到北京站
搜索天安门(116.397904 39.909005)附近10km内所有火车站,并按照距离升序排序
请求相关的信息
存储店铺信息到Redis的GEO数据结构中
因为MySQL数据库不能实现范围查询,因此首先需要将MySQL的店铺信息导入进Redis当中的GEO数据结构中,因为Redis是内存存储,读写速度较快,因此GEO数据中的value只需要存商铺的id,而score则是店铺的经度x、纬度y转化
需要注意的一点是:在请求中还有商户类型,但在GEO中并没有。因此,按照店铺的类型做分组,类型相同的店铺作为同一组,以typeId为key存入同一个GEO集合中即可。
代码实现:
直接在单元测试中写(模拟后台管理员导入数据)。
//模拟后端管理员将店铺的地理坐标信息存入Redis中的GEO集合
@Test
void testLoadShopData(){
// 1.查询店铺信息(因此数据库信息很多,可以分批查询)
List<Shop> list = shopService.list();
// 2.把店铺根据typeId分组,id相同的放到一个集合
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
// 3.分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry: map.entrySet()){
// 3.1.获取类型id
Long typeId = entry.getKey();
String key = SHOP_GEO_KEY + typeId;
// 3.2.获取同类型的店铺集合
List<Shop> shops = entry.getValue();
/**
* public static class GeoLocation<T> {
* private final T name;
* private final Point point;public static class GeoLocation<T> {
*/
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(shops.size());
// 3.3.写入Redis GEO key(typeId) 经度 纬度 member(id)
for(Shop shop : shops){
//一条一条店铺的信息写入效率低
// stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
//使用迭代器直接先将所有同类型的店铺信息存入到集合中
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(), shop.getY()))
);
}
//将集合写入到GEO中
stringRedisTemplate.opsForGeo().add(key, locations);
}
}
关于SpringDataRedis版本问题
先排除掉spring-data-redis和lettuce的版本,然后新增前两者的版本(可以利用Maven Helper插件来做,但是我的IDEA找不到该插件,只能手动)
代码编写:
Controller
/**
* 根据商铺类型滚动分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x", required = false) Double x,
@RequestParam(value = "y", required = false) Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}
Service
//滚动分页查询附近商铺
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
// 1.判断是否需要根据坐标查询
if(x == null || y == null){
// 根据类型分页查询
Page<Shop> page = this.query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}
// 2.计算分页参数,比如根据current计算出从哪个数据开始查询
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
// 3.查询redis,按照距离排序、分页,结果:shopId、distance, GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE
String key = SHOP_GEO_KEY + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
.search(key, //key
GeoReference.fromCoordinate(x, y), //圆心
new Distance(5000), //半径,单位默认是M
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) //limit(int end)该方法永远从0开始,到end结束,from部分得手动截取
);
// 4.解析出id,截取从from到end的数据
// 4.1.如果结果就是方法后面显示黄色,有可能会是空指针,因此需要判断
if(results == null){
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
// 4.2.截取from~end部分,有两种方式,一种是List的subList,需要拷贝集合,消耗内存,一种是使用Stream的skip,仅是跳过,不消耗内存
// List<GeoResult<RedisGeoCommands.GeoLocation<String>>> subList = list.subList(from, end);
//因为需要截取数据,因此截取完的数据有可能为空,因此这里需要判断,不要会报异常
if(list.size() <= from){
//没有下一页
return Result.ok(Collections.emptyList());
}
List<Long> ids = new ArrayList<>(list.size());
Map<String, Distance> distanceMap = new HashMap<>(list.size());
list.stream().skip(from).forEach(result -> {
// 4.2.1获取店铺id
String shopIdStr = result.getContent().getName(); //相当于GEOSEARCH...返回的店铺id,即GEO里面的value
ids.add(Long.valueOf(shopIdStr));
// 4.2.2获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr, distance);
});
// 5.根据id查询shop,保持有序不能直接用this.listByids(ids),得用chain链式查询
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id, " + idStr + ")").list();
// 6.将map中商铺对应的距离存放到Shop类中非数据库表对应的字段Distance
for (Shop shop: shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
// 6.返回
return Result.ok(shops);
}
常用指令注意点:
注意SETBIT的offset是从0开始!像key第一个位置存入一个0,指令是 SETBIT key 0 0
关于BITFIELD中[GET type offset]:
用户需求
// 用户签到
@Override
public Result sign() {
// 1.获得当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取当前日期信息
LocalDateTime now = LocalDateTime.now();
// 3.拼接Key
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = SIGN_KEY_PRE + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth(); //月的一天,从1到31
// 5.写入Redis,SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth-1, true);
return Result.ok();
}
测试的话可以用postman,不过需要先设置请求头的token,利用url签到;也可以直接使用Redis的命令行进行签到。
什么叫做连续签到天数:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到的天数。
1 1 1 0 0 0 1 1 0 1 1 0 0 0 1 0 1 1 1 0 1 1 1 1 1 0 1 1 1 1
如何得到本月到今天为止的所有签到数据?
Redis指令:BITFIELD key GET u[dayOfMonth] 0,该条指令返回的数据是十进制的
如何从后向前遍历每个bit位?
需求实现:
//统计连续签到的天数
@Override
public Result signCount() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取当前日期所在月份的第几天
LocalDateTime now = LocalDateTime.now();
int dayOfMonth =now.getDayOfMonth(); //这里不是角标,因为无需减1
// 3.拼接Key
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = SIGN_KEY_PRE + userId + keySuffix;
// 4.从Redis获取本月截止今天为止的所有签到数据(返回是一个10进制的数字) BITFIELD sign:101020224 GET u18 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0L)
); //因为该命令可以同时GET SET INCRBY, 因此结果是一个集合
if(result == null || result.isEmpty()){
//没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
System.out.println("签到数据的十进制是: " + num);
if(num == null || num == 0){
return Result.ok(0);
}
// 5.逐个bit位右移逻辑与,判断连续签到的天数
int count = 0;
while(true){
//判断这个bit位是否为0
if((num & 1) == 0){
//如果为0,说明未签到,结束
break;
}
//如果不为0,说明已签到,计数器+1
count++;
//把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1; //>>:表示有符号右移,>>>:表示无符号右移
}
return Result.ok(count);
}
在Redis命令行中连续签到2022年4月17和18号这两天:
关于UV和PV的两个概念
HLL算法介绍
因为我们没有这么多用户,因此折中使用单元测试来测试UV统计
测试10000条数据
10000条数据,使用HyperLogLog插入了10011条,误差11条。
List<String> top5 = ......
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
// 1.查询店铺信息(因此数据库信息很多,可以分批查询)
List<Shop> list = shopService.list();
// 2.把店铺根据typeId分组,id相同的放到一个集合
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
// 4.2.截取from~end部分,有两种方式,一种是List的subList,需要拷贝集合,消耗内存,一种是使用Stream的skip,仅是跳过,不消耗内存
// List<GeoResult<RedisGeoCommands.GeoLocation<String>>> subList = list.subList(from, end);
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> resultList = list.stream().skip(from).collect(Collectors.toList());
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
BeanUtil.fillBeanWithMap(map, t, true);
List<Long> ids = new ArrayList<>(list.size());
// 5.根据id查询shop,保持有序不能直接用this.listByids(ids),得用chain链式查询
String idStr = StrUtil.join(",", ids); // "id1,id2,id3,id4,...,idn"
(92条消息) 学习MyBatis-Plus3这一篇就够了_轻松的小希的博客-CSDN博客_mybatisplus3使用
获取当前时间,并且按格式输出
// 2.获取当前日期信息
LocalDateTime now = LocalDateTime.now();
// 3.拼接Key
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth(); //月的一天,从1到31
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。