Redis学习与实战

Redis实战篇

Redis学习与实战

缓存穿透

  • 缓存穿透是指客户端请求的数据,Redis和Mysql里面都没有,缓存永远不会生效,如果客户端一直请求相同的id,请求就会一直到达数据库,给数据库上压力了.

解决方案1:缓存空对象

  • 缓存空对象:如果客户端请求找不到的数据,就把找不到的数据缓存到Redis里,并且设置过期时间,在一定时间内,客户端的空对象请求不会经过Mysql

  • 缺点:有额外内存消耗,如果管理端新增对象和空对象id相同,可能造成缓存与数据库内容不一致

解决缓存穿透的业务逻辑:

对应的java逻辑代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
    public Result getByIdRedis(Long id) {
        String shop= stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY+id);
        //查询到对应的value,并且这个value不是"",则店铺存在,直接返回
        if(shop!=null&&!"".equals(shop)){
            Shop shopRedis= JSONUtil.toBean(shop,Shop.class);
            return Result.ok(shopRedis);
        }
        //店铺value不等于null,但是为"",说明缓存和数据库中都没有,直接返回不存在信息
        if(shop!=null){
            return Result.fail("店铺信息不存在");
        }
        Shop shop1= getById(id);
        //数据没查到,说明这个店铺id不存在,这次缓存穿透了,但是可以把空信息写入redis
        if(shop1==null){
            stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,"",RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
            return Result.fail("店铺信息不存在");
        }
        String json= JSON.toJSONString(shop1);
        //查询到店铺,返回店铺信息,并且把信息写入Redis
        if(shop1!=null){
            stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,json,RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
            return Result.ok(shop1);
        }
        return Result.fail("店铺信息不存在");
    }

解决方案2:布隆过滤

缓存穿透后,穿透的信息进入布隆过滤器,如果再进行查询,先查询布隆过滤器,如果这个id是穿透信息,就直接拒绝查询

其他解决方案:

  • 做好热点参数的限流

  • 加强用户权限校验

缓存雪崩

  • 缓存雪崩是指大量数据同时到期,或者Redis服务直接宕机,大量请求涌入Mysql)

简单解决办法

  • 把数据存储进Redis的时候直接随机过期时间存储

    java代码如下(其实就是生成随机数)

    1
    2
    3
    4
    5
    
    private Long cacheAvalanche(){
            Random random=new Random();
            Long number=random.nextInt(11)+20L;
            return number;
        }
    

其他高级解决方案:

  • Redis集群,分布式部署

  • 给缓存业务添加降级限流策略,如果redis集体驾崩,就直接拒绝大量请求,防止MySQL数据库压力过大

  • 给业务添加多级缓存

缓存击穿

一个热点数据过期,大量线程同时访问,每个线程都选择查询完Redis后查询数据库,导致数据库压力剧增

解决方案1:互斥锁

  • 使用Redis的setnx作为互斥条件,所有线程同时设置一个键值对,只有一个线程可以设置成功,并且操作数据库,写入缓存,写完后释放锁资源

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    //设置锁和释放锁的方法
    private Boolean setLock(String key){
            Boolean flag= stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
            if(flag==null){
                return false;
            }else if(flag){
                return true;
            }
            return false;
        }
        private void unLock(String key){
            stringRedisTemplate.delete(key);
        }
    

互斥锁解决缓存穿透的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
    public Result getByIdRedis(Long id) throws InterruptedException {
        String shop= stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY+id);
        //查询到对应的value,并且这个value不是"",则店铺存在,直接返回
        if(shop!=null&&!"".equals(shop)){
            Shop shopRedis= JSONUtil.toBean(shop,Shop.class);
            return Result.ok(shopRedis);
        }
        //店铺value不等于null,但是为"",说明缓存和数据库中都没有,直接返回不存在信息
        if(shop!=null){
            return Result.fail("店铺信息不存在");
        }
        //所有线程同时去抢夺互斥锁资源,只会有一个线程抢到
        if(setLock(RedisConstants.LOCK_SHOP_KEY+id)){
            Shop shop1= getById(id);

            //数据没查到,说明这个店铺id不存在,这次缓存穿透了,但是可以把空信息写入redis
            if(shop1==null){
                stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,"",RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
                return Result.fail("店铺信息不存在");
            }
            String json= JSON.toJSONString(shop1);
            //查询到店铺,返回店铺信息,并且把信息写入Redis
            if(shop1!=null){
                stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,json,RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
                return Result.ok(shop1);
            }
            //所有工作做完后释放锁资源
            unLock(RedisConstants.LOCK_SHOP_KEY+id);
        //其他线程休眠对应的时间后重新尝试获取资源(递归)
        }else {
            Thread.sleep(50);
            getByIdRedis(id);
        }
        return Result.fail("店铺信息不存在");
    }

其中为防止获得锁的线程挂了不会释放锁资源,给锁设置过期时间

解决方案2:逻辑过期时间

  • 在Redis中存储数据时,多存储一条过期时间,如果过期的话,最快发现的线程会主动申请互斥锁,并且查询数据库,查完后设置过期时间并且写回redis,同时返回给客户端,其他的线程请求完锁后,请求不到就直接返回过期的数据,这种方式可以防止死锁的发生,但是牺牲了一部分redis空间

