掘金 后端 ( ) • 2024-04-25 19:07

Redission是什么?

Redisson 是架设在 Redis基础上的一个 Java 驻内存数据网格框架,充分利用 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了 一系列具有分布式特性的常用工具类。

什么是公平锁和非公平锁:

  • 公平锁:每个线程获取到锁的顺序,按照线程访问锁的先后顺序获取的,最前面的线程总是最先获取到锁。
  • 非公平锁:每个线程获取锁的顺序是随机的,并不存在谁在前面就先获得锁,而是每次获取锁通过竞争的方式。

image.png

Redission可重入锁源码分析

客户端的使用

public void run() {
    // 获取一个非公平锁:不保证线程加锁顺序
    RLock lock = redissonClient.getLock(lockName);
    try {
        // 根据超时时间获取锁
        boolean bLocked = lock.tryLock(100, -1, TimeUnit.MILLISECONDS);
        if (bLocked) {
            // 锁释放
            lock.unlock();
        } else {
            hasFails.set(true);
        }
    } catch (Exception ex) {
        hasFails.set(true);
    }
}
  • 使用 tryLock 获取锁:leaseTime < 0代表没有设置锁超时时间;Redisssion会通过WatchDog机制定时完成锁续期。

获取锁原理

获取锁流程

image.png

获取锁源码分析

tryAcquireAsync 尝试获取锁

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    // leaseTime > 0 使用 leaseTime 作为锁超时时间
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // leaseTime <=0 使用使用默认时间 30s作为锁超时时间
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 通过WatchDog完成锁续期机制;使用时间轮实现
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}
  • Redission在获取锁的时候:会通过 leaseTime 判断是否有锁超时时间;leaseTime > 0表示锁设有超时时间;leaseTime <= 0表示锁未设置超时时间。
  • 当锁没有设置超时时间是:Redission通过WatchDog机制进行锁续期,scheduleExpirationRenewal 方法实现。

scheduleExpirationRenewal 锁续期实现

protected void scheduleExpirationRenewal(long threadId) {
        // 构建 ExpirationEntry 对象并加入到MAP中
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 执行锁续期逻辑
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId, null);
            }
        }
    }
}

renewExpiration 锁续期实现

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    // 创建Timeout;并且将Timeout加入到延迟队列中;此时并没有真正的加入到时间轮中
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            // 获取当前线程ID
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null, null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

renewExpirationAsync 锁续期Lua脚本实现

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判断执行锁续期是否是当前线程
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        // 重置锁超时时间
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
  • 锁续期:通过时间轮机制实现;当时每一次创建出TimeOut对象,并没有马上放在时间轮的槽中;而是先放入到一个延迟队列中;当时间轮指针每次移动下一个tick的时候,才会一次性将最多10000个Timeout任务加入到时间轮中;

  • 可以看到WatchDog就是通过时间轮;1/3 internalLockLeaseTime 就会执行一次锁续期;如果执行失败那么续期失败;

tryLockInnerAsync 加锁Lua脚本实现

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,
            // 1. 根据KEY判断当前锁是否存在;如果存在需要通过 hexists 判断当前线程是否为加锁线程,用于实现锁重入。
            // 2. redis.call('hincrby', KEYS[1], ARGV[2], 1); 对线程持有锁的数量 + 1.
            // 3. redis.call('pexpire', KEYS[1], ARGV[1]) 设置锁过期时间
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
  • 根据KEY判断当前锁是否存在;如果存在需要通过 hexists 判断当前线程是否为加锁线程,用于实现锁重入。
  • redis.call('hincrby', KEYS[1], ARGV[2], 1); 对线程持有锁的数量 + 1.
  • redis.call('pexpire', KEYS[1], ARGV[1]) 设置锁过期时间

释放锁原理

释放锁流程

image.png

释放锁源码

protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "local val = redis.call('get', KEYS[3]); " +
                                "if val ~= false then " +
                                    "return tonumber(val);" +
                                "end; " +
                                // 判断释放锁线程是否为加锁线程
                                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                    "return nil;" +
                                "end; " +
                                // 针对当前线程锁计数 - 1
                                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                // 如果counter > 0 说明锁还在持有, 不能删除key
                                "if (counter > 0) then " +
                                    // 设置锁超时时间:避免出现死锁
                                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                    "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
                                    "return 0; " +
                                "else " +
                                    // 执行锁删除;删除KEY
                                    "redis.call('del', KEYS[1]); " +
                                    // 推送锁释放通知
                                    "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                                    // 释放锁以后设置KEY为1 过期时间为timeout
                                    "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
                                    "return 1; " +
                                "end; ",
                            Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
                            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
                            getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}

总结

本文主要介绍了公平锁和非公平锁的区分以及Redission是如何实现可重入锁的原理;在后续的开发中,如果需要使用Redis实现可重入锁可以参考Redission的实现作为参考。