Redisson分布式锁实现原理
本文一方面是抱怨一下现下的面试体验,另一方面看一下redisson(3.15.1)的trylock api的实现(我也不喜欢针对面试去看源码,只是想找个出口宣泄一下)。
笔者是因为喜欢写代码才干的java开发这一行,但是现实很残酷,要找一个技术氛围好的团队愉快的写代码并不是一件容易的事情,从作为应聘者的角度只能看到功利的八股文。
最近陆陆续续面试了一些大厂和中厂,在沟通顺利的情况下(双方在同一个频道上),八股文基本上没什么问题(至少我自己认为是这样的),但是面试体验并不是很好,所以一面之后我都拒了。
举个简单的例子,最近面试了一个中厂,对方在谈到分布式锁的时候,问我们分布式锁是怎么用的。
我:比如我们在xxx业务场景下需要用到分布式锁,防止在并发场景下同一个单据在内存中的数据计算逻辑错误。我们用了redisson这个工具,在redis官方的单机锁的基础上(set nx px 加锁,lua脚本解锁),利用watchdog解决续约问题。比如trylock 60秒,他会默认在锁过期前续约30秒。
面试官:trylock如果设置了超时时间,他会开启watchdog吗?
我:默认就会。
面试官:设置了超时时间,他默认不会开启watchdog。
我:怎么会呢?肯定会啊。
面试官:那你们一直都是用带超时时间的api吗?
我:我们都用默认的无参的tryLock,刚才我tryLock60秒只是随便举个例子。
面试官:你去看过源码吗?
我:这个redisson的源码我倒是没有看过,毕竟是一个工具,没遇到问题的时候不会去追溯源码。
我不知道对方的想法是什么,我比较介意的是针对一个工具的api问底层源码,毕竟不是每个人摸鱼的时候都会点进redissonclient的(我承认我点进去过,但是不是为了背诵来面试,只是好奇)。在我看来工具是工具,只要你思路是对的,就没有问题,看源码只是解决问题和培养解决问题思路的一种方式。
此外,redisson在我们团队内部被引入其实是因为某个开发的个人行为,他说我们以前的用法不对(redis官方的单机锁),要用redisson,我也无法反驳,那就用吧。
但是这位开发引入新组件也没有去看官方文档,导致解锁用法有误,这部分代码一直在线上,因为也没什么严重的影响(后续的开发过来直接复制粘贴,错可以一直错下去)。
RLock lock = redissonClient.getLock(key); try { if (lock.tryLock()) { // 业务方法 } } finally { if (lock.isLocked() && lock.isHeldByCurrentThread()) { // ? lock.unlock(); } } 复制代码
主要错误的地方在于lock.isLocked和lock.isHeldByCurrentThread会增加两次对redis的远程调用。
在不看redisson的官方文档的情况下,只看JDK中Lock接口的java doc,我们就可以看出tryLock和unlock应该怎么用。
很明显,在tryLock成功的情况下,try/catch包裹所有业务逻辑,而不是在tryLock之外通过try/catch包裹。Redisson既然实现了JDK的Lock接口,就不会改变他的语义。
本身团队内部的规范是直接使用的redis官方的单机分布式锁,没有watchdog,至于为什么没用redisson主要原因有几个:
必要性:以前台接口为例,如果用redis官方的单机锁(set nx px 加锁,lua脚本解锁)超时时间设置为60秒,你说你业务超过60秒,tomcat一共就200线程,你这种业务存在,一个接口响应时间在60秒,直接就可能导致tomcat线程池打满夯住,你还没到需要watchdog续约的情况就挂了;
系统复杂度:在不理解原理的情况下,我不会引入额外的中间件或者工具,一方面是出问题不好排查,另一方面越多的中间件和工具会导致系统越复杂,非必要不引入;
团队:团队成员对开发对中间件和工具的认知不足,引入新的组件会提高开发的门槛,我们不能要求团队里每个人都对所有的中间件了如指掌,也不能对于每一行改动的代码都做代码审查;
二、redisson客户端
我们先从自动配置入手,所有的api调用都是由RedissonClient发起的。
@Bean( destroyMethod = "shutdown" ) @ConditionalOnMissingBean({RedissonClient.class}) public RedissonClient redisson() throws IOException { Config config = null; // 装载Config配置,比如连接的 return Redisson.create(config); } 复制代码
Redisson.create根据Config配置创建RedissonClient的实现Redisson。
public static RedissonClient create(Config config) { Redisson redisson = new Redisson(config); if (config.isReferenceEnabled()) { redisson.enableRedissonReferenceSupport(); } return redisson; } 复制代码
Redisson在构造时new了三个对象:
ConnectionManager:负责底层通讯,不看底层了,直接给答案就是Netty
EvictionScheduler:不知道是什么,也不想追究
WriteBehindService:不知道是什么,也不想追究
public class Redisson implements RedissonClient { static { RedissonReference.warmUp(); } protected final QueueTransferService queueTransferService = new QueueTransferService(); protected final EvictionScheduler evictionScheduler; protected final WriteBehindService writeBehindService; protected final ConnectionManager connectionManager; protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>(); protected final Config config; protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>(); protected Redisson(Config config) { this.config = config; Config configCopy = new Config(config); connectionManager = ConfigSupport.createConnectionManager(configCopy); evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor()); } } 复制代码
三、getLock
Redisson.getLock将ConnectionManager里的CommandExecutor带进RedissonLock,目的肯定是为了RedissonLock可以调用底层通讯方法请求redis。name对应的就是我们的要锁的对象的标识,比如订单号。
@Override public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); } 复制代码
RedissonLock
internalLockLeaseTime:watchDog的超时时间,默认30s;
pubsub:暂时不知道有啥用,感觉是用到了redis的pubsub;
public class RedissonLock extends RedissonBaseLock { protected long internalLockLeaseTime; protected final LockPubSub pubSub; final CommandAsyncExecutor commandExecutor; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } } 复制代码
RedissonBaseLock是RedissonLock的基类
id:ConnectionManager的id,UUID.randomUUID().toString(),如果一个进程只有一个RedissonClient,那么可以认为这个id在进程内唯一,后续我们就称id是进程id;
entryName:进程id拼接业务对象标识name,比如UUID.randomUUID().toString()+":"+订单号;
internalLockLeaseTime:watchDog的超时时间,默认30s;
protected long internalLockLeaseTime; final String id; final String entryName; final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } 复制代码
RedissonObject是RedssionClient构造出来的大多数对象的基类,也是RedissonBaseLock的基类,没什么特别的。
public abstract class RedissonObject implements RObject { protected final CommandAsyncExecutor commandExecutor; protected String name; protected final Codec codec; public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.codec = codec; this.name = name; this.commandExecutor = commandExecutor; if (name == null) { throw new NullPointerException("name can't be null"); } } public RedissonObject(CommandAsyncExecutor commandExecutor, String name) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } } 复制代码
四、tryLock无参方法
无参的tryLock底层调用tryAcquireOnceAsync返回一个Future,再通过get阻塞等待Future完成。
tryAcquireOnceAsync有四个入参:
waitTime:如果当前其他人获取了锁,你愿意等多少时间;
leaseTime:如果你获取了锁,你想要多久自动释放;
unit:针对leaseTime的单位;
threadId:当前线程id;
针对无参tryLock,waitTime和leaseTime都是-1。
waitTime=-1,意味着不等待,如果当前锁被其他人占用,快速返回false;
leaseTime=-1,意味深长,如果是-1,代表启用自动续期watchDog,如果不是-1,代表用户自己控制锁过期时间,比如60秒。为什么这么说,继续看源码。
@Override public boolean tryLock() { return get(tryLockAsync()); } @Override public RFuture<Boolean> tryLockAsync() { return tryLockAsync(Thread.currentThread().getId()); } @Override public RFuture<Boolean> tryLockAsync(long threadId) { return tryAcquireOnceAsync(-1, -1, null, threadId); } private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { // case1 return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } // case2 RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } if (ttlRemaining) { // watchDog scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } 复制代码
无论leaseTime和waitTime的赋值如何,都会先调用tryLockInnerAsync。只是针对leaseTime=-1的情况,还要在tryLockInnerAsync获取锁成功后,执行scheduleExpirationRenewal。
tryLockInnerAsync是个核心方法,就是执行lua脚本,参数如下:
KEYS[1]:我们Redisson.getLock传入的name,代表需要锁定的业务对象,比如订单号;
ARGV[1]:leaseTime,锁自动释放时间,比如无参tryLock这里是internalLockLeaseTime,就是watchDog的续期时间,如果是用户传入leaseTime,那就是用户指定锁定过期时间,对于redis来说就是key的ttl;
ARGV[2]:进程id拼接threadid;
整体lua脚本不算复杂,redisson采用map来存储获取锁成功的客户端id(id+threadId)和重入次数,如果获取成功,返回nil,如果获取失败,返回当前获取锁的剩余ttl。至于为什么是map,还没看到原因
RedisCommands.EVAL_NULL_BOOLEAN会解析,如果返回nil代表true获取锁成功,如果返回非nil代表false获取锁失败。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } protected String getLockName(long threadId) { return id + ":" + threadId; } 复制代码
ok由于我们调用的是tryLock的无参方法,leaseTime是-1,对于资源key的超时时间默认就是30秒,如果获取锁成功,接下来还要开启watchDog。
RedissonBaseLock.scheduleExpirationRenewal,构造一个ExpirationEntry放到全局map中,key是entryName(进程id+订单号)将当前线程id放入,并执行renewExpiration。
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>(); protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } } protected String getEntryName() { return entryName; } 复制代码
重点来了,watchDog在renewExpiration被提交,按照internalLockLeaseTime/3,也就是默认30s/3=10s延迟执行renew延长当前线程获取锁的时间,直到当前锁被释放取消这个Timeout。也就是说watchDog默认在获取锁成功的10秒后,就会执行续期。
这个延迟任务Timeout是Netty的实现,底层是Netty实现的HashedWheelTimer时间轮。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } 复制代码
renew逻辑也比较简单,判断是否是当前线程持有锁,如果是的话按照internalLockLeaseTime设置新的过期时间。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } 复制代码
五、tryLock两个参数方法
两个参数的tryLock方法也是实现了JDK的Lock接口,是指定waitTime,就是如果其他人获取锁了,最多允许等待waitTime时间获取锁,否则返回false。底层调用tryLock三个参数方法,相应的leaseTime还是-1,代表要走watchDog逻辑。
@Override public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); } 复制代码
带waitTime的tryLock主要逻辑就两部分:
tryAcquire竞争锁;
订阅一个channel,接收key过期消息(那就是unlock的时候,会publish一个消息),底层通过JDK的Semaphore实现同步,这也是RedissonLock的LockPubSub的作用,用于发布锁定key过期和接收锁定key过期;
以上逻辑都建立在不超过waitTime的基础之上。
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); // 1. 竞争锁 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); // 2. 利用pubsub订阅一个channel,为了接收锁被unlock的消息 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { // 3. 竞争锁 long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 4. 等待锁过期消息,重新进入3 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { // 5. 取消订阅 unsubscribe(subscribeFuture, threadId); } } 复制代码
tryAcquire的逻辑都和tryAcquireOnceAsync一致,只是这里会返回key的过期时间ttl。
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } 复制代码
发布订阅channel=redisson_lock__channel:{锁定业务对象,如订单号}。
protected final LockPubSub pubSub; protected RFuture<RedissonLockEntry> subscribe(long threadId) { return pubSub.subscribe(getEntryName(), getChannelName()); } String getChannelName() { return prefixName("redisson_lock__channel", getName()); } public static String prefixName(String prefix, String name) { if (name.contains("{")) { return prefix + ":" + name; } return prefix + ":{" + name + "}"; } 复制代码
六、tryLock三个参数方法
其实tryLock两个参数方法,底层调用的就是tryLock三个参数方法,只不过leaseTime是-1。
而三个参数的tryLock方法,顶层接口不是JDK的Lock,而是Redissson的RLock。
/** * Tries to acquire the lock with defined <code>leaseTime</code>. * Waits up to defined <code>waitTime</code> if necessary until the lock became available. * * Lock will be released automatically after defined <code>leaseTime</code> interval. * * @param waitTime the maximum time to acquire the lock * @param leaseTime lease time * @param unit time unit * @return <code>true</code> if lock is successfully acquired, * otherwise <code>false</code> if lock is already set. * @throws InterruptedException - if the thread is interrupted */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 复制代码
七、unlock
有始有终吧,把unlock也写了。
其实根据前面lock的逻辑,unlock也大致能猜到。
lua脚本解锁:要注意判断自己的id是否是当前获取锁成功的客户端id;要注意重入逻辑;
无论使用什么方式加锁,解锁都要publish对应key删除的消息到channel=redisson_lock__channel:{锁定业务对象,如订单号};
取消watchDog;
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 3. 取消watchDog(HashTimeWheel里的那个timeout) cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 判断是否是自己上的锁 "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 减少重入次数 "if (counter > 0) then " + // 如果重入次数仍然大于0 "redis.call('pexpire', KEYS[1], ARGV[2]); " + // renew续租 "return 0; " + "else " + // 如果重入次数为0 "redis.call('del', KEYS[1]); " + // 删除key "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁消息 "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } 复制代码
八、总结
本文分析了Redisson分布式锁tryLock api的实现,我们这里看的只是独占可重入的RedissonLock实现,其实Redisson不仅仅支持这种独占分布式锁,还支持许多其他的高级功能,只不过用到的机会比较少(我没用到过)。
对于tryLock api有两个重要的参数:
waitTime:代表最多等待多久获取锁,如果设置为-1,获取锁失败后直接返回false;如果设置非-1,底层会利用redis的pubsub监听解锁消息,在未超时的情况下获取锁;
leaseTime:代表锁自动过期的时间,如果设置为-1,代表开启watchDog,只要业务还在进行,就会自动续期;如果设置非-1,由leaseTime决定锁过期时间,不会开启watchDog;
回过头来说,面试官问的trylock如果设置了超时时间,他会开启watchdog吗?该怎么回答?
waitTime是你所谓的超时时间吗,还是leaseTime是你所谓的超时时间?如果是前者,是两个参数的tryLock方法,那么watchDog会开启;如果是后者,面试官是你对了。
so what?这是你选拔人才的方式吗?
作者:程序猿阿越
链接:https://juejin.cn/post/7168727597180780574