掘金 后端 ( ) • 2024-05-02 17:27

序言

本文旨在分析 Redisson 作为分布式锁在单节点 Redis(useSingleServer)情况下加锁解锁的原理。

单机锁

加锁方式

  RLock lock = redissonClient.getLock(key);
  lock.lock();

获取的 RLock 是什么

getLock(key) 方法会使用 redissonClient 初始化一个锁 RedissonLock(commandExecutor, name);


/**
* commandExecutor 命令执行器, 用于执行Redis 命令
* name 锁的名称
*/
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    // 构建加锁对象
    super(commandExecutor, name);
    // 指定加锁执行器
    this.commandExecutor = commandExecutor;
    // 指定锁超时时间,默认为 30s
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // 从连接器中获取发布服务
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

如何加锁

/**
* leaseTime -1
* unit 时间单位 null
* interruptibly false
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 获取当前线程ID 后面锁重入时会使用到
        long threadId = Thread.currentThread().getId();
        // 尝试加锁,加锁成功会返回null
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }
        // 以下为加锁失败的补偿操作
        // 生成一个监听线程,监听当前锁的状态,实现有分为公平的和非公平的
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            // 默认 interruptibly 为false 会在这里阻塞等待future取得结果
            commandExecutor.syncSubscription(future);
        }

        // 以下已经收到原线程的释放锁通知了
        try {
            while (true) {
                // 再次尝试加锁
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // 加锁成功,跳出循环
                if (ttl == null) {
                    break;
                }

                // ttl 为当前持有锁线程的剩余持有时长
                if (ttl >= 0) {
                    try {
                        // 尝试 ttl 时间之后重新加锁
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        // interruptibly 默认为false 所以这边会继续尝试加锁
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    //  ttl < 0 则说明前一个锁已经释放 直接尝试加锁
                    if (interruptibly) {
                        commandExecutor.getNow(future).getLatch().acquire();
                    } else {
                        commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // 将当前线程的加锁监听取消
            unsubscribe(commandExecutor.getNow(future), threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

可以看到,上述代码中最主要的加锁代码为 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);,其内部调用的方法为get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));,下面我们看下 tryAcquireAsync执行的代码

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

字段取值说明

  1. KEYS[1] 锁key,例:test-key
  2. ARGV[1] 取值为锁过期时间 leaseTime 例:30000
  3. ARGV[2] 取值为 加锁线程的ID(redisson启动时生成的实例ID + 线程ID) 例:a78ef432-9981-4554-aee7-eb7ba58131e2:1

LUA 脚本解析

  • redis.call('exists', KEYS[1]) == 0 判断资源key是否存在,不存在则加锁
  • redis.call('hincrby', KEYS[1], ARGV[2], 1) 使用hash结构存储,根据线程id自定义一个子key,设置值为1
  • redis.call('pexpire', KEYS[1], ARGV[1]) 设置过期时间
  • return nil; 返回空
  • redis.call('hexists', KEYS[1], ARGV[2]) == 1_可重入锁_的判断,如果是当前线程再次获取锁,就允许获取锁
  • redis.call('hincrby', KEYS[1], ARGV[2], 1) 将当前线程的持有的值加1
  • redis.call('pexpire', KEYS[1], ARGV[1]) 重新设置过期时间
  • return nil; 返回空
  • return redis.call('pttl', KEYS[1])

如何解锁

字段取值说明

  1. KEYS[1] 锁的key值 test-key
  2. KEYS[2] 锁的key值-channel 用于推送消息 "redisson_lock__channel:{test-key}"
  3. ARGV[1] 推送消息的类型
  4. ARGV[2] 锁超时时间
  5. ARGV[3] 当前锁线程ID
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

LUA脚本解析

  • redis.call('hexists', KEYS[1], ARGV[3]) == 0 判断当前线程是否持有锁,未持有锁时返回空
  • 以下逻辑处理时 说明当前线程已经持有锁了
  • local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);当前线程持有锁数量 -1, 并取得减完之后锁数量(可重入时会多次加锁)
  • if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); 如果最新的锁数量大于0, 将锁续期
  • redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); 如果锁数量小于等于0 将锁信息清除 , 并推送广播消息当前锁已释放

监听机制

在首次加锁失败之后,redisson 开启对于加锁 key 的监听,具体是如何操作的呢?

protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
    return pubSub.subscribe(getEntryName(), getChannelName());
}

/**
* entryName commandExecutor.getConnectionManager().getId() +":" + name 例: a78ef432-9981-4554-aee7-eb7ba58131e2:test-key
* channelName prefixName("redisson_lock__channel", getRawName()); 例:redisson_lock__channel:{test-key}
* 
*/
public CompletableFuture<E> subscribe(String entryName, String channelName) {
        // 随机取得一个 reids AsyncSemaphore,其中维护了一个 AtomicInteger 的 counter 和 ConcurrentLinkedQueue 的 linsteners
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture<>();

        int timeout = service.getConnectionManager().getConfig().getTimeout();
        // 创建一个时间轮,在3000ms(默认)之后抛出连接超时异常
        Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {
            newPromise.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + timeout + "ms. " +
                            "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
        }, timeout, TimeUnit.MILLISECONDS);

        // 在信号量对象中监听状态
        semaphore.acquire(() -> {
            // 如果时间轮取消失败 释放 semaphore 信号量
            if (!lockTimeout.cancel()) {
                semaphore.release();
                return;
            }

            E entry = entries.get(entryName);
            if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().whenComplete((r, e) -> {
                    if (e != null) {
                        newPromise.completeExceptionally(e);
                        return;
                    }
                    newPromise.complete(r);
                });
                return;
            }

            // entry == null 新建 entry(RedissonLockEntry 实现了 PubSubEntry 接口 )
            E value = createEntry(newPromise);
            // 获取RedissonLockEntry锁 持有数+1
            value.acquire();

            // 将 value 锁添加到 entries ConcurrentHashMap 中
            E oldValue = entries.putIfAbsent(entryName, value);
            // 不为 null 表明已经存在当前 key
            if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().whenComplete((r, e) -> {
                    if (e != null) {
                        newPromise.completeExceptionally(e);
                        return;
                    }
                    newPromise.complete(r);
                });
                return;
            }

            RedisPubSubListener<Object> listener = createListener(channelName, value);
            CompletableFuture<PubSubConnectionEntry> s = service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            s.whenComplete((r, e) -> {
                if (e != null) {
                    value.getPromise().completeExceptionally(e);
                    return;
                }
                value.getPromise().complete(value);
            });

        });

        return newPromise;
    }