序言
本文旨在分析 Redisson 作为分布式锁在单节点 Redis(
useSingleServer
)情况下加锁解锁的原理。
单机锁
加锁方式
RLock lock = redissonClient.getLock(key);
lock.lock();
获取的 RLock 是什么
getLock(key) 方法会使用 redissonClient 初始化一个锁 RedissonLock(commandExecutor, name);
/**
* commandExecutor 命令执行器, 用于执行Redis 命令
* name 锁的名称
*/
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
// 构建加锁对象
super(commandExecutor, name);
// 指定加锁执行器
this.commandExecutor = commandExecutor;
// 指定锁超时时间,默认为 30s
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
// 从连接器中获取发布服务
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
如何加锁
/**
* leaseTime -1
* unit 时间单位 null
* interruptibly false
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程ID 后面锁重入时会使用到
long threadId = Thread.currentThread().getId();
// 尝试加锁,加锁成功会返回null
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 以下为加锁失败的补偿操作
// 生成一个监听线程,监听当前锁的状态,实现有分为公平的和非公平的
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
// 默认 interruptibly 为false 会在这里阻塞等待future取得结果
commandExecutor.syncSubscription(future);
}
// 以下已经收到原线程的释放锁通知了
try {
while (true) {
// 再次尝试加锁
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 加锁成功,跳出循环
if (ttl == null) {
break;
}
// ttl 为当前持有锁线程的剩余持有时长
if (ttl >= 0) {
try {
// 尝试 ttl 时间之后重新加锁
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
// interruptibly 默认为false 所以这边会继续尝试加锁
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// ttl < 0 则说明前一个锁已经释放 直接尝试加锁
if (interruptibly) {
commandExecutor.getNow(future).getLatch().acquire();
} else {
commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 将当前线程的加锁监听取消
unsubscribe(commandExecutor.getNow(future), threadId);
}
// get(lockAsync(leaseTime, unit));
}
可以看到,上述代码中最主要的加锁代码为 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
,其内部调用的方法为get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
,下面我们看下 tryAcquireAsync
执行的代码
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
字段取值说明
- KEYS[1] 锁key,例:test-key
- ARGV[1] 取值为锁过期时间
leaseTime
例:30000 - ARGV[2] 取值为 加锁线程的ID(redisson启动时生成的实例ID + 线程ID) 例:a78ef432-9981-4554-aee7-eb7ba58131e2:1
LUA 脚本解析
- redis.call('exists', KEYS[1]) == 0 判断资源key是否存在,不存在则加锁
- redis.call('hincrby', KEYS[1], ARGV[2], 1) 使用hash结构存储,根据线程id自定义一个子key,设置值为1
- redis.call('pexpire', KEYS[1], ARGV[1]) 设置过期时间
- return nil; 返回空
- redis.call('hexists', KEYS[1], ARGV[2]) == 1_可重入锁_的判断,如果是当前线程再次获取锁,就允许获取锁
- redis.call('hincrby', KEYS[1], ARGV[2], 1) 将当前线程的持有的值加1
- redis.call('pexpire', KEYS[1], ARGV[1]) 重新设置过期时间
- return nil; 返回空
- return redis.call('pttl', KEYS[1])
如何解锁
字段取值说明
- KEYS[1] 锁的key值
test-key
- KEYS[2] 锁的key值-channel 用于推送消息
"redisson_lock__channel:{test-key}"
- ARGV[1] 推送消息的类型
- ARGV[2] 锁超时时间
- ARGV[3] 当前锁线程ID
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
LUA脚本解析
-
redis.call('hexists', KEYS[1], ARGV[3]) == 0
判断当前线程是否持有锁,未持有锁时返回空 - 以下逻辑处理时 说明当前线程已经持有锁了
-
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
当前线程持有锁数量 -1, 并取得减完之后锁数量(可重入时会多次加锁) -
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]);
如果最新的锁数量大于0, 将锁续期 -
redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]);
如果锁数量小于等于0 将锁信息清除 , 并推送广播消息当前锁已释放
监听机制
在首次加锁失败之后,redisson 开启对于加锁 key 的监听,具体是如何操作的呢?
protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
/**
* entryName commandExecutor.getConnectionManager().getId() +":" + name 例: a78ef432-9981-4554-aee7-eb7ba58131e2:test-key
* channelName prefixName("redisson_lock__channel", getRawName()); 例:redisson_lock__channel:{test-key}
*
*/
public CompletableFuture<E> subscribe(String entryName, String channelName) {
// 随机取得一个 reids AsyncSemaphore,其中维护了一个 AtomicInteger 的 counter 和 ConcurrentLinkedQueue 的 linsteners
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture<>();
int timeout = service.getConnectionManager().getConfig().getTimeout();
// 创建一个时间轮,在3000ms(默认)之后抛出连接超时异常
Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {
newPromise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS);
// 在信号量对象中监听状态
semaphore.acquire(() -> {
// 如果时间轮取消失败 释放 semaphore 信号量
if (!lockTimeout.cancel()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
// entry == null 新建 entry(RedissonLockEntry 实现了 PubSubEntry 接口 )
E value = createEntry(newPromise);
// 获取RedissonLockEntry锁 持有数+1
value.acquire();
// 将 value 锁添加到 entries ConcurrentHashMap 中
E oldValue = entries.putIfAbsent(entryName, value);
// 不为 null 表明已经存在当前 key
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
CompletableFuture<PubSubConnectionEntry> s = service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
s.whenComplete((r, e) -> {
if (e != null) {
value.getPromise().completeExceptionally(e);
return;
}
value.getPromise().complete(value);
});
});
return newPromise;
}
相关内容