java代码如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
    public Result redisLogicExpireTime(Long id){
        String shop= stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY+id);
        logger.info(shop);
        //查询到对应的value,并且这个value不是"",则店铺存在,直接返回
        if(shop!=null&&!"".equals(shop)){
            RedisData redisData=JSONUtil.toBean(shop,RedisData.class);
            //判断是否过期
            if(redisData.getExpireTime().isAfter(LocalDateTime.now())){
                Shop shopRedis= redisData.getData();
                System.out.println(shopRedis);
                logger.info("查询redis直接输出");
                return Result.ok(shopRedis);
            }
        }
        //店铺value不等于null,但是为"",说明缓存和数据库中都没有,直接返回不存在信息
        if(shop!=null&&shop.equals("")){
            return Result.fail("店铺信息不存在");
        }
        //所有线程同时去抢夺互斥锁资源,只会有一个线程抢到
        if(setLock(RedisConstants.LOCK_SHOP_KEY+id)){
            Shop shop1= getById(id);
            //数据没查到,说明这个店铺id不存在,这次缓存穿透了,但是可以把空信息写入redis
            if(shop1==null){
                stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,"",cacheAvalanche(), TimeUnit.MINUTES);
                return Result.fail("店铺信息不存在");
            }
            RedisData redisData=new RedisData();
            redisData.setData(shop1);
            redisData.setExpireTime(LocalDateTime.now().plusMinutes(cacheAvalanche()));
            String json= JSON.toJSONString(redisData);
            //查询到店铺,返回店铺信息,并且把信息写入Redis
            if(shop1!=null){
                stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,json,cacheAvalanche(), TimeUnit.MINUTES);
                return Result.ok(shop1);
            }
            //所有工作做完后释放锁资源
            unLock(RedisConstants.LOCK_SHOP_KEY+id);
            //其他线程直接返回过期的数据
        }else {
            RedisData redisData=new RedisData();
            redisData.setData(JSONUtil.toBean(shop,Shop.class));
            Shop redis=redisData.getData();
            return Result.ok(redis);
        }
        return Result.fail("店铺信息不存在");
    }

订单秒杀问题

限量限时商品可能会在短时间内面临大量请求,可能会出现超卖的情况

如果第一次遇到1的时候没来得及写到数据库里,后面的线程查询的时候遇到的还是1,就可以继续执行扣减的操作,导致超卖的情况发生

JMeter测试结果

200次线程请求直接超卖了9个商品

解决方案1:乐观锁

• 假设:乐观锁假设冲突发生的概率很小,允许多个事务同时操作数据,但在提交时检查是否有其他事务修改了数据。

• 实现:通常通过版本号(version)或时间戳(timestamp)实现。在更新数据时,比较当前版本号与数据库中的版本号,如果一致则更新并增加版本号;如果不一致,则说明数据已被其他事务修改,需要重新获取数据并重试。

• 适用场景:适用于读多写少的场景,或者数据竞争不激烈的情况下。

Java实现

1
2
3
4
5
6
7
8
Boolean success=seckillVoucherServiceImpl.update()
                    .setSql("stock=stock-1")
                    .eq("voucher_id",voucherId)
                    .eq("stock", seckillVoucher.getStock())
                    .update();
            if(!success){
                return Result.fail("库存不足");
            }

在写入数据库之前,先查询数据库目前的值和之前查询到的是否一样,是否在写入之前被修改了,如果被修改了就不能写入

JMeter测试结果

可以发现成功率低的可怜,200次请求只抢到了21张票,原因就是每次写入都要查询到之前是否已经写过,请求频率太高,导致不写入的概率也更高,写入越多的情况越不能用乐观锁

解决方案2:悲观锁

假设:悲观锁假设会发生冲突,即多个事务会同时修改同一数据,因此它在操作开始时就锁定数据,防止其他事务修改。

实现:通常通过数据库的锁机制实现,如行锁、表锁等。

适用场景:适用于写操作频繁的场景,或者数据竞争非常激烈的情况下。

只要在每次写入更新结果之前先查看一下剩余的量是不是大于0就可以了

Java代码

1
2
3
4
5
6
7
8
Boolean success=seckillVoucherServiceImpl.update()
                    .setSql("stock=stock-1")
                    .eq("voucher_id",voucherId)
                    .gt("stock", 0)
                    .update();
            if(!success){
                return Result.fail("库存不足");
            }

JMeter测试结果

200次请求,只卖出100张,异常比例正确

单人订单问题

有些商品给购买者限量,比如买火车票或者限定周边,如果一个黄牛用脚本在短时间大量请求,则有可能会多卖

简单解决

1
2
3
4
List<VoucherOrder> list=voucherOrderMapper.selectByUserId(voucherId,UserHolder.getUser().getId());
        if(!list.isEmpty()){
            return Result.fail("该用户已下过单");
        }

JMeter测试结果显示,200个请求同时下单,一共能购买10张票

解决方案:共享锁

在 Java 中, synchronized是一个关键字,用于控制对共享资源的访问,确保在同一时刻只有一个线程可以访问特定的代码块或方法。这是实现线程同步的一种方式,主要用于解决多线程环境下的并发问题。

我们可以把订单秒杀的代码先抽离出来

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Transactional(rollbackFor = Exception.class)
    public Result createVoucherOrder(Long voucherId) {
        //开始的话看库存够不够,够就库存减一,并且创建订单
        List<VoucherOrder> list=voucherOrderMapper.selectByUserId(voucherId,UserHolder.getUser().getId());
        if(!list.isEmpty()){
            return Result.fail("该用户已下过单");
        }
        Boolean success=seckillVoucherServiceImpl.update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();
        if(!success){
            return Result.fail("库存不足");
        }
        Long id= UserHolder.getUser().getId();
        VoucherOrder voucherOrder=new VoucherOrder();
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(id);
        voucherOrder.setCreateTime(LocalDateTime.now());
        voucherOrder.setUpdateTime(LocalDateTime.now());
        voucherOrder.setPayTime(LocalDateTime.now());
        voucherOrder.setStatus(1);
        voucherOrder.setId(redisId.nextId("order"));
        voucherOrderMapper.insert(voucherOrder);
        return Result.ok(voucherOrder.getVoucherId() + "下单成功");
    }

