Redission是什么?
Redisson 是架设在 Redis基础上的一个 Java 驻内存数据网格框架,充分利用 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了 一系列具有分布式特性的常用工具类。
什么是公平锁和非公平锁:
- 公平锁:每个线程获取到锁的顺序,按照线程访问锁的先后顺序获取的,最前面的线程总是最先获取到锁。
- 非公平锁:每个线程获取锁的顺序是随机的,并不存在谁在前面就先获得锁,而是每次获取锁通过竞争的方式。
Redission可重入锁源码分析
客户端的使用
public void run() {
// 获取一个非公平锁:不保证线程加锁顺序
RLock lock = redissonClient.getLock(lockName);
try {
// 根据超时时间获取锁
boolean bLocked = lock.tryLock(100, -1, TimeUnit.MILLISECONDS);
if (bLocked) {
// 锁释放
lock.unlock();
} else {
hasFails.set(true);
}
} catch (Exception ex) {
hasFails.set(true);
}
}
- 使用 tryLock 获取锁:leaseTime < 0代表没有设置锁超时时间;Redisssion会通过WatchDog机制定时完成锁续期。
获取锁原理
获取锁流程
获取锁源码分析
tryAcquireAsync 尝试获取锁
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// leaseTime > 0 使用 leaseTime 作为锁超时时间
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// leaseTime <=0 使用使用默认时间 30s作为锁超时时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 通过WatchDog完成锁续期机制;使用时间轮实现
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
- Redission在获取锁的时候:会通过 leaseTime 判断是否有锁超时时间;leaseTime > 0表示锁设有超时时间;leaseTime <= 0表示锁未设置超时时间。
- 当锁没有设置超时时间是:Redission通过WatchDog机制进行锁续期,scheduleExpirationRenewal 方法实现。
scheduleExpirationRenewal 锁续期实现
protected void scheduleExpirationRenewal(long threadId) {
// 构建 ExpirationEntry 对象并加入到MAP中
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 执行锁续期逻辑
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId, null);
}
}
}
}
renewExpiration 锁续期实现
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建Timeout;并且将Timeout加入到延迟队列中;此时并没有真正的加入到时间轮中
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 获取当前线程ID
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null, null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
renewExpirationAsync 锁续期Lua脚本实现
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断执行锁续期是否是当前线程
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重置锁超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
-
锁续期:通过时间轮机制实现;当时每一次创建出TimeOut对象,并没有马上放在时间轮的槽中;而是先放入到一个延迟队列中;当时间轮指针每次移动下一个tick的时候,才会一次性将最多10000个Timeout任务加入到时间轮中;
-
可以看到WatchDog就是通过时间轮;1/3 internalLockLeaseTime 就会执行一次锁续期;如果执行失败那么续期失败;
tryLockInnerAsync 加锁Lua脚本实现
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,
// 1. 根据KEY判断当前锁是否存在;如果存在需要通过 hexists 判断当前线程是否为加锁线程,用于实现锁重入。
// 2. redis.call('hincrby', KEYS[1], ARGV[2], 1); 对线程持有锁的数量 + 1.
// 3. redis.call('pexpire', KEYS[1], ARGV[1]) 设置锁过期时间
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
- 根据KEY判断当前锁是否存在;如果存在需要通过 hexists 判断当前线程是否为加锁线程,用于实现锁重入。
- redis.call('hincrby', KEYS[1], ARGV[2], 1); 对线程持有锁的数量 + 1.
- redis.call('pexpire', KEYS[1], ARGV[1]) 设置锁过期时间
释放锁原理
释放锁流程
释放锁源码
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local val = redis.call('get', KEYS[3]); " +
"if val ~= false then " +
"return tonumber(val);" +
"end; " +
// 判断释放锁线程是否为加锁线程
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 针对当前线程锁计数 - 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果counter > 0 说明锁还在持有, 不能删除key
"if (counter > 0) then " +
// 设置锁超时时间:避免出现死锁
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
"return 0; " +
"else " +
// 执行锁删除;删除KEY
"redis.call('del', KEYS[1]); " +
// 推送锁释放通知
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
// 释放锁以后设置KEY为1 过期时间为timeout
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
"return 1; " +
"end; ",
Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
总结
本文主要介绍了公平锁和非公平锁的区分以及Redission是如何实现可重入锁的原理;在后续的开发中,如果需要使用Redis实现可重入锁可以参考Redission的实现作为参考。
相关内容