Redis学习与实战
缓存穿透
-
缓存穿透是指客户端请求的数据,Redis和Mysql里面都没有,缓存永远不会生效,如果客户端一直请求相同的id,请求就会一直到达数据库,给数据库上压力了.

解决方案1:缓存空对象
-
缓存空对象:如果客户端请求找不到的数据,就把找不到的数据缓存到Redis里,并且设置过期时间,在一定时间内,客户端的空对象请求不会经过Mysql
-
缺点:有额外内存消耗,如果管理端新增对象和空对象id相同,可能造成缓存与数据库内容不一致
解决缓存穿透的业务逻辑:

对应的java逻辑代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Override
public Result getByIdRedis(Long id) {
String shop= stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY+id);
//查询到对应的value,并且这个value不是"",则店铺存在,直接返回
if(shop!=null&&!"".equals(shop)){
Shop shopRedis= JSONUtil.toBean(shop,Shop.class);
return Result.ok(shopRedis);
}
//店铺value不等于null,但是为"",说明缓存和数据库中都没有,直接返回不存在信息
if(shop!=null){
return Result.fail("店铺信息不存在");
}
Shop shop1= getById(id);
//数据没查到,说明这个店铺id不存在,这次缓存穿透了,但是可以把空信息写入redis
if(shop1==null){
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,"",RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.fail("店铺信息不存在");
}
String json= JSON.toJSONString(shop1);
//查询到店铺,返回店铺信息,并且把信息写入Redis
if(shop1!=null){
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,json,RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop1);
}
return Result.fail("店铺信息不存在");
}
|
解决方案2:布隆过滤
缓存穿透后,穿透的信息进入布隆过滤器,如果再进行查询,先查询布隆过滤器,如果这个id是穿透信息,就直接拒绝查询

其他解决方案:
缓存雪崩
简单解决办法
-
把数据存储进Redis的时候直接随机过期时间存储
java代码如下(其实就是生成随机数)
1
2
3
4
5
|
private Long cacheAvalanche(){
Random random=new Random();
Long number=random.nextInt(11)+20L;
return number;
}
|
其他高级解决方案:
缓存击穿
一个热点数据过期,大量线程同时访问,每个线程都选择查询完Redis后查询数据库,导致数据库压力剧增

解决方案1:互斥锁
-
使用Redis的setnx作为互斥条件,所有线程同时设置一个键值对,只有一个线程可以设置成功,并且操作数据库,写入缓存,写完后释放锁资源
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//设置锁和释放锁的方法
private Boolean setLock(String key){
Boolean flag= stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
if(flag==null){
return false;
}else if(flag){
return true;
}
return false;
}
private void unLock(String key){
stringRedisTemplate.delete(key);
}
|
互斥锁解决缓存穿透的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
@Override
public Result getByIdRedis(Long id) throws InterruptedException {
String shop= stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY+id);
//查询到对应的value,并且这个value不是"",则店铺存在,直接返回
if(shop!=null&&!"".equals(shop)){
Shop shopRedis= JSONUtil.toBean(shop,Shop.class);
return Result.ok(shopRedis);
}
//店铺value不等于null,但是为"",说明缓存和数据库中都没有,直接返回不存在信息
if(shop!=null){
return Result.fail("店铺信息不存在");
}
//所有线程同时去抢夺互斥锁资源,只会有一个线程抢到
if(setLock(RedisConstants.LOCK_SHOP_KEY+id)){
Shop shop1= getById(id);
//数据没查到,说明这个店铺id不存在,这次缓存穿透了,但是可以把空信息写入redis
if(shop1==null){
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,"",RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.fail("店铺信息不存在");
}
String json= JSON.toJSONString(shop1);
//查询到店铺,返回店铺信息,并且把信息写入Redis
if(shop1!=null){
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,json,RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop1);
}
//所有工作做完后释放锁资源
unLock(RedisConstants.LOCK_SHOP_KEY+id);
//其他线程休眠对应的时间后重新尝试获取资源(递归)
}else {
Thread.sleep(50);
getByIdRedis(id);
}
return Result.fail("店铺信息不存在");
}
|
其中为防止获得锁的线程挂了不会释放锁资源,给锁设置过期时间
解决方案2:逻辑过期时间