如果是对整个函数加锁,也就是在public后面,那么不是同一个用户也会被锁给拦截,性能不高

或者可以给锁限定userId,如果同一id就被拦截,串行进行

1
2
Long userId =UserHolder.getUser().getId();
        synchronized (userId.toString().intern())

这里intern的作用是,toString会导致产生新的字符串对象,字符串对象虽然值是相同的,但是哈希值不一样,被锁认为是不同的对象,这时用intern可以从字符串池里找相同的串,哈希值相同

锁应当在事务结束之后再释放才行,否则又会产生冲突,事务还没结束,有个线程又进来了,会引发异常,因此应当把锁放到整个方法外面

1
2
3
4
Long userId =UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            return createVoucherOrder(voucherId);
        }

JMeter测试结果

200次买票,只买到1次,解决了单人买票的问题

上述写法依然有问题

Java魅力时刻

1
2
3
4
Long userId =UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            return createVoucherOrder(voucherId);
        }

注意到这里的return createVoucherOrder(voucherId);是目标对象引用的函数,相当于

return this.createVoucherOrder(voucherId);但是只有代理对象才有事务管理的功能,代理对象就是加上@Controller,@Service,@Mapper,@Component,@Bean的对象,这个对象协助目标对象完成工作.

要想在这个对象里面调用代理对象可以通过如下办法

1
2
3
4
5
Long userId =UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }

AopContext.currentProxy()获取本类的代理对象,然后从Object转成本类的对象,然后调用对应的方法,要在接口里面重新声明这个方法

同时要引入依赖

1
2
3
4
5
<dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.9.22.1</version>
        </dependency>

在springboot启动类上加注解

1
@EnableAspectJAutoProxy(exposeProxy = true)

允许SpringIOC容器暴露代理对象,这样我们才能正常获取代理对象

Redis实现分布式锁

如果现在同时开两个进程,服务器集群部署,由nginx实现负载均衡,实现轮询发送请求,先给8081端口发送,再给8082端口,导致进程内的锁无法和另一个进程的锁联动

这时可以使用伟大的Redis制作分布式锁来解决这个问题!

分布式锁的设计

  • 这个锁应当起到互斥作用,很多个线程同时发送过来,只能有一个线程获取锁资源,因此得用setnx.

  • 在此线程结束运行的时候应当及时释放锁资源,防止服务器资源浪费,及时del key

  • 如果一个服务器在发送完这个请求后就宕机了,不会执行这个释放锁资源的代码,那么锁资源就不会被释放,导致其他服务器资源浪费.这时候就要给锁设置过期时间,到时间自动释放锁资源

  • 如果在还没执行expire time的时候服务器就宕机了,那么锁资源一样不会被释放,这时候就得这么写获取锁的语句

  • 1
    
    set lock thread1 nx ex 10
    
  • 保持了原子性,让互斥和过期时间一起设置

  • 同时采用非阻塞的锁,防止很多线程一直等待锁资源释放,尝试一次,如果没获取锁资源就return false,成功就return true

Java代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
    private static final String LOCK_PREFIX = "lock:";
    private String lockName;
    public SimpleRedisLock(String lockName,StringRedisTemplate stringRedisTemplate) {
        this.lockName = lockName;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public Boolean tryLock(Long timeoutSec) {
        Long threadId = Thread.currentThread().getId();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX+lockName,threadId+"",timeoutSec, TimeUnit.SECONDS);
        //这样写的话,是true就返回true,如果是false或者null都返回false
        return Boolean.TRUE.equals(result);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(LOCK_PREFIX+lockName);
    }
}

将分布式锁加入业务逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Long userId =UserHolder.getUser().getId();
        SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
        Boolean isLock=lock.tryLock(1200L);
        if(!isLock){
            return Result.fail("一人只能买一张票");
        }
        try{
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }finally {
            lock.unlock();
        }

分布式锁误删问题:

  • 由于业务阻塞,导致线程1获取锁后没有及时释放锁资源,锁自动释放,线程2请求锁成功,开始执行业务逻辑

  • 这时候线程1完成业务,执行释放锁的指令,导致业务2的锁被意外删除,以此类推,锁会被意外删除.

  • 改进办法很简单,只要每次执行删除锁之前先查询锁的线程是否是自己的,是自己的就可以删,不是自己的就跳过.

Java代码改进

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
    private static final String LOCK_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString();
    private String lockName;
    public SimpleRedisLock(String lockName,StringRedisTemplate stringRedisTemplate) {
        this.lockName = lockName;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public Boolean tryLock(Long timeoutSec) {
        String threadId =ID_PREFIX+ Thread.currentThread().getId();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX+lockName,threadId,timeoutSec, TimeUnit.SECONDS);
        //这样写的话,是true就返回true,如果是false或者null都返回false
        return Boolean.TRUE.equals(result);
    }

    @Override
    public void unlock() {
        String UnlockName= stringRedisTemplate.opsForValue().get(LOCK_PREFIX+lockName);
        String threadId =ID_PREFIX+ Thread.currentThread().getId();
        if(threadId.equals(UnlockName)){
            stringRedisTemplate.delete(LOCK_PREFIX+lockName);
        }

    }
}

对程序打断点调试,获取锁后拦截,当我修改这里的ThreadId并且重新放行后,这里的新锁并没有被删掉,解决了误删的问题.

Lua脚本

当我们使用最新的分布式锁的时候,如果在执行finally语句里面的代码时,遭遇JVM进行垃圾回收,这时候会遇到无法战胜的业务阻塞,,代码还是没有做到原子性.如果已经验证完这个线程对应这个锁后突然垃圾回收,那么就会导致del锁这个操作会很危险.

