掘金 后端 ( ) • 2024-04-23 10:42

theme: mk-cute highlight: a11y-light

引子

在文章的开始前,我们先来看一段代码:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@PostMapping("/deduct_stock")
public String deductStock() {
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
        log.info("扣减成功,剩余库存:" + realStock);
    } else {
        log.info("扣减失败,库存不足");
    }
    return "OK";
}

代码的逻辑很简单,就是从redis里拿到一个key为stock的键值对,然后判断其值在大于0的情况下进行减一操作,之后再将这个键值对重新存储,这里模拟的是下单减商品库存这么一个场景。

从场景看问题

虽然代码逻辑很简单,乍一看也没什么问题,但相信聪明的读者已经想到了在高并发场景下可能存在的问题:在短时间内突然大量的请求这个接口,假如有3个请求同时执行到取键值对并进行了扣减库存,那么此时实际上库存是-3,但这3个请求是在同一时刻执行,所以最终只做了减1并重新存储到了redis中。这个时候会导致什么问题呢?假如你这件商品有100件库存,3个请求就是3个人买了3件,但业务扣库存只扣减了1件,就会导致超卖问题

解决问题之道

对于并发问题,我想大家都能想到一个解决方法-加锁。但是加锁的方式有很多,最简单的直接给方法套一个同步代码块,就可以加锁,如下:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@PostMapping("/deduct_stock")
public String deductStock() {
    synchronized (this) {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            log.info("扣减成功,剩余库存:" + realStock);
        } else {
            log.info("扣减失败,库存不足");
        }
    }
    return "OK";
}

但这样加锁就解决问题了吗?解决了但没完全解决,因为无论是ReentrantLock还是synchronized,它们都是Java中的本地锁,用于控制同一JVM进程内的并发访问。那么如果你的应用在线上是单机部署的话,自然就没问题,但如果项目是以集群或分布式部署的话,那么这个锁自然就无法满足了,如下:

集群架构.png

假如我们的应用部署在两台机器上,当请求通过网关转发到不同机器的应用上时,那么此时tomcat1上的的锁自然也就无法锁住tomcat2上的请求。那么此时聪明的你一定想到了分布式锁,事实上分布式锁的实现方式很多,比如ZooKeeper、Redis、MySQL等,下面将以redis的实现来展开。

redis如何实现分布式锁

其实redis的分布式锁的实现很简单,就是通过SET和SETNX命令来实现的,它们的作用都是设置Key和Value,但SETNX只有在键不存在时才会设置值,而SET则不管键是否存在都会设置值。也就是说,一个已存在的键值对,如果用SET来操作会把旧值覆盖掉,但用SETNX则不会做任何任何操作。下面我启动redis客户端来演示一下: 2.png 那么回到我们现在的业务场景,当很多人抢同一件商品时,在执行减库存之前先用SETNX命令来存储Key为商品id、值为任意内容的键值对,就会让大量的请求在redis中排队,但由于SETNX命令的限制只会让队头的请求执行成功,那么我就可以认为排在队头的请求拿到了一把分布式锁,于是它就可以往下继续执行业务逻辑,而其他请求则需通过重试且等待拿到锁的请求释放掉锁而抢夺锁。

初次加锁

于是,有了这个逻辑的支撑下,我们就可以对代码进行优化:

@PostMapping("/deduct_stock")
public String deductStock() {
    String key = "lock:product:001";
    //加锁
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "2333");
    if (!flag) {
        return "下单失败!";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
        log.info("扣减成功,剩余库存:" + realStock);
    } else {
        log.info("扣减失败,库存不足");
    }
    //释放锁
    stringRedisTemplate.delete(key);
    return "OK";
}

在原本的基础上,我们通过stringRedisTemplate来操作redis执行SETNX命令来实现分布式锁,但这样真的就没问题了吗?

继续优化

回到代码本身,我们在扣减库存前加锁,在执行完减库存逻辑后释放锁。乍一看没什么问题,但试想如果代码执行到一半抛异常了,这个时候锁也没释放掉,那么别的请求也没法再进来,就陷入了“死锁”状态。这个时候我们就需要解决这个问题,于是有了下面的代码:

@PostMapping("/deduct_stock")
public String deductStock() {
    String key = "lock:product:001";
    //加锁
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "2333");
    if (!flag) {
        return "下单失败!";
    }
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            log.info("扣减成功,剩余库存:" + realStock);
        } else {
            log.info("扣减失败,库存不足");
        }
    } finally {
        //释放锁
        stringRedisTemplate.delete(key);
    }
    return "OK";
}