- 在Redis中存储数据时,多存储一条过期时间,如果过期的话,最快发现的线程会主动申请互斥锁,并且查询数据库,查完后设置过期时间并且写回redis,同时返回给客户端,其他的线程请求完锁后,请求不到就直接返回过期的数据,这种方式可以防止死锁的发生,但是牺牲了一部分redis空间
java代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
@Override
public Result redisLogicExpireTime(Long id){
String shop= stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY+id);
logger.info(shop);
//查询到对应的value,并且这个value不是"",则店铺存在,直接返回
if(shop!=null&&!"".equals(shop)){
RedisData redisData=JSONUtil.toBean(shop,RedisData.class);
//判断是否过期
if(redisData.getExpireTime().isAfter(LocalDateTime.now())){
Shop shopRedis= redisData.getData();
System.out.println(shopRedis);
logger.info("查询redis直接输出");
return Result.ok(shopRedis);
}
}
//店铺value不等于null,但是为"",说明缓存和数据库中都没有,直接返回不存在信息
if(shop!=null&&shop.equals("")){
return Result.fail("店铺信息不存在");
}
//所有线程同时去抢夺互斥锁资源,只会有一个线程抢到
if(setLock(RedisConstants.LOCK_SHOP_KEY+id)){
Shop shop1= getById(id);
//数据没查到,说明这个店铺id不存在,这次缓存穿透了,但是可以把空信息写入redis
if(shop1==null){
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,"",cacheAvalanche(), TimeUnit.MINUTES);
return Result.fail("店铺信息不存在");
}
RedisData redisData=new RedisData();
redisData.setData(shop1);
redisData.setExpireTime(LocalDateTime.now().plusMinutes(cacheAvalanche()));
String json= JSON.toJSONString(redisData);
//查询到店铺,返回店铺信息,并且把信息写入Redis
if(shop1!=null){
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,json,cacheAvalanche(), TimeUnit.MINUTES);
return Result.ok(shop1);
}
//所有工作做完后释放锁资源
unLock(RedisConstants.LOCK_SHOP_KEY+id);
//其他线程直接返回过期的数据
}else {
RedisData redisData=new RedisData();
redisData.setData(JSONUtil.toBean(shop,Shop.class));
Shop redis=redisData.getData();
return Result.ok(redis);
}
return Result.fail("店铺信息不存在");
}
|
订单秒杀问题
限量限时商品可能会在短时间内面临大量请求,可能会出现超卖的情况

如果第一次遇到1的时候没来得及写到数据库里,后面的线程查询的时候遇到的还是1,就可以继续执行扣减的操作,导致超卖的情况发生
JMeter测试结果

200次线程请求直接超卖了9个商品
解决方案1:乐观锁
• 假设:乐观锁假设冲突发生的概率很小,允许多个事务同时操作数据,但在提交时检查是否有其他事务修改了数据。
• 实现:通常通过版本号(version)或时间戳(timestamp)实现。在更新数据时,比较当前版本号与数据库中的版本号,如果一致则更新并增加版本号;如果不一致,则说明数据已被其他事务修改,需要重新获取数据并重试。
• 适用场景:适用于读多写少的场景,或者数据竞争不激烈的情况下。
Java实现
1
2
3
4
5
6
7
8
|
Boolean success=seckillVoucherServiceImpl.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId)
.eq("stock", seckillVoucher.getStock())
.update();
if(!success){
return Result.fail("库存不足");
}
|
在写入数据库之前,先查询数据库目前的值和之前查询到的是否一样,是否在写入之前被修改了,如果被修改了就不能写入
JMeter测试结果


可以发现成功率低的可怜,200次请求只抢到了21张票,原因就是每次写入都要查询到之前是否已经写过,请求频率太高,导致不写入的概率也更高,写入越多的情况越不能用乐观锁
解决方案2:悲观锁
• 假设:悲观锁假设会发生冲突,即多个事务会同时修改同一数据,因此它在操作开始时就锁定数据,防止其他事务修改。
• 实现:通常通过数据库的锁机制实现,如行锁、表锁等。
• 适用场景:适用于写操作频繁的场景,或者数据竞争非常激烈的情况下。
只要在每次写入更新结果之前先查看一下剩余的量是不是大于0就可以了
Java代码
1
2
3
4
5
6
7
8
|
Boolean success=seckillVoucherServiceImpl.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId)
.gt("stock", 0)
.update();
if(!success){
return Result.fail("库存不足");
}
|
JMeter测试结果

200次请求,只卖出100张,异常比例正确
单人订单问题
有些商品给购买者限量,比如买火车票或者限定周边,如果一个黄牛用脚本在短时间大量请求,则有可能会多卖
简单解决
1
2
3
4
|
List<VoucherOrder> list=voucherOrderMapper.selectByUserId(voucherId,UserHolder.getUser().getId());
if(!list.isEmpty()){
return Result.fail("该用户已下过单");
}
|
JMeter测试结果显示,200个请求同时下单,一共能购买10张票
解决方案:共享锁
在 Java 中, synchronized是一个关键字,用于控制对共享资源的访问,确保在同一时刻只有一个线程可以访问特定的代码块或方法。这是实现线程同步的一种方式,主要用于解决多线程环境下的并发问题。
我们可以把订单秒杀的代码先抽离出来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
@Transactional(rollbackFor = Exception.class)
public Result createVoucherOrder(Long voucherId) {
//开始的话看库存够不够,够就库存减一,并且创建订单
List<VoucherOrder> list=voucherOrderMapper.selectByUserId(voucherId,UserHolder.getUser().getId());
if(!list.isEmpty()){
return Result.fail("该用户已下过单");
}
Boolean success=seckillVoucherServiceImpl.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if(!success){
return Result.fail("库存不足");
}
Long id= UserHolder.getUser().getId();
VoucherOrder voucherOrder=new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(id);
voucherOrder.setCreateTime(LocalDateTime.now());
voucherOrder.setUpdateTime(LocalDateTime.now());
voucherOrder.setPayTime(LocalDateTime.now());
voucherOrder.setStatus(1);
voucherOrder.setId(redisId.nextId("order"));
voucherOrderMapper.insert(voucherOrder);
return Result.ok(voucherOrder.getVoucherId() + "下单成功");
}
|
如果是对整个函数加锁,也就是在public后面,那么不是同一个用户也会被锁给拦截,性能不高
或者可以给锁限定userId,如果同一id就被拦截,串行进行
1
2
|
Long userId =UserHolder.getUser().getId();
synchronized (userId.toString().intern())
|
这里intern的作用是,toString会导致产生新的字符串对象,字符串对象虽然值是相同的,但是哈希值不一样,被锁认为是不同的对象,这时用intern可以从字符串池里找相同的串,哈希值相同
锁应当在事务结束之后再释放才行,否则又会产生冲突,事务还没结束,有个线程又进来了,会引发异常,因此应当把锁放到整个方法外面
1
2
3
4
|
Long userId =UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
|
JMeter测试结果