这时候可以把整个unlock操作用Lua脚本完成

在Redis客户端使用lua脚本

无参脚本

1
eval "return redis.call('set','name','jack')" 0

语句的意思是使用脚本,脚本是一个字符串,就是引号里面的,脚本相当于set name jack 0代表没有参数

有参脚本

1
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 dinglz sb

用KEYS[1]和ARGV[1]作为占位符,后面有一对参数

使用lua脚本解决删除锁问题

1
2
3
4
5
6
7
8
--查询锁的id
local id=redis.call('get',KEYS[1])
if(ARGV[1]==id) then
    --释放锁
    return redis.call('del',KEYS[1])
end
--不是自己的锁,不用释放锁
return 0

Java代码调用Lua脚本

  • 把lua脚本创建在这个目录下,等下方便读取

  • 重写unlock方法
1
2
3
4
5
6
7
8
9
@Override
    public void unlock() {
        List list = new ArrayList();
        list.add(LOCK_PREFIX+lockName);
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                list,
                ID_PREFIX+ Thread.currentThread().getId());
    }
  • 向redis传输指令,显示脚本,然后是参数,KEYS[1]参数要用集合封装,第二个参数是线程id也就是ARGV[1]

  • 同时要配置好脚本

1
2
3
4
5
6
private static DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unLock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

设置文件位置,然后设置返回格式,返回值为0表示删除锁失败,1表示删除锁成功

Redisson

Redisson是一个封装好的分布式锁工具,这里面的锁是已经写好的,并且比前文介绍的锁多一些功能和奇效

引入依赖

1
2
3
4
5
<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.0</version>
        </dependency>

配置Redisson

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://Yourip:Yourport").setPassword("Yourpassword");
        return Redisson.create(config);
    }
}

@Configuration注解

  • 这里顺带提一嘴Springboot框架面试高频考点,@Configuration是什么,和@Component有啥区别.

@Configuration是配置类要添加的注解,它的构成是

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Configuration {
    @AliasFor(
        annotation = Component.class
    )
    String value() default "";

    boolean proxyBeanMethods() default true;
}
  • 这里面也有@Component,但是它比前者多实现单例模式

  • 如果你在创建Bean对象的时候一次性创建多个,Spring容器并不会去对象池去找是否有已经创建过的对象,而是直接再创建,这样无法保证单例性

  • 而如果用@Configuration就可以只创建一次Bean对象

使用Redisson制作分布式锁

1
2
3
        //SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
        Boolean isLock1=redissonClient.getLock("order"+userId).tryLock(1L, TimeUnit.SECONDS);
        //Boolean isLock=lock.tryLock(1200L);

中间那一行代码就等价于之前写的SimpleRedisLock

Redisson解决不可重入

不可重入导致死锁

如果一个线程请求锁资源,然后又请求了一次锁资源,第二次请求会失败,因为已经有线程获取过锁,并且就是自身,这导致了这个线程请求不到锁,也释放不了锁,导致了死锁.

解决方案

于是Redisson改变了锁的结构,让锁的数据结构变为Hash,key存储锁的名称,field存储线程名称,value存储该锁被同一个线程调用了几次,每次调用会给value自增,如果释放锁资源就value自减一次,如果value==0,那么就删除这个锁资源.

Redisson解决不可重试

不可重试导致大量请求失败

正常线程如果获取锁失败就直接返回false了,或者说一直循环递归等待下去,导致了大量请求无法返回客户需要的内容

解决方案

1
Boolean isLock1=redissonClient.getLock("order"+userId).tryLock(1L, TimeUnit.SECONDS);

注意到我们之前在设置tryLock的时候设置了尝试时间1L,单位是秒.这个意思是我可以总共等待1秒,如果这1秒内获取到锁资源,就返回true,如果没请求到就返回false,当然这个1L可以改为任何数值.

Redisson的tryLock原码

1
2
3
4
5
6
7
long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        if (ttl == null) {
            return true;
        }

ttl是剩余等待时间,后面的tryAcquire是看是否获取锁成功,如果获取锁成功就返回null,如果获取成功就返回剩余等待时间.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
else {
                current = System.currentTimeMillis();
                RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                                this.unsubscribe(subscribeFuture, threadId);
                            }

                        });
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } else {
                    boolean var16;
                    try {
                        time -= System.currentTimeMillis() - current;
                        if (time <= 0L) {
                            this.acquireFailed(waitTime, unit, threadId);
                            boolean var20 = false;
                            return var20;
                        }

                        do {
                            long currentTime = System.currentTimeMillis();
                            ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                            if (ttl == null) {
                                var16 = true;
                                return var16;
                            }

                            time -= System.currentTimeMillis() - currentTime;
                            if (time <= 0L) {
                                this.acquireFailed(waitTime, unit, threadId);
                                var16 = false;
                                return var16;
                            }

                            currentTime = System.currentTimeMillis();
                            if (ttl >= 0L && ttl < time) {
                                ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            } else {
                                ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }

                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        this.acquireFailed(waitTime, unit, threadId);
                        var16 = false;
                    } finally {
                        this.unsubscribe(subscribeFuture, threadId);
                    }

                    return var16;
                }
            }

主要看else中的原码,如果获取锁失败,并且要进入等待,RFuture subscribeFuture = this.subscribe(threadId)指的是这个线程订阅了请求的锁的信息,如果锁被释放了,这个线程就会收到信息,这正是观察者模式.但是这个等待并不是无限制的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                                this.unsubscribe(subscribeFuture, threadId);
                            }

                        });
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }
  • 如果时间到达上限了,直接返回false,并且取消订阅.

  • 如果时间没到上限,就再尝试获取锁,没获取到就继续订阅,因为很多线程同时请求过来,得按顺序获取锁.直到获取到锁或者直接到时间才结束这个循环.