我们将释放锁的代码放进finally代码块里,这样无论执行过程中抛出怎样的异常,最终都会执行。但这样就没问题了吗?

还可以优化

前面提到了代码执行到一半抛异常的情况,那么代码执行到一半服务器宕机了,这个时候锁也没法释放掉,运维同学很给力地快速恢复了服务,结果你的这个接口又无法使用了。所以,我们可以这么做:

@PostMapping("/deduct_stock")
public String deductStock() {
    String key = "lock:product:001";
    //加锁
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "2333");
    //加锁的过期时间
    stringRedisTemplate.expire(key,10, TimeUnit.SECONDS);
    if (!flag) {
        return "下单失败!";
    }
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            log.info("扣减成功,剩余库存:" + realStock);
        } else {
            log.info("扣减失败,库存不足");
        }
    } finally {
        //释放锁
        stringRedisTemplate.delete(key);
    }
    return "OK";
}

我在原代码的基础上又给锁加了过期时间,这样哪怕服务执行到一半宕机了,过了10S后这把锁也会自动释放掉(因为锁是redis提供的,挂的是应用,又不是redis服务),但这样就没问题了吗?

还能优化

继续回到代码本身,我现在的加锁和设置过期时间分了两步代码来写,那么假如执行到加锁那一步服务就挂了,没设置上过期时间,这样恢复服务后还是会“死锁”,这两步代码对应redis中也是两行命令,所以不具有原子性。所以,我们要做的就是把它们合并成具有原子性的一条命令,如下:

@PostMapping("/deduct_stock")
public String deductStock() {
    String key = "lock:product:001";
    //加锁
    //Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "2333");
    //加锁的过期时间
    //stringRedisTemplate.expire(key,10, TimeUnit.SECONDS);
    //加锁且设置过期时间
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "2333", 10, TimeUnit.SECONDS);
    if (!flag) {
        return "下单失败!";
    }
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            log.info("扣减成功,剩余库存:" + realStock);
        } else {
            log.info("扣减失败,库存不足");
        }
    } finally {
        //释放锁
        stringRedisTemplate.delete(key);
    }
    return "OK";
}

那么现在这段代码是不是就没问题了呢?答案当然是否定的,再次回到代码本身,我们这个锁的过期时间是10秒钟,那么假如在高并发场景下先是一个请求拿到了锁,它完成整个业务到释放锁需要15秒(Ps:这里的时间都是方便讲解设的,实际业务中没人会搞这么慢的接口),在这种情况下会发生什么?答案显而易见,那就是第一个请求还没执行完就把锁释放掉了,这时候第二个请求拿到锁进来了,而恰好第一个请求执行到释放锁那一步把锁释放了(Ps:这时候释放的是第二个请求的锁),于是第三个请求又拿到锁进来了。以此类推,所以依然有可能出现“超卖问题”。

深入优化

那么,我们分析下这个问题的根本点在于“自己加的锁,被别人删掉了”,在这个思路下,我们可以这样做:

@PostMapping("/deduct_stock")
public String deductStock() {
    String key = "lock:product:001";
    //生成UUID
    String clientId = UUID.randomUUID().toString();
    //加锁且设置过期时间
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, clientId, 10, TimeUnit.SECONDS);
    if (!flag) {
        return "下单失败!";
    }
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            log.info("扣减成功,剩余库存:" + realStock);
        } else {
            log.info("扣减失败,库存不足");
        }
    } finally {
        //释放锁前先判断是否是自己的锁
        if (clientId.equals(stringRedisTemplate.opsForValue().get(key))) {
            //释放锁
            stringRedisTemplate.delete(key);
        }
    }
    return "OK";
}

我们给每个请求加一个唯一标识,这里我用的UUID,然后在释放锁前先判断当前锁的UUID与请求的UUID是否一致,只有二者相同才可以释放锁。那么现在这段逻辑就完美无缺了吗?当然不是,把目光聚焦在释放锁那一步,和前面加过期时间的问题相似,我们这里的判断和释放锁并没有保证原子性。假设这么一个极限情况,当代码用时9.9s运行到判断处,此时因为网络卡顿导致释放锁还没执行,但10s的时间到了锁自动释放了,而这时请求2拿到锁进来了,但请求1执行到释放锁把请求2的锁释放掉了,于是又回到了前面的“超卖问题”。

最终优化