200次买票,只买到1次,解决了单人买票的问题
上述写法依然有问题
Java魅力时刻
1
2
3
4
|
Long userId =UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
|
注意到这里的return createVoucherOrder(voucherId);是目标对象引用的函数,相当于
return this.createVoucherOrder(voucherId);但是只有代理对象才有事务管理的功能,代理对象就是加上@Controller,@Service,@Mapper,@Component,@Bean的对象,这个对象协助目标对象完成工作.
要想在这个对象里面调用代理对象可以通过如下办法
1
2
3
4
5
|
Long userId =UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
|
AopContext.currentProxy()获取本类的代理对象,然后从Object转成本类的对象,然后调用对应的方法,要在接口里面重新声明这个方法
同时要引入依赖
1
2
3
4
5
|
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.22.1</version>
</dependency>
|
在springboot启动类上加注解
1
|
@EnableAspectJAutoProxy(exposeProxy = true)
|
允许SpringIOC容器暴露代理对象,这样我们才能正常获取代理对象
Redis实现分布式锁
如果现在同时开两个进程,服务器集群部署,由nginx实现负载均衡,实现轮询发送请求,先给8081端口发送,再给8082端口,导致进程内的锁无法和另一个进程的锁联动

这时可以使用伟大的Redis制作分布式锁来解决这个问题!
分布式锁的设计
-
这个锁应当起到互斥作用,很多个线程同时发送过来,只能有一个线程获取锁资源,因此得用setnx.
-
在此线程结束运行的时候应当及时释放锁资源,防止服务器资源浪费,及时del key
-
如果一个服务器在发送完这个请求后就宕机了,不会执行这个释放锁资源的代码,那么锁资源就不会被释放,导致其他服务器资源浪费.这时候就要给锁设置过期时间,到时间自动释放锁资源
-
如果在还没执行expire time的时候服务器就宕机了,那么锁资源一样不会被释放,这时候就得这么写获取锁的语句
-
1
|
set lock thread1 nx ex 10
|
-
保持了原子性,让互斥和过期时间一起设置
-
同时采用非阻塞的锁,防止很多线程一直等待锁资源释放,尝试一次,如果没获取锁资源就return false,成功就return true
Java代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
private static final String LOCK_PREFIX = "lock:";
private String lockName;
public SimpleRedisLock(String lockName,StringRedisTemplate stringRedisTemplate) {
this.lockName = lockName;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Boolean tryLock(Long timeoutSec) {
Long threadId = Thread.currentThread().getId();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX+lockName,threadId+"",timeoutSec, TimeUnit.SECONDS);
//这样写的话,是true就返回true,如果是false或者null都返回false
return Boolean.TRUE.equals(result);
}
@Override
public void unlock() {
stringRedisTemplate.delete(LOCK_PREFIX+lockName);
}
}
|
将分布式锁加入业务逻辑
1
2
3
4
5
6
7
8
9
10
11
12
|
Long userId =UserHolder.getUser().getId();
SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
Boolean isLock=lock.tryLock(1200L);
if(!isLock){
return Result.fail("一人只能买一张票");
}
try{
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
lock.unlock();
}
|
分布式锁误删问题:

-
由于业务阻塞,导致线程1获取锁后没有及时释放锁资源,锁自动释放,线程2请求锁成功,开始执行业务逻辑
-
这时候线程1完成业务,执行释放锁的指令,导致业务2的锁被意外删除,以此类推,锁会被意外删除.
-
改进办法很简单,只要每次执行删除锁之前先查询锁的线程是否是自己的,是自己的就可以删,不是自己的就跳过.
Java代码改进
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
private static final String LOCK_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString();
private String lockName;
public SimpleRedisLock(String lockName,StringRedisTemplate stringRedisTemplate) {
this.lockName = lockName;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Boolean tryLock(Long timeoutSec) {
String threadId =ID_PREFIX+ Thread.currentThread().getId();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX+lockName,threadId,timeoutSec, TimeUnit.SECONDS);
//这样写的话,是true就返回true,如果是false或者null都返回false
return Boolean.TRUE.equals(result);
}
@Override
public void unlock() {
String UnlockName= stringRedisTemplate.opsForValue().get(LOCK_PREFIX+lockName);
String threadId =ID_PREFIX+ Thread.currentThread().getId();
if(threadId.equals(UnlockName)){
stringRedisTemplate.delete(LOCK_PREFIX+lockName);
}
}
}
|