Redis本地集群部署

只要改一下端口号就能实现本地的Redis集群

在config文件下修改端口号

用记事本或者vscode打开都可以,然后按下Ctrl+F搜索6379,下面的端口号改成没设置过的端口号,但也不要跟其他进程冲突了,我这里改为了6381

然后分别启动Redis就实现Redis集群了

Redisson实现分布式联锁

1
2
3
4
RLock lock1=redisson6379.getLock("lock:order:"+userId);
        RLock lock2=redisson6380.getLock("lock:order:"+userId);
        RLock lock3=redisson6381.getLock("lock:order:"+userId);
        RLock lock=redisson6380.getMultiLock(lock1,lock2,lock3);
  • 使用随机一个redisson客户端来调用它的.getMultiLock方法就可以制作出联锁

  • 联锁就可以在Redis集群的情况下,所有Redis共有一把锁,以免出现重复申请锁资源的情况

  • getMultiLock会尝试同时获取所有指定的锁,只有当所有锁都成功获取时,才算加锁成功。如果任何一个锁获取失败,它会回滚已经获取的锁,确保加锁操作的原子性。

  • 通过将多个锁的获取和释放封装在一起,getMultiLock简化了在复杂业务场景下的并发控制逻辑,减少了开发人员在处理多个锁时的错误风险。

其中redisson.getMultilock使用任意一个对象都可以的原因是,这里是统一新创建

1
2
3
public RLock getMultiLock(RLock... locks) {
        return new RedissonMultiLock(locks);
    }

通过这样制作联锁,可以让每一个Redis都有lock

异步秒杀问题

将超卖和判重用Redis解决

把卖票过程比喻成厨子做饭,如果饭店只有一个人,那么他自己得招待顾客,还得自己下厨,但是这时候多来了个服务员,服务员负责招待顾客下单,厨子只是来做菜,卖票过程就是用Redis作为先手,把订单都收到手了,交给后端再交给数据库来异步处理这些订单.

JMeter测试数据时,随着时间请求量会增长,因此可能会面对短时间超高并发,并且现有的代码还有很多查询MySQL的操作,比如:

  • 查询优惠券剩余数量,查询stock

  • 顾客是否已经买过一张票,查询是否有这个订单

于是我们可以通过改变订单流程来解决

在商家添加优惠券信息的时候就把优惠券的信息和数量同步到Redis

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        stringRedisTemplate.opsForValue().set("Inventory"+voucher.getId()+":stock",voucher.getStock().toString());
    }

然后在请求到达后端时,先查询Redis,判断优惠券数量是否够,这个用户是否下过单了

lua脚本

  • 由于一次性操作Redis次数过多,直接使用多条java语句没有原子性,因此要用lua脚本
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
local stock=redis.call('get',KEYS[1])
if(tonumber(stock)<1) then
    return 1
end
local success = redis.call('SADD', KEYS[2], ARGV[1])
if(success==0) then
    return 2
end
redis.call('decr',KEYS[1])
return 0

由于KEYS传过来时必须得是String类型,而只有number才能比较大小,因此先转化为数字,如果票没了就返回1.

Redis的set集合

Redis的set集合在插入的时候,要判重,如果重复就不能插入,返回0,如果插入成功就返回1,故可以作为锁.

多人秒杀的存储效果是这样的

然后就可以写代码调用脚本了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
//前期判断
        List<String> KEYS = new ArrayList<>();
        //List<String> ARGS = new ArrayList<>();
        KEYS.add("Inventory"+voucherId+":stock");
        KEYS.add("Inventory"+voucherId+":set");
        Long result = stringRedisTemplate.execute(TICKET_SNATCHING_SCRIPT, KEYS, UserHolder.getUser().getId()+"");
        if(result==1){
            return Result.fail("没票了");
        }else if (result==2){
            return Result.fail("一个人不能抢多张票");
        }

后端接受到可以下单的信息后制作订单

1
2
3
4
5
6
7
8
9
Long id= UserHolder.getUser().getId();
        VoucherOrder voucherOrder=new VoucherOrder();
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(id);
        voucherOrder.setCreateTime(LocalDateTime.now());
        voucherOrder.setUpdateTime(LocalDateTime.now());
        voucherOrder.setPayTime(LocalDateTime.now());
        voucherOrder.setStatus(1);
        voucherOrder.setId(redisId.nextId("order"));

然后把订单数据交给阻塞队列来异步执行,这里要用java自带的阻塞队列,然后多创建一个线程来解决这些下单问题

1
2
3
4
5
6
7
8
9
//创建阻塞队列
    private BlockingQueue<VoucherOrder> queue=new ArrayBlockingQueue<VoucherOrder>(1024*1024);
    //创建线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor(                    );
    //在启动类初始化完就马上把
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

@PostConstruct private void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); }着重讲一下

  1. @PostConstruct 注解

    • 当类的实例被创建后,Spring容器会自动调用这个方法。

    • 通常用于执行一些初始化操作,比如启动线程、加载配置等。

  2. SECKILL_ORDER_EXECUTOR

    • 它的作用是管理线程的生命周期,避免频繁创建和销毁线程的开销。
  3. submit(new VoucherOrderHandler())

    • submit方法将一个任务提交到线程池中执行。

    • VoucherOrderHandler实现了Runnable接口的类

    • 这里创建了一个VoucherOrderHandler实例,并将其提交到线程池中,线程池会选择一个空闲线程来执行这个任务。

这是线程池的任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//给线程池分配任务
    private class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
            while(true){
                try{
                    VoucherOrder voucherOrder=queue.take();
                    voucherHandler(voucherOrder);
                }catch (Exception e){
                    System.out.println("订单处理异常"+e.getMessage());
                }
            }
        }
    }