其实整体看一下上面的各种“坑”,归根结底的原因就出在了过期时间上,那么我们可不可以通过设置更大的过期时间来解决呢?显然是不行的,首先太长的锁时间本身对业务就不友好,其次,时间定的再长依然可能出现上面的问题。那么我们换个思路:我们只需要当业务没执行完成时,就给锁继续延长过期时间,这个操作也叫“锁续命”。比如当前有一个请求抢到锁进入到了业务,这个时候再开一条线程通过定时任务来每隔XX时间(Ps:这个时间是一定低于锁的过期时间的),那么只要业务没执行完毕,就可以通过这个定时任务一直延长过期时间指导业务执行完毕并释放锁。那么这个需要我们自己手撸吗?当然不需要,高并发秒杀这么热门的业务场景自然已经有了成熟的方案,那就是Redisson

Redisson的初体验

Redisson已经提供了丰富的API让我们大大降低了我们使用分布式锁的成本。

导入依赖

根据自己的SpringBoot版本选择适合的版本。

<!-- redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

简单配置

这里我简单配了一个单机模式,但Redisson对主流的分布式、集群、发布订阅、哨兵等模式都做了支持,可以根据自己的需要进行配置。

@Configuration
public class RedissonConfig {

    /**
     * 单机模式配置redisson
     * @return redisson
     */
    @Bean
    public Redisson redisson(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}

编码

那么我们把代码中原来的加锁代码都删掉,采用redisson的API进行加锁和释放锁:

@PostMapping("/deduct_stock")
public String deductStock() {
    String lockKey = "lock:product:001";
    //获取锁对象
    RLock redissonLock = redisson.getLock(lockKey);
    //加锁
    redissonLock.lock();
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            log.info("扣减成功,剩余库存:" + realStock);
        } else {
            log.info("扣减失败,库存不足");
        }
    } finally {
        //释放锁
        redissonLock.unlock();
    }
    return "OK";
}

原本需要我们自己处理的大量步骤就变成了简单的三步:获取锁加锁释放锁。但仅仅只是用肯定不行的,我们需要知道它是怎么实现的,所以,接下来让我们一起走进redisson的源码里。

刨根问底

前面我们提到了执行时的原子性以及“锁续命”机制,这些redisson都帮我们做到了,我们在使用时只需要加锁和释放即可,那么它是如何实现的呢?就让我带着大家一起看看这其中的原因。

加锁

顺着lock() 方法一路点进去,最终我们找到了tryLockInnerAsync() 方法,如下:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

为了避免第一次接触的同学不认识,专门说下是中间那一大段字符串拼接的就是Lua脚本(Ps:不了解的同学请自行百度下)。我们这里只关注加锁的功能,所以只看上半部分,其实就做了这么一件事:先判断key是否存在,如果不存在,则创建一个新的锁并设置过期时间。Lua脚本可以保证操作的原子性,所以是并发安全的。而脚本的参数来源则是由这三个参数传递的,它们其实我们已经见过了,就是我们前面自行实现的时候传的key -> getName(),过期时间 -> internalLockLeaseTime和clientId -> getLockName(threadId)。这个方法就保证了加锁时原子性,那么我们前面提到的“锁续命”又是怎么实现的呢?让我们回到这个方法的上一层:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 如果租约时间不等于-1,那么调用 tryLockInnerAsync 方法尝试获取锁
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 如果租约时间等于-1,那么使用默认的看门狗超时时间尝试获取锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 为获取锁的操作添加一个监听器,当操作完成时,该监听器会被调用
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            // 如果获取锁的操作失败,那么直接返回
            if (!future.isSuccess()) {
                return;
            }

            // 获取锁的剩余生存时间
            Long ttlRemaining = future.getNow();
            // 如果剩余生存时间为 null,说明已经成功获取了锁,那么安排锁的过期时间更新
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    // 返回表示获取锁操作的 RFuture 对象
    return ttlRemainingFuture;
}

通过这一步代码我们可以知道:这里为获取锁的操作添加一个监听器,当操作完成时,该监听器会被调用。如果操作成功,并且成功获取了锁(即剩余生存时间为 null),那么安排锁的过期时间更新,接下来我们再看看它是怎么做续期的:

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

我们可以看到这个方法很巧妙地通过线程池的延迟执行来实现续期,通过线程池的schedule方法在指定的延迟时间后执行一个Runnable或Callable任务。这里它取了锁时间的三分之一,也就是说它每过三分之一锁时间就会查询“”是否还在,如果还在就把锁的时间重置。如果“”通过上面的Lua脚本获取不到即已经不存在了,自然也就不会执行这个定时任务了。

小结

在使用redisson后其实我们上面的那段示例代码就规避了“很多坑”,已经可以放到生产环境中使用了,但在架构层面还存在一些“小坑”,我会在第二篇文章中结合具体案例讲讲主从架构下redis分布式锁的应用以及redis分布式锁如何提高性能