对程序打断点调试,获取锁后拦截,当我修改这里的ThreadId并且重新放行后,这里的新锁并没有被删掉,解决了误删的问题.
Lua脚本
当我们使用最新的分布式锁的时候,如果在执行finally语句里面的代码时,遭遇JVM进行垃圾回收,这时候会遇到无法战胜的业务阻塞,,代码还是没有做到原子性.如果已经验证完这个线程对应这个锁后突然垃圾回收,那么就会导致del锁这个操作会很危险.
这时候可以把整个unlock操作用Lua脚本完成
在Redis客户端使用lua脚本
无参脚本
1
|
eval "return redis.call('set','name','jack')" 0
|
语句的意思是使用脚本,脚本是一个字符串,就是引号里面的,脚本相当于set name jack 0代表没有参数
有参脚本
1
|
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 dinglz sb
|
用KEYS[1]和ARGV[1]作为占位符,后面有一对参数
使用lua脚本解决删除锁问题
1
2
3
4
5
6
7
8
|
--查询锁的id
local id=redis.call('get',KEYS[1])
if(ARGV[1]==id) then
--释放锁
return redis.call('del',KEYS[1])
end
--不是自己的锁,不用释放锁
return 0
|
Java代码调用Lua脚本

1
2
3
4
5
6
7
8
9
|
@Override
public void unlock() {
List list = new ArrayList();
list.add(LOCK_PREFIX+lockName);
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
list,
ID_PREFIX+ Thread.currentThread().getId());
}
|
-
向redis传输指令,显示脚本,然后是参数,KEYS[1]参数要用集合封装,第二个参数是线程id也就是ARGV[1]
-
同时要配置好脚本
1
2
3
4
5
6
|
private static DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unLock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
|
设置文件位置,然后设置返回格式,返回值为0表示删除锁失败,1表示删除锁成功
Redisson
Redisson是一个封装好的分布式锁工具,这里面的锁是已经写好的,并且比前文介绍的锁多一些功能和奇效

引入依赖
1
2
3
4
5
|
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
|
配置Redisson
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package com.hmdp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://Yourip:Yourport").setPassword("Yourpassword");
return Redisson.create(config);
}
}
|
@Configuration注解
- 这里顺带提一嘴Springboot框架面试高频考点,@Configuration是什么,和@Component有啥区别.
@Configuration是配置类要添加的注解,它的构成是
1
2
3
4
5
6
7
8
9
10
11
12
|
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Configuration {
@AliasFor(
annotation = Component.class
)
String value() default "";
boolean proxyBeanMethods() default true;
}
|
-
这里面也有@Component,但是它比前者多实现单例模式
-
如果你在创建Bean对象的时候一次性创建多个,Spring容器并不会去对象池去找是否有已经创建过的对象,而是直接再创建,这样无法保证单例性
-
而如果用@Configuration就可以只创建一次Bean对象
使用Redisson制作分布式锁
1
2
3
|
//SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
Boolean isLock1=redissonClient.getLock("order"+userId).tryLock(1L, TimeUnit.SECONDS);
//Boolean isLock=lock.tryLock(1200L);
|
中间那一行代码就等价于之前写的SimpleRedisLock
Redisson解决不可重入
不可重入导致死锁
如果一个线程请求锁资源,然后又请求了一次锁资源,第二次请求会失败,因为已经有线程获取过锁,并且就是自身,这导致了这个线程请求不到锁,也释放不了锁,导致了死锁.
解决方案
于是Redisson改变了锁的结构,让锁的数据结构变为Hash,key存储锁的名称,field存储线程名称,value存储该锁被同一个线程调用了几次,每次调用会给value自增,如果释放锁资源就value自减一次,如果value==0,那么就删除这个锁资源.
Redisson解决不可重试
不可重试导致大量请求失败
正常线程如果获取锁失败就直接返回false了,或者说一直循环递归等待下去,导致了大量请求无法返回客户需要的内容
解决方案
1
|
Boolean isLock1=redissonClient.getLock("order"+userId).tryLock(1L, TimeUnit.SECONDS);
|
注意到我们之前在设置tryLock的时候设置了尝试时间1L,单位是秒.这个意思是我可以总共等待1秒,如果这1秒内获取到锁资源,就返回true,如果没请求到就返回false,当然这个1L可以改为任何数值.
Redisson的tryLock原码
1
2
3
4
5
6
7
|
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
|
ttl是剩余等待时间,后面的tryAcquire是看是否获取锁成功,如果获取锁成功就返回null,如果获取成功就返回剩余等待时间.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
boolean var16;
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
}
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var16;
}
}
|
主要看else中的原码,如果获取锁失败,并且要进入等待,RFuture subscribeFuture = this.subscribe(threadId)指的是这个线程订阅了请求的锁的信息,如果锁被释放了,这个线程就会收到信息,这正是观察者模式.但是这个等待并不是无限制的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
}
|
-
如果时间到达上限了,直接返回false,并且取消订阅.
-
如果时间没到上限,就再尝试获取锁,没获取到就继续订阅,因为很多线程同时请求过来,得按顺序获取锁.直到获取到锁或者直接到时间才结束这个循环.
Redis本地集群部署
只要改一下端口号就能实现本地的Redis集群

在config文件下修改端口号

用记事本或者vscode打开都可以,然后按下Ctrl+F搜索6379,下面的端口号改成没设置过的端口号,但也不要跟其他进程冲突了,我这里改为了6381