在队列中有元素的时候,子线程取出voucherOrder对象,然后解决下单问题

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private void voucherHandler(VoucherOrder voucherOrder) throws InterruptedException {
        Long userId=voucherOrder.getUserId();
        //SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
        RLock lock1=redisson6379.getLock("lock:order:"+userId);
        RLock lock2=redisson6380.getLock("lock:order:"+userId);
        RLock lock3=redisson6381.getLock("lock:order:"+userId);
        RLock lock=redisson6380.getMultiLock(lock1,lock2,lock3);
        Boolean isLock1=lock.tryLock(1L, TimeUnit.SECONDS);
        //Boolean isLock=lock.tryLock(1200L);
        if(!isLock1){
            return ;
        }
        try{
            proxy.createVoucherOrder(voucherOrder);
        }finally {
            System.out.println("id为"+userId+"的顾客买到票了");
            lock.unlock();
        }
    }

注意到这是个子线程的任务,子线程的任务无法获得代理对象,也就是SpringBoot的IOC容器管理的对象,但是我们需要事务回滚,所以还是得用代理对象,所以只能把代理对象作为私有变量,然后在类的方法中对这个私有代理对象进行赋值

1
2
3
4
5
6
7
8
private IVoucherOrderService proxy;
@Override
    public Result order(Long voucherId) {
        ...
        //获取代理对象
        proxy=(IVoucherOrderService) AopContext.currentProxy();
        ...
    }

然后改进createVoucherOrde方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Transactional(rollbackFor = Exception.class)
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        //开始的话看库存够不够,够就库存减一,并且创建订单
        List<VoucherOrder> list=voucherOrderMapper.selectByUserId(voucherOrder.getVoucherId(),voucherOrder.getUserId());
        if(!list.isEmpty()){
            return ;
        }
        Boolean success=seckillVoucherServiceImpl.update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0)
                .update();
        if(!success){
            return ;
        }
        voucherOrderMapper.insert(voucherOrder);
    }

至此,异步下单已经初步完成了.

JMeter测试

由于要实现多人秒杀,所以就得用多个账户登录来抢票,但是找不到1000个真人来测试,总不能一个一个敲登录来获取token吧,所以可以写一个自动登录1000个账户的测试类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.hmdp;
import com.hmdp.utils.RedisConstants;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@SpringBootTest
public class TokenTest {
    private static final Integer NUMBER_OF_TOKEN = 1000;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Test
    public void test() {
        for (Integer i = 1; i < NUMBER_OF_TOKEN+1; i++) {
            Map<String, String> map = new HashMap<>();
            map.put("nickName", "");
            map.put("icon", "");
            map.put("id", i.toString());
            String token= UUID.randomUUID().toString();
            stringRedisTemplate.opsForHash().putAll(RedisConstants.LOGIN_USER_KEY+token,map);
            System.out.println(token);
            stringRedisTemplate.expire(token, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        }
    }
}

在 Redis里存一份即可,然后把所有的token输出出来,后面拿来给JMeter,这样绕过了拦截器

把tokens存到txt文件中

在JMeter里面这样设置一下

注意那个路径就是txt文件存储位置

然后改一下登录状态头

然后就可以开始测试了!

测试结果:

1000人抢200张票,只有20%成功率,80%的异常率,测试成功

Redis实现消息队列

认识消息队列

  • 消息队列(Message Queue,简称MQ)是一种在软件架构中用于实现应用程序之间异步通信的中间件。它允许一个或多个生产者(消息的发送者)将消息发送到队列中,然后由一个或多个消费者(消息的接收者)从队列中读取消息。消息队列的主要作用是解耦、缓冲、异步通信和负载均衡等,可以用来削峰填谷,减轻服务器和数据库的处理压力。

之前有用到spring的阻塞队列,但是这个阻塞队列效果不是很好,如果中途服务挂机了,或者说停机后JVM会强制删除所有内存数据,那么后续请求就无法完成,并且不保留记录.这时候就需要外部的消息队列了,这里先用Redis实现,后面还可以用实现好的消息队列,kafka,RocketMQ

Redis要求:

redis必须达到5版本及其以上,这里使用的是stream数据结构来制作消息队列

Redis单消费模式

我们可以使用XADD来向消息队列里面加入元素,用XREAD来读取消息队列的元素

1
XADD users * k2 v2
  • 第一次redis会去查存储空间里面有没有users这个消息队列如果没有就创建

  • 这个语句的意思是添加名称为users的消息队列,然后*为自动生成id,k2 v2为一个键值对

  • 添加成功后会返回一个数据"1743079174227-0"这是一个时间戳,用来唯一标识这个队列

同样的我们可以用XREAD来读取消息队列中的数据

1
XREAD count 1 streams users 0

这个语句的意思是读消息,一次读一条,读取消息队列users的消息,并且从0开始读

1
XREAD count 1 block 0 streams users $

这个语句的意思是读取users的消息,($)读取最新的消息,然后无限阻塞,直到有消息进入队列就读取一条消息.

这样看上去一个消息队列建好了,但是有bug,如果存储数据时间较长,存储数据没来得及存储完,又传来了一堆新的消息,$只能读取最新的消息,导致中间有很多消息都漏掉了

Redis消费者组模式

省流版:

  • 消费者组里面有很多消费者,它们是竞争关系,都来争着处理消息,可以解决消息漏处理的问题

  • 消息表示指的是最后被处理的消息会被打上标签,就像看书有个书签,看完后下次再看就立马知道在哪了