然后分别启动Redis就实现Redis集群了
Redisson实现分布式联锁
1
2
3
4
|
RLock lock1=redisson6379.getLock("lock:order:"+userId);
RLock lock2=redisson6380.getLock("lock:order:"+userId);
RLock lock3=redisson6381.getLock("lock:order:"+userId);
RLock lock=redisson6380.getMultiLock(lock1,lock2,lock3);
|
-
使用随机一个redisson客户端来调用它的.getMultiLock方法就可以制作出联锁
-
联锁就可以在Redis集群的情况下,所有Redis共有一把锁,以免出现重复申请锁资源的情况
-
getMultiLock
会尝试同时获取所有指定的锁,只有当所有锁都成功获取时,才算加锁成功。如果任何一个锁获取失败,它会回滚已经获取的锁,确保加锁操作的原子性。
-
通过将多个锁的获取和释放封装在一起,getMultiLock
简化了在复杂业务场景下的并发控制逻辑,减少了开发人员在处理多个锁时的错误风险。
其中redisson.getMultilock使用任意一个对象都可以的原因是,这里是统一新创建
1
2
3
|
public RLock getMultiLock(RLock... locks) {
return new RedissonMultiLock(locks);
}
|
通过这样制作联锁,可以让每一个Redis都有lock


异步秒杀问题
将超卖和判重用Redis解决
把卖票过程比喻成厨子做饭,如果饭店只有一个人,那么他自己得招待顾客,还得自己下厨,但是这时候多来了个服务员,服务员负责招待顾客下单,厨子只是来做菜,卖票过程就是用Redis作为先手,把订单都收到手了,交给后端再交给数据库来异步处理这些订单.
JMeter测试数据时,随着时间请求量会增长,因此可能会面对短时间超高并发,并且现有的代码还有很多查询MySQL的操作,比如:
-
查询优惠券剩余数量,查询stock
-
顾客是否已经买过一张票,查询是否有这个订单
于是我们可以通过改变订单流程来解决

在商家添加优惠券信息的时候就把优惠券的信息和数量同步到Redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set("Inventory"+voucher.getId()+":stock",voucher.getStock().toString());
}
|
然后在请求到达后端时,先查询Redis,判断优惠券数量是否够,这个用户是否下过单了
lua脚本
- 由于一次性操作Redis次数过多,直接使用多条java语句没有原子性,因此要用lua脚本
1
2
3
4
5
6
7
8
9
10
|
local stock=redis.call('get',KEYS[1])
if(tonumber(stock)<1) then
return 1
end
local success = redis.call('SADD', KEYS[2], ARGV[1])
if(success==0) then
return 2
end
redis.call('decr',KEYS[1])
return 0
|
由于KEYS传过来时必须得是String类型,而只有number才能比较大小,因此先转化为数字,如果票没了就返回1.
Redis的set集合
Redis的set集合在插入的时候,要判重,如果重复就不能插入,返回0,如果插入成功就返回1,故可以作为锁.
多人秒杀的存储效果是这样的

然后就可以写代码调用脚本了
1
2
3
4
5
6
7
8
9
10
11
|
//前期判断
List<String> KEYS = new ArrayList<>();
//List<String> ARGS = new ArrayList<>();
KEYS.add("Inventory"+voucherId+":stock");
KEYS.add("Inventory"+voucherId+":set");
Long result = stringRedisTemplate.execute(TICKET_SNATCHING_SCRIPT, KEYS, UserHolder.getUser().getId()+"");
if(result==1){
return Result.fail("没票了");
}else if (result==2){
return Result.fail("一个人不能抢多张票");
}
|
后端接受到可以下单的信息后制作订单
1
2
3
4
5
6
7
8
9
|
Long id= UserHolder.getUser().getId();
VoucherOrder voucherOrder=new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(id);
voucherOrder.setCreateTime(LocalDateTime.now());
voucherOrder.setUpdateTime(LocalDateTime.now());
voucherOrder.setPayTime(LocalDateTime.now());
voucherOrder.setStatus(1);
voucherOrder.setId(redisId.nextId("order"));
|
然后把订单数据交给阻塞队列来异步执行,这里要用java自带的阻塞队列,然后多创建一个线程来解决这些下单问题
1
2
3
4
5
6
7
8
9
|
//创建阻塞队列
private BlockingQueue<VoucherOrder> queue=new ArrayBlockingQueue<VoucherOrder>(1024*1024);
//创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor( );
//在启动类初始化完就马上把
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
|
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}着重讲一下
-
@PostConstruct
注解:
-
SECKILL_ORDER_EXECUTOR
:
- 它的作用是管理线程的生命周期,避免频繁创建和销毁线程的开销。
-
submit(new VoucherOrderHandler())
:
这是线程池的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
//给线程池分配任务
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try{
VoucherOrder voucherOrder=queue.take();
voucherHandler(voucherOrder);
}catch (Exception e){
System.out.println("订单处理异常"+e.getMessage());
}
}
}
}
|
在队列中有元素的时候,子线程取出voucherOrder对象,然后解决下单问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
private void voucherHandler(VoucherOrder voucherOrder) throws InterruptedException {
Long userId=voucherOrder.getUserId();
//SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
RLock lock1=redisson6379.getLock("lock:order:"+userId);
RLock lock2=redisson6380.getLock("lock:order:"+userId);
RLock lock3=redisson6381.getLock("lock:order:"+userId);
RLock lock=redisson6380.getMultiLock(lock1,lock2,lock3);
Boolean isLock1=lock.tryLock(1L, TimeUnit.SECONDS);
//Boolean isLock=lock.tryLock(1200L);
if(!isLock1){
return ;
}
try{
proxy.createVoucherOrder(voucherOrder);
}finally {
System.out.println("id为"+userId+"的顾客买到票了");
lock.unlock();
}
}
|
注意到这是个子线程的任务,子线程的任务无法获得代理对象,也就是SpringBoot的IOC容器管理的对象,但是我们需要事务回滚,所以还是得用代理对象,所以只能把代理对象作为私有变量,然后在类的方法中对这个私有代理对象进行赋值
1
2
3
4
5
6
7
8
|
private IVoucherOrderService proxy;
@Override
public Result order(Long voucherId) {
...
//获取代理对象
proxy=(IVoucherOrderService) AopContext.currentProxy();
...
}
|
然后改进createVoucherOrde方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Transactional(rollbackFor = Exception.class)
public void createVoucherOrder(VoucherOrder voucherOrder) {
//开始的话看库存够不够,够就库存减一,并且创建订单
List<VoucherOrder> list=voucherOrderMapper.selectByUserId(voucherOrder.getVoucherId(),voucherOrder.getUserId());
if(!list.isEmpty()){
return ;
}
Boolean success=seckillVoucherServiceImpl.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if(!success){
return ;
}
voucherOrderMapper.insert(voucherOrder);
}
|
至此,异步下单已经初步完成了.
JMeter测试
由于要实现多人秒杀,所以就得用多个账户登录来抢票,但是找不到1000个真人来测试,总不能一个一个敲登录来获取token吧,所以可以写一个自动登录1000个账户的测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.hmdp;
import com.hmdp.utils.RedisConstants;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@SpringBootTest
public class TokenTest {
private static final Integer NUMBER_OF_TOKEN = 1000;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
public void test() {
for (Integer i = 1; i < NUMBER_OF_TOKEN+1; i++) {
Map<String, String> map = new HashMap<>();
map.put("nickName", "");
map.put("icon", "");
map.put("id", i.toString());
String token= UUID.randomUUID().toString();
stringRedisTemplate.opsForHash().putAll(RedisConstants.LOGIN_USER_KEY+token,map);
System.out.println(token);
stringRedisTemplate.expire(token, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
}
}
}
|
在 Redis里存一份即可,然后把所有的token输出出来,后面拿来给JMeter,这样绕过了拦截器
把tokens存到txt文件中
在JMeter里面这样设置一下

注意那个路径就是txt文件存储位置
然后改一下登录状态头

然后就可以开始测试了!
测试结果:

1000人抢200张票,只有20%成功率,80%的异常率,测试成功
Redis实现消息队列
认识消息队列
- 消息队列(Message Queue,简称MQ)是一种在软件架构中用于实现应用程序之间异步通信的中间件。它允许一个或多个生产者(消息的发送者)将消息发送到队列中,然后由一个或多个消费者(消息的接收者)从队列中读取消息。消息队列的主要作用是解耦、缓冲、异步通信和负载均衡等,可以用来削峰填谷,减轻服务器和数据库的处理压力。

之前有用到spring的阻塞队列,但是这个阻塞队列效果不是很好,如果中途服务挂机了,或者说停机后JVM会强制删除所有内存数据,那么后续请求就无法完成,并且不保留记录.这时候就需要外部的消息队列了,这里先用Redis实现,后面还可以用实现好的消息队列,kafka,RocketMQ
Redis要求:
redis必须达到5版本及其以上,这里使用的是stream数据结构来制作消息队列
Redis单消费模式
我们可以使用XADD来向消息队列里面加入元素,用XREAD来读取消息队列的元素

-
第一次redis会去查存储空间里面有没有users这个消息队列如果没有就创建
-
这个语句的意思是添加名称为users的消息队列,然后*为自动生成id,k2 v2为一个键值对
-
添加成功后会返回一个数据"1743079174227-0"这是一个时间戳,用来唯一标识这个队列
同样的我们可以用XREAD来读取消息队列中的数据

1
|
XREAD count 1 streams users 0
|
这个语句的意思是读消息,一次读一条,读取消息队列users的消息,并且从0开始读
1
|
XREAD count 1 block 0 streams users $
|
这个语句的意思是读取users的消息,($)读取最新的消息,然后无限阻塞,直到有消息进入队列就读取一条消息.
这样看上去一个消息队列建好了,但是有bug,如果存储数据时间较长,存储数据没来得及存储完,又传来了一堆新的消息,$只能读取最新的消息,导致中间有很多消息都漏掉了
Redis消费者组模式

省流版:
-
消费者组里面有很多消费者,它们是竞争关系,都来争着处理消息,可以解决消息漏处理的问题
-
消息表示指的是最后被处理的消息会被打上标签,就像看书有个书签,看完后下次再看就立马知道在哪了
-
消息确认就跟TCP三次握手相似,只是没有最后一段客户端向服务端的确认信息,每次处理完信息都会确认一下,并且有个pending-list,如果没处理这个消息就根据这个list继续处理
创建消费者组

1
|
XGROUP CREATE users group1 0
|
这个语句的意思是在users这个消息队列中创建消费者组,如果没有users这个消息队列就会创建失败,这个消费者组的名称为group1,每次从id=0开始读
通过消费者组读消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
2) 1) 1) "1743074094428-0"
2) 1) "k1"
2) "v1"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
2) 1) 1) "1743079174227-0"
2) 1) "k2"
2) "v2"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
2) 1) 1) "1743082841091-0"
2) 1) "k3"
2) "v3"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
2) 1) 1) "1743082847320-0"
2) 1) "k4"
2) "v4"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
2) 1) 1) "1743082852967-0"
2) 1) "k5"
2) "v5"
|
这个语句的含义是通过消费者组进行读取,group的名称是group1(刚刚创建的),消费者名称叫c1(随便取的名字,没有的话会自动创建),每次消费一条信息,阻塞2秒,消息队列名称为users,选择从最早发过来的未读消息开始读,>换成0可以使因为之前处理消息的时候,服务器炸了,重新处理的时候读取pending-list,直接从未处理完成的消息继续处理
确认信息方法XACK
1
2
|
XACK users group1 1743074094428-0 1743079320687-0 1743082841091-0 1743082847320-0
(integer) 4
|
前面是消息队列名称和消费者组名,后面是消息被消费后发出的时间戳
通过如下语句查询pending-list
1
2
3
4
5
6
|
xpending users group1
1) (integer) 1
2) "1743079174227-0"
3) "1743079174227-0"
4) 1) 1) "c1"
2) "1"
|
通过Redis实现消息队列替代阻塞队列
其实没必要这么实现(绝对不是因为我这边IDEA无法识别我写的函数才不实现的)
我们可以直接上正经的消息队列
通过RabbitMQ替代阻塞队列
RabbitMQ是个消息队列,文档非常完整好懂,可以通过文档学习.要使用RabbitMQ要先下载,这里我在ubuntu虚拟机上下载并使用这个消息队列
在 Ubuntu 上安装 RabbitMQ 通常需要以下步骤:
1. 更新系统包列表
在安装任何软件之前,建议先更新系统的包列表,以确保安装的软件是最新的版本。运行以下命令:
2. 安装 Erlang
RabbitMQ 是基于 Erlang 编程语言开发的,因此需要先安装 Erlang。可以使用以下命令安装 Erlang:
1
|
sudo apt install erlang
|
3. 添加 RabbitMQ 的官方仓库
为了获取最新版本的 RabbitMQ,建议添加 RabbitMQ 的官方仓库。运行以下命令:
1
2
3
|
sudo apt install apt-transport-https
wget -O- https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
echo "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
|
4. 更新包列表
添加完官方仓库后,需要再次更新包列表:
5. 安装 RabbitMQ Server
现在可以安装 RabbitMQ 了。运行以下命令:
1
|
sudo apt install rabbitmq-server
|
6. 启动和启用 RabbitMQ 服务
安装完成后,RabbitMQ 服务应该会自动启动。可以通过以下命令检查服务状态:
1
|
sudo systemctl status rabbitmq-server
|
如果服务未启动,可以手动启动并设置开机自启:
1
2
|
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
|
7. 配置 RabbitMQ(可选)
如果需要进行额外配置,例如设置用户、权限等,可以使用以下命令:
正式使用RabbitMQ
按照以上步骤下载完rabbitmq后,我们可以打开网址馆里rabbitmq
http://虚拟机的ip地址:15672
注!
- 如果你用的不是docker下载的rabbitmq,那么你虚拟机的ip地址会改变,下次连不上rabbitmq有可能是因为ip地址变了
rabbitmq管理界面长这样
你可以在主机上打开,也可以在虚拟机上打开,第一次登录只能在虚拟机上打开,因为有默认账号密码.输入账号:guest,密码:guest.然后进去后可以给自己之前用命令行创建的用户改权限