  • 消息确认就跟TCP三次握手相似,只是没有最后一段客户端向服务端的确认信息,每次处理完信息都会确认一下,并且有个pending-list,如果没处理这个消息就根据这个list继续处理

创建消费者组

1
XGROUP CREATE users group1 0

这个语句的意思是在users这个消息队列中创建消费者组,如果没有users这个消息队列就会创建失败,这个消费者组的名称为group1,每次从id=0开始读

通过消费者组读消息队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
   2) 1) 1) "1743074094428-0"
         2) 1) "k1"
            2) "v1"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
   2) 1) 1) "1743079174227-0"
         2) 1) "k2"
            2) "v2"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
   2) 1) 1) "1743082841091-0"
         2) 1) "k3"
            2) "v3"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
   2) 1) 1) "1743082847320-0"
         2) 1) "k4"
            2) "v4"
localhost:6379> XREADGROUP group group1 c1 count 1 block 2000 streams users >
1) 1) "users"
   2) 1) 1) "1743082852967-0"
         2) 1) "k5"
            2) "v5"

这个语句的含义是通过消费者组进行读取,group的名称是group1(刚刚创建的),消费者名称叫c1(随便取的名字,没有的话会自动创建),每次消费一条信息,阻塞2秒,消息队列名称为users,选择从最早发过来的未读消息开始读,>换成0可以使因为之前处理消息的时候,服务器炸了,重新处理的时候读取pending-list,直接从未处理完成的消息继续处理

确认信息方法XACK

1
2
XACK users group1 1743074094428-0 1743079320687-0 1743082841091-0 1743082847320-0 
(integer) 4

前面是消息队列名称和消费者组名,后面是消息被消费后发出的时间戳

通过如下语句查询pending-list

1
2
3
4
5
6
xpending users group1
1) (integer) 1
2) "1743079174227-0"
3) "1743079174227-0"
4) 1) 1) "c1"
      2) "1"

通过Redis实现消息队列替代阻塞队列

其实没必要这么实现(绝对不是因为我这边IDEA无法识别我写的函数才不实现的)

我们可以直接上正经的消息队列

通过RabbitMQ替代阻塞队列

RabbitMQ是个消息队列,文档非常完整好懂,可以通过文档学习.要使用RabbitMQ要先下载,这里我在ubuntu虚拟机上下载并使用这个消息队列

在 Ubuntu 上安装 RabbitMQ 通常需要以下步骤:

1. 更新系统包列表

在安装任何软件之前,建议先更新系统的包列表,以确保安装的软件是最新的版本。运行以下命令:

1
sudo apt update

2. 安装 Erlang

RabbitMQ 是基于 Erlang 编程语言开发的,因此需要先安装 Erlang。可以使用以下命令安装 Erlang:

1
sudo apt install erlang

3. 添加 RabbitMQ 的官方仓库

为了获取最新版本的 RabbitMQ,建议添加 RabbitMQ 的官方仓库。运行以下命令:

1
2
3
sudo apt install apt-transport-https
wget -O- https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
echo "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

4. 更新包列表

添加完官方仓库后,需要再次更新包列表:

1
sudo apt update

5. 安装 RabbitMQ Server

现在可以安装 RabbitMQ 了。运行以下命令:

1
sudo apt install rabbitmq-server

6. 启动和启用 RabbitMQ 服务

安装完成后,RabbitMQ 服务应该会自动启动。可以通过以下命令检查服务状态:

1
sudo systemctl status rabbitmq-server

如果服务未启动,可以手动启动并设置开机自启:

1
2
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

7. 配置 RabbitMQ(可选)

如果需要进行额外配置,例如设置用户、权限等,可以使用以下命令:

  • 添加用户

    1
    
    sudo rabbitmqctl add_user myuser mypassword
    
  • 设置用户权限

    1
    2
    
    sudo rabbitmqctl set_user_tags myuser administrator
    sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
    
  • 启用管理插件(用于访问 Web 管理界面,默认端口为 15672):

    1
    
    sudo rabbitmq-plugins enable rabbitmq_management
    

正式使用RabbitMQ

按照以上步骤下载完rabbitmq后,我们可以打开网址馆里rabbitmq

http://虚拟机的ip地址:15672

注!

  • 如果你用的不是docker下载的rabbitmq,那么你虚拟机的ip地址会改变,下次连不上rabbitmq有可能是因为ip地址变了

rabbitmq管理界面长这样

你可以在主机上打开,也可以在虚拟机上打开,第一次登录只能在虚拟机上打开,因为有默认账号密码.输入账号:guest,密码:guest.然后进去后可以给自己之前用命令行创建的用户改权限

点击创建的用户,这样修改权限

这样你就可以用后端语言操控rabbitmq了!

然后我们就可以打开心爱的IDEA

引入maven坐标

先引入几个maven坐标

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
<!--rabbitMQ的maven坐标-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>2.2.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.2.18.RELEASE</version>
        </dependency>

注意:springboot2.0版本只能用这套maven坐标,要用3.0版本maven坐标的话会出bug

配置rabbitmq

先写好yaml

然后这样写配置类,把rabbitmq交给IOC容器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.hmdp.config;

import com.hmdp.properties.RabbitMQProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Autowired
    private RabbitMQProperties rabbitMQProperties;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMQProperties.getHost());
        connectionFactory.setUsername(rabbitMQProperties.getUsername());
        connectionFactory.setPassword(rabbitMQProperties.getPassword());
        connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost());
        connectionFactory.setPort(rabbitMQProperties.getPort());
        return connectionFactory;
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public Queue voucherOrderQueue() {
        return new Queue("voucher_order_queue", true);
    }
}

之前我们用的是阻塞队列,单独为消费者new了个线程来处理消息,现在我们可以创建一个消费者类,并且把对象交给IOC容器,让这个消费者持续处理信息