点击创建的用户,这样修改权限

这样你就可以用后端语言操控rabbitmq了!
然后我们就可以打开心爱的IDEA
引入maven坐标
先引入几个maven坐标
1
2
3
4
5
6
7
8
9
10
11
|
<!--rabbitMQ的maven坐标-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.18.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.18.RELEASE</version>
</dependency>
|
注意:springboot2.0版本只能用这套maven坐标,要用3.0版本maven坐标的话会出bug
配置rabbitmq
先写好yaml

然后这样写配置类,把rabbitmq交给IOC容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package com.hmdp.config;
import com.hmdp.properties.RabbitMQProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Autowired
private RabbitMQProperties rabbitMQProperties;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMQProperties.getHost());
connectionFactory.setUsername(rabbitMQProperties.getUsername());
connectionFactory.setPassword(rabbitMQProperties.getPassword());
connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost());
connectionFactory.setPort(rabbitMQProperties.getPort());
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public Queue voucherOrderQueue() {
return new Queue("voucher_order_queue", true);
}
}
|
之前我们用的是阻塞队列,单独为消费者new了个线程来处理消息,现在我们可以创建一个消费者类,并且把对象交给IOC容器,让这个消费者持续处理信息
消费者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package com.hmdp.Consumer;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Component
public class VoucherOrderConsumer {
@Autowired
private IVoucherOrderService voucherOrderService;
@Resource
private RedissonClient redisson6379;
@Resource
private RedissonClient redisson6380;
@Resource
private RedissonClient redisson6381;
@Transactional
@RabbitListener(queues = "voucher_order_queue")
public void handle(VoucherOrder voucherOrder) throws InterruptedException {
//SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
RLock lock1=redisson6379.getLock("lock:order:"+voucherOrder.getUserId());
RLock lock2=redisson6380.getLock("lock:order:"+voucherOrder.getUserId());
RLock lock3=redisson6381.getLock("lock:order:"+voucherOrder.getUserId());
RLock lock=redisson6380.getMultiLock(lock1,lock2,lock3);
Boolean isLock1=lock.tryLock(1L, TimeUnit.SECONDS);
//Boolean isLock=lock.tryLock(1200L);
if(!isLock1){
return ;
}
try{
voucherOrderService.createVoucherOrder(voucherOrder);
}finally {
System.out.println("订单处理成功:" + voucherOrder.getId());
lock.unlock();
}
}
}
|
然后ServiceImpl类就成为了生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@Override
public Result order(Long voucherId) {
//查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherMapper.selectById(voucherId);
//判断秒杀是否开始
LocalDateTime timeEnd = seckillVoucher.getEndTime();
LocalDateTime timeBegin=seckillVoucher.getBeginTime();
if(timeBegin.isAfter(LocalDateTime.now())){
return Result.fail("秒杀活动暂未开始");
}else if (timeEnd.isBefore(LocalDateTime.now())){
return Result.fail("秒杀活动现已结束");
}
//前期判断
List<String> KEYS = new ArrayList<>();
//List<String> ARGS = new ArrayList<>();
KEYS.add("Inventory"+voucherId+":stock");
KEYS.add("Inventory"+voucherId+":set");
Long id=redisId.nextId("order");
Long result = stringRedisTemplate.execute(TICKET_SNATCHING_SCRIPT, KEYS, UserHolder.getUser().getId()+"");
if(result==1){
return Result.fail("没票了");
}else if (result==2){
return Result.fail("一个人不能抢多张票");
}
Long userId= UserHolder.getUser().getId();
VoucherOrder voucherOrder=new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setCreateTime(LocalDateTime.now());
voucherOrder.setUpdateTime(LocalDateTime.now());
voucherOrder.setPayTime(LocalDateTime.now());
voucherOrder.setStatus(1);
voucherOrder.setId(id);
// 使用 RabbitMQ 发送消息
rabbitTemplate.convertAndSend("voucher_order_queue", voucherOrder);
//queue.add(voucherOrder);
//获取代理对象
proxy=(IVoucherOrderService) AopContext.currentProxy();
return Result.ok("下单成功"+id);
}
|
这样我们就使用了rabbitmq作为消息队列完成了异步秒杀.这只是rabbitmq的一点实力,后面我可能会专开一个坑来学rabbitmq
测试类更新
简单更新了一下测试类
把token一键写入.txt文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
@Test
public void test1() {
// 指定文件路径
String filePath = "C:\\Users\\86180\\Desktop\\Test\\秒杀订单TOKEN数据.txt";
File file = new File(filePath);
// 如果文件不存在则创建文件夹和文件
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
// 如果文件存在,先清空文件内容
try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
writer.write(""); // 清空文件内容
} catch (IOException e) {
e.printStackTrace();
}
// 写入新的 token 数据
try (BufferedWriter writer = new BufferedWriter(new FileWriter(file, true))) {
for (Integer i = 1; i < NUMBER_OF_TOKEN + 1; i++) {
Map<String, String> map = new HashMap<>();
map.put("nickName", "");
map.put("icon", "");
map.put("id", i.toString());
String token = UUID.randomUUID().toString();
stringRedisTemplate.opsForHash().putAll(RedisConstants.LOGIN_USER_KEY + token, map);
stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 将 token 写入文件
writer.write(token);
writer.newLine(); // 换行
}
} catch (IOException e) {
e.printStackTrace();
}
}
|
一键刷新测试数据,让我们和JMETER再抢一千次优惠券吧!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package com.hmdp;
import com.hmdp.mapper.SeckillVoucherMapper;
import com.hmdp.service.impl.VoucherOrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
@SpringBootTest
public class RefreshData {
private static final Long VOUCHER_ID=16L;
private static final Long STOCK=1000L;
@Autowired
private VoucherOrderServiceImpl voucherOrderService;
@Autowired
private SeckillVoucherMapper seckillVoucherMapper;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
public void test() {
//删除所有订单
voucherOrderService.deleteByVoucherId(VOUCHER_ID);
//将剩余票数刷新为指定数值
seckillVoucherMapper.updateByVoucherId(VOUCHER_ID,STOCK);
//同步修改redis的剩余票数
stringRedisTemplate.opsForValue().set("Inventory"+VOUCHER_ID+":stock",STOCK+"");
//删除redis的set
stringRedisTemplate.delete("Inventory"+VOUCHER_ID+":set");
}
}
|
点赞功能
问题分析
点赞要求我们第一次点上去是点赞数+1,再点一次是点赞数-1,这完全可以交给数据库去解决,然后就是
关注与互关
TODO明天写完