消费者类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.hmdp.Consumer;

import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Component
public class VoucherOrderConsumer {
    @Autowired
    private IVoucherOrderService voucherOrderService;
    @Resource
    private RedissonClient redisson6379;
    @Resource
    private RedissonClient redisson6380;
    @Resource
    private RedissonClient redisson6381;
    @Transactional
    @RabbitListener(queues = "voucher_order_queue")
    public void handle(VoucherOrder voucherOrder) throws InterruptedException {
        //SimpleRedisLock lock=new SimpleRedisLock("order:"+userId,stringRedisTemplate);
        RLock lock1=redisson6379.getLock("lock:order:"+voucherOrder.getUserId());
        RLock lock2=redisson6380.getLock("lock:order:"+voucherOrder.getUserId());
        RLock lock3=redisson6381.getLock("lock:order:"+voucherOrder.getUserId());
        RLock lock=redisson6380.getMultiLock(lock1,lock2,lock3);
        Boolean isLock1=lock.tryLock(1L, TimeUnit.SECONDS);
        //Boolean isLock=lock.tryLock(1200L);
        if(!isLock1){
            return ;
        }
        try{
            voucherOrderService.createVoucherOrder(voucherOrder);
        }finally {
            System.out.println("订单处理成功:" + voucherOrder.getId());
            lock.unlock();
        }
    }
}

然后ServiceImpl类就成为了生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
    public Result order(Long voucherId) {
        //查询优惠券信息
        SeckillVoucher seckillVoucher = seckillVoucherMapper.selectById(voucherId);
        //判断秒杀是否开始
        LocalDateTime timeEnd = seckillVoucher.getEndTime();
        LocalDateTime timeBegin=seckillVoucher.getBeginTime();
        if(timeBegin.isAfter(LocalDateTime.now())){
            return Result.fail("秒杀活动暂未开始");
        }else if (timeEnd.isBefore(LocalDateTime.now())){
            return Result.fail("秒杀活动现已结束");
        }
        //前期判断
        List<String> KEYS = new ArrayList<>();
        //List<String> ARGS = new ArrayList<>();
        KEYS.add("Inventory"+voucherId+":stock");
        KEYS.add("Inventory"+voucherId+":set");
        Long id=redisId.nextId("order");
        Long result = stringRedisTemplate.execute(TICKET_SNATCHING_SCRIPT, KEYS, UserHolder.getUser().getId()+"");
        if(result==1){
            return Result.fail("没票了");
        }else if (result==2){
            return Result.fail("一个人不能抢多张票");
        }
        Long userId= UserHolder.getUser().getId();
        VoucherOrder voucherOrder=new VoucherOrder();
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(userId);
        voucherOrder.setCreateTime(LocalDateTime.now());
        voucherOrder.setUpdateTime(LocalDateTime.now());
        voucherOrder.setPayTime(LocalDateTime.now());
        voucherOrder.setStatus(1);
        voucherOrder.setId(id);
        // 使用 RabbitMQ 发送消息
        rabbitTemplate.convertAndSend("voucher_order_queue", voucherOrder);
        //queue.add(voucherOrder);
        //获取代理对象
        proxy=(IVoucherOrderService) AopContext.currentProxy();
        return Result.ok("下单成功"+id);
    }

这样我们就使用了rabbitmq作为消息队列完成了异步秒杀.这只是rabbitmq的一点实力,后面我可能会专开一个坑来学rabbitmq

测试类更新

简单更新了一下测试类

把token一键写入.txt文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Test
    public void test1() {
        // 指定文件路径
        String filePath = "C:\\Users\\86180\\Desktop\\Test\\秒杀订单TOKEN数据.txt";
        File file = new File(filePath);

        // 如果文件不存在则创建文件夹和文件
        if (!file.getParentFile().exists()) {
            file.getParentFile().mkdirs();
        }

        // 如果文件存在,先清空文件内容
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
            writer.write(""); // 清空文件内容
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 写入新的 token 数据
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file, true))) {
            for (Integer i = 1; i < NUMBER_OF_TOKEN + 1; i++) {
                Map<String, String> map = new HashMap<>();
                map.put("nickName", "");
                map.put("icon", "");
                map.put("id", i.toString());
                String token = UUID.randomUUID().toString();
                stringRedisTemplate.opsForHash().putAll(RedisConstants.LOGIN_USER_KEY + token, map);
                stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);

                // 将 token 写入文件
                writer.write(token);
                writer.newLine(); // 换行
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

一键刷新测试数据,让我们和JMETER再抢一千次优惠券吧!!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.hmdp;
import com.hmdp.mapper.SeckillVoucherMapper;
import com.hmdp.service.impl.VoucherOrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;

@SpringBootTest
public class RefreshData {
    private static final Long VOUCHER_ID=16L;
    private static final Long STOCK=1000L;
    @Autowired
    private VoucherOrderServiceImpl voucherOrderService;
    @Autowired
    private SeckillVoucherMapper seckillVoucherMapper;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Test
    public void test() {
        //删除所有订单
        voucherOrderService.deleteByVoucherId(VOUCHER_ID);
        //将剩余票数刷新为指定数值
        seckillVoucherMapper.updateByVoucherId(VOUCHER_ID,STOCK);
        //同步修改redis的剩余票数
        stringRedisTemplate.opsForValue().set("Inventory"+VOUCHER_ID+":stock",STOCK+"");
        //删除redis的set
        stringRedisTemplate.delete("Inventory"+VOUCHER_ID+":set");
    }
}

点赞功能

问题分析

点赞要求我们第一次点上去是点赞数+1,再点一次是点赞数-1,这完全可以交给数据库去解决,然后就是

关注与互关

TODO明天写完

Licensed under CC BY-NC-SA 4.0