阅读 61

Redisson分布式锁实现原理

本文一方面是抱怨一下现下的面试体验,另一方面看一下redisson(3.15.1)的trylock api的实现(我也不喜欢针对面试去看源码,只是想找个出口宣泄一下)。

笔者是因为喜欢写代码才干的java开发这一行,但是现实很残酷,要找一个技术氛围好的团队愉快的写代码并不是一件容易的事情,从作为应聘者的角度只能看到功利的八股文。

最近陆陆续续面试了一些大厂和中厂,在沟通顺利的情况下(双方在同一个频道上),八股文基本上没什么问题(至少我自己认为是这样的),但是面试体验并不是很好,所以一面之后我都拒了。

举个简单的例子,最近面试了一个中厂,对方在谈到分布式锁的时候,问我们分布式锁是怎么用的。

我:比如我们在xxx业务场景下需要用到分布式锁,防止在并发场景下同一个单据在内存中的数据计算逻辑错误。我们用了redisson这个工具,在redis官方的单机锁的基础上(set nx px 加锁,lua脚本解锁),利用watchdog解决续约问题。比如trylock 60秒,他会默认在锁过期前续约30秒。

面试官:trylock如果设置了超时时间,他会开启watchdog吗?

我:默认就会。

面试官:设置了超时时间,他默认不会开启watchdog。

我:怎么会呢?肯定会啊。

面试官:那你们一直都是用带超时时间的api吗?

我:我们都用默认的无参的tryLock,刚才我tryLock60秒只是随便举个例子。

面试官:你去看过源码吗?

我:这个redisson的源码我倒是没有看过,毕竟是一个工具,没遇到问题的时候不会去追溯源码。

我不知道对方的想法是什么,我比较介意的是针对一个工具的api问底层源码,毕竟不是每个人摸鱼的时候都会点进redissonclient的(我承认我点进去过,但是不是为了背诵来面试,只是好奇)。在我看来工具是工具,只要你思路是对的,就没有问题,看源码只是解决问题和培养解决问题思路的一种方式。

此外,redisson在我们团队内部被引入其实是因为某个开发的个人行为,他说我们以前的用法不对(redis官方的单机锁),要用redisson,我也无法反驳,那就用吧。

但是这位开发引入新组件也没有去看官方文档,导致解锁用法有误,这部分代码一直在线上,因为也没什么严重的影响(后续的开发过来直接复制粘贴,错可以一直错下去)。

RLock lock = redissonClient.getLock(key); try {     if (lock.tryLock()) {         // 业务方法     } } finally {     if (lock.isLocked() && lock.isHeldByCurrentThread()) { // ?         lock.unlock();     } } 复制代码

主要错误的地方在于lock.isLocked和lock.isHeldByCurrentThread会增加两次对redis的远程调用。

在不看redisson的官方文档的情况下,只看JDK中Lock接口的java doc,我们就可以看出tryLock和unlock应该怎么用。

很明显,在tryLock成功的情况下,try/catch包裹所有业务逻辑,而不是在tryLock之外通过try/catch包裹。Redisson既然实现了JDK的Lock接口,就不会改变他的语义。

本身团队内部的规范是直接使用的redis官方的单机分布式锁,没有watchdog,至于为什么没用redisson主要原因有几个:

  • 必要性:以前台接口为例,如果用redis官方的单机锁(set nx px 加锁,lua脚本解锁)超时时间设置为60秒,你说你业务超过60秒,tomcat一共就200线程,你这种业务存在,一个接口响应时间在60秒,直接就可能导致tomcat线程池打满夯住,你还没到需要watchdog续约的情况就挂了;

  • 系统复杂度:在不理解原理的情况下,我不会引入额外的中间件或者工具,一方面是出问题不好排查,另一方面越多的中间件和工具会导致系统越复杂,非必要不引入;

  • 团队:团队成员对开发对中间件和工具的认知不足,引入新的组件会提高开发的门槛,我们不能要求团队里每个人都对所有的中间件了如指掌,也不能对于每一行改动的代码都做代码审查;

二、redisson客户端

我们先从自动配置入手,所有的api调用都是由RedissonClient发起的。

@Bean(     destroyMethod = "shutdown" ) @ConditionalOnMissingBean({RedissonClient.class}) public RedissonClient redisson() throws IOException {     Config config = null;     // 装载Config配置,比如连接的     return Redisson.create(config); } 复制代码

Redisson.create根据Config配置创建RedissonClient的实现Redisson。

public static RedissonClient create(Config config) {     Redisson redisson = new Redisson(config);     if (config.isReferenceEnabled()) {         redisson.enableRedissonReferenceSupport();     }     return redisson; } 复制代码

Redisson在构造时new了三个对象:

  • ConnectionManager:负责底层通讯,不看底层了,直接给答案就是Netty

  • EvictionScheduler:不知道是什么,也不想追究

  • WriteBehindService:不知道是什么,也不想追究

public class Redisson implements RedissonClient {     static {         RedissonReference.warmUp();     }     protected final QueueTransferService queueTransferService = new QueueTransferService();     protected final EvictionScheduler evictionScheduler;     protected final WriteBehindService writeBehindService;     protected final ConnectionManager connectionManager;     protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>();     protected final Config config;     protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();     protected Redisson(Config config) {         this.config = config;         Config configCopy = new Config(config);         connectionManager = ConfigSupport.createConnectionManager(configCopy);         evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());         writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor());     } } 复制代码

三、getLock

Redisson.getLock将ConnectionManager里的CommandExecutor带进RedissonLock,目的肯定是为了RedissonLock可以调用底层通讯方法请求redis。name对应的就是我们的要锁的对象的标识,比如订单号。

@Override public RLock getLock(String name) {     return new RedissonLock(connectionManager.getCommandExecutor(), name); } 复制代码

RedissonLock

  • internalLockLeaseTime:watchDog的超时时间,默认30s;

  • pubsub:暂时不知道有啥用,感觉是用到了redis的pubsub;

public class RedissonLock extends RedissonBaseLock {     protected long internalLockLeaseTime;     protected final LockPubSub pubSub;     final CommandAsyncExecutor commandExecutor;     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {         super(commandExecutor, name);         this.commandExecutor = commandExecutor;         this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();         this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();     } } 复制代码

RedissonBaseLock是RedissonLock的基类

  • id:ConnectionManager的id,UUID.randomUUID().toString(),如果一个进程只有一个RedissonClient,那么可以认为这个id在进程内唯一,后续我们就称id是进程id;

  • entryName:进程id拼接业务对象标识name,比如UUID.randomUUID().toString()+":"+订单号;

  • internalLockLeaseTime:watchDog的超时时间,默认30s;

protected long internalLockLeaseTime; final String id; final String entryName; final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {     super(commandExecutor, name);     this.commandExecutor = commandExecutor;     this.id = commandExecutor.getConnectionManager().getId();     this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();     this.entryName = id + ":" + name; } 复制代码

RedissonObject是RedssionClient构造出来的大多数对象的基类,也是RedissonBaseLock的基类,没什么特别的。

public abstract class RedissonObject implements RObject {     protected final CommandAsyncExecutor commandExecutor;     protected String name;     protected final Codec codec;     public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {         this.codec = codec;         this.name = name;         this.commandExecutor = commandExecutor;         if (name == null) {             throw new NullPointerException("name can't be null");         }     }     public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {         this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);     } } 复制代码

四、tryLock无参方法

无参的tryLock底层调用tryAcquireOnceAsync返回一个Future,再通过get阻塞等待Future完成。

tryAcquireOnceAsync有四个入参:

  • waitTime:如果当前其他人获取了锁,你愿意等多少时间;

  • leaseTime:如果你获取了锁,你想要多久自动释放;

  • unit:针对leaseTime的单位;

  • threadId:当前线程id;

针对无参tryLock,waitTime和leaseTime都是-1

waitTime=-1,意味着不等待,如果当前锁被其他人占用,快速返回false;

leaseTime=-1,意味深长,如果是-1,代表启用自动续期watchDog,如果不是-1,代表用户自己控制锁过期时间,比如60秒。为什么这么说,继续看源码。

@Override public boolean tryLock() {     return get(tryLockAsync()); } @Override public RFuture<Boolean> tryLockAsync() {     return tryLockAsync(Thread.currentThread().getId()); } @Override public RFuture<Boolean> tryLockAsync(long threadId) {     return tryAcquireOnceAsync(-1, -1, null, threadId); } private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     if (leaseTime != -1) {         // case1         return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);     }     // case2     RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                                                                 TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);     ttlRemainingFuture.onComplete((ttlRemaining, e) -> {         if (e != null) {             return;         }         if (ttlRemaining) {             // watchDog             scheduleExpirationRenewal(threadId);         }     });     return ttlRemainingFuture; } 复制代码

无论leaseTime和waitTime的赋值如何,都会先调用tryLockInnerAsync。只是针对leaseTime=-1的情况,还要在tryLockInnerAsync获取锁成功后,执行scheduleExpirationRenewal。

tryLockInnerAsync是个核心方法,就是执行lua脚本,参数如下:

  • KEYS[1]:我们Redisson.getLock传入的name,代表需要锁定的业务对象,比如订单号;

  • ARGV[1]:leaseTime,锁自动释放时间,比如无参tryLock这里是internalLockLeaseTime,就是watchDog的续期时间,如果是用户传入leaseTime,那就是用户指定锁定过期时间,对于redis来说就是key的ttl;

  • ARGV[2]:进程id拼接threadid;

整体lua脚本不算复杂,redisson采用map来存储获取锁成功的客户端id(id+threadId)和重入次数,如果获取成功,返回nil,如果获取失败,返回当前获取锁的剩余ttl。至于为什么是map,还没看到原因

RedisCommands.EVAL_NULL_BOOLEAN会解析,如果返回nil代表true获取锁成功,如果返回非nil代表false获取锁失败。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {     internalLockLeaseTime = unit.toMillis(leaseTime);     return evalWriteAsync(getName(), LongCodec.INSTANCE, command,             "if (redis.call('exists', KEYS[1]) == 0) then " +                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                     "return nil; " +                     "end; " +                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                     "return nil; " +                     "end; " +                     "return redis.call('pttl', KEYS[1]);",             Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } protected String getLockName(long threadId) {     return id + ":" + threadId; } 复制代码

ok由于我们调用的是tryLock的无参方法,leaseTime是-1,对于资源key的超时时间默认就是30秒,如果获取锁成功,接下来还要开启watchDog。

RedissonBaseLock.scheduleExpirationRenewal,构造一个ExpirationEntry放到全局map中,key是entryName(进程id+订单号)将当前线程id放入,并执行renewExpiration。

private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>(); protected void scheduleExpirationRenewal(long threadId) {     ExpirationEntry entry = new ExpirationEntry();     ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);     if (oldEntry != null) {         oldEntry.addThreadId(threadId);     } else {         entry.addThreadId(threadId);         renewExpiration();     } } protected String getEntryName() {     return entryName; } 复制代码

重点来了,watchDog在renewExpiration被提交,按照internalLockLeaseTime/3,也就是默认30s/3=10s延迟执行renew延长当前线程获取锁的时间,直到当前锁被释放取消这个Timeout。也就是说watchDog默认在获取锁成功的10秒后,就会执行续期

这个延迟任务Timeout是Netty的实现,底层是Netty实现的HashedWheelTimer时间轮。

private void renewExpiration() {     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());     if (ee == null) {         return;     }          Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {         @Override         public void run(Timeout timeout) throws Exception {             ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());             if (ent == null) {                 return;             }             Long threadId = ent.getFirstThreadId();             if (threadId == null) {                 return;             }                          RFuture<Boolean> future = renewExpirationAsync(threadId);             future.onComplete((res, e) -> {                 if (e != null) {                     log.error("Can't update lock " + getName() + " expiration", e);                     EXPIRATION_RENEWAL_MAP.remove(getEntryName());                     return;                 }                                  if (res) {                     // reschedule itself                     renewExpiration();                 }             });         }     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);          ee.setTimeout(task); } 复制代码

renew逻辑也比较简单,判断是否是当前线程持有锁,如果是的话按照internalLockLeaseTime设置新的过期时间。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {     return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                     "return 1; " +                     "end; " +                     "return 0;",             Collections.singletonList(getName()),             internalLockLeaseTime, getLockName(threadId)); } 复制代码

五、tryLock两个参数方法

两个参数的tryLock方法也是实现了JDK的Lock接口,是指定waitTime,就是如果其他人获取锁了,最多允许等待waitTime时间获取锁,否则返回false。底层调用tryLock三个参数方法,相应的leaseTime还是-1,代表要走watchDog逻辑。

@Override public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {     return tryLock(waitTime, -1, unit); } 复制代码

带waitTime的tryLock主要逻辑就两部分:

  1. tryAcquire竞争锁;

  2. 订阅一个channel,接收key过期消息(那就是unlock的时候,会publish一个消息),底层通过JDK的Semaphore实现同步,这也是RedissonLock的LockPubSub的作用,用于发布锁定key过期和接收锁定key过期;

以上逻辑都建立在不超过waitTime的基础之上。

@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {     long time = unit.toMillis(waitTime);     long current = System.currentTimeMillis();     long threadId = Thread.currentThread().getId();     // 1. 竞争锁     Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);     // lock acquired     if (ttl == null) {         return true;     }     time -= System.currentTimeMillis() - current;     if (time <= 0) {         acquireFailed(waitTime, unit, threadId);         return false;     }          current = System.currentTimeMillis();     // 2. 利用pubsub订阅一个channel,为了接收锁被unlock的消息     RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);     if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {         if (!subscribeFuture.cancel(false)) {             subscribeFuture.onComplete((res, e) -> {                 if (e == null) {                     unsubscribe(subscribeFuture, threadId);                 }             });         }         acquireFailed(waitTime, unit, threadId);         return false;     }     try {         time -= System.currentTimeMillis() - current;         if (time <= 0) {             acquireFailed(waitTime, unit, threadId);             return false;         }              while (true) {             // 3. 竞争锁             long currentTime = System.currentTimeMillis();             ttl = tryAcquire(waitTime, leaseTime, unit, threadId);             // lock acquired             if (ttl == null) {                 return true;             }             time -= System.currentTimeMillis() - currentTime;             if (time <= 0) {                 acquireFailed(waitTime, unit, threadId);                 return false;             }             // 4. 等待锁过期消息,重新进入3             currentTime = System.currentTimeMillis();             if (ttl >= 0 && ttl < time) {                 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);             } else {                 subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);             }             time -= System.currentTimeMillis() - currentTime;             if (time <= 0) {                 acquireFailed(waitTime, unit, threadId);                 return false;             }         }     } finally {         // 5. 取消订阅         unsubscribe(subscribeFuture, threadId);     } } 复制代码

tryAcquire的逻辑都和tryAcquireOnceAsync一致,只是这里会返回key的过期时间ttl。

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     if (leaseTime != -1) {         return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);     }     RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                                                             TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);     ttlRemainingFuture.onComplete((ttlRemaining, e) -> {         if (e != null) {             return;         }         // lock acquired         if (ttlRemaining == null) {             scheduleExpirationRenewal(threadId);         }     });     return ttlRemainingFuture; } 复制代码

发布订阅channel=redisson_lock__channel:{锁定业务对象,如订单号}。

protected final LockPubSub pubSub; protected RFuture<RedissonLockEntry> subscribe(long threadId) {     return pubSub.subscribe(getEntryName(), getChannelName()); } String getChannelName() {     return prefixName("redisson_lock__channel", getName()); } public static String prefixName(String prefix, String name) {     if (name.contains("{")) {         return prefix + ":" + name;     }     return prefix + ":{" + name + "}"; } 复制代码

六、tryLock三个参数方法

其实tryLock两个参数方法,底层调用的就是tryLock三个参数方法,只不过leaseTime是-1。

三个参数的tryLock方法,顶层接口不是JDK的Lock,而是Redissson的RLock

/**  * Tries to acquire the lock with defined <code>leaseTime</code>.  * Waits up to defined <code>waitTime</code> if necessary until the lock became available.  *  * Lock will be released automatically after defined <code>leaseTime</code> interval.  *  * @param waitTime the maximum time to acquire the lock  * @param leaseTime lease time  * @param unit time unit  * @return <code>true</code> if lock is successfully acquired,  *          otherwise <code>false</code> if lock is already set.  * @throws InterruptedException - if the thread is interrupted  */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 复制代码

七、unlock

有始有终吧,把unlock也写了。

其实根据前面lock的逻辑,unlock也大致能猜到。

  1. lua脚本解锁:要注意判断自己的id是否是当前获取锁成功的客户端id;要注意重入逻辑;

  2. 无论使用什么方式加锁,解锁都要publish对应key删除的消息到channel=redisson_lock__channel:{锁定业务对象,如订单号};

  3. 取消watchDog;

@Override public void unlock() {     try {         get(unlockAsync(Thread.currentThread().getId()));     } catch (RedisException e) {         if (e.getCause() instanceof IllegalMonitorStateException) {             throw (IllegalMonitorStateException) e.getCause();         } else {             throw e;         }     } } @Override public RFuture<Void> unlockAsync(long threadId) {     RPromise<Void> result = new RedissonPromise<>();     RFuture<Boolean> future = unlockInnerAsync(threadId);     future.onComplete((opStatus, e) -> {         // 3. 取消watchDog(HashTimeWheel里的那个timeout)         cancelExpirationRenewal(threadId);         if (e != null) {             result.tryFailure(e);             return;         }         if (opStatus == null) {             IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "                     + id + " thread-id: " + threadId);             result.tryFailure(cause);             return;         }         result.trySuccess(null);     });     return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) {     return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 判断是否是自己上的锁                     "return nil;" +                     "end; " +                     "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 减少重入次数                     "if (counter > 0) then " + // 如果重入次数仍然大于0                     "redis.call('pexpire', KEYS[1], ARGV[2]); " + // renew续租                     "return 0; " +                     "else " + // 如果重入次数为0                     "redis.call('del', KEYS[1]); " + // 删除key                     "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁消息                     "return 1; " +                     "end; " +                     "return nil;",             Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } 复制代码

八、总结

本文分析了Redisson分布式锁tryLock api的实现,我们这里看的只是独占可重入的RedissonLock实现,其实Redisson不仅仅支持这种独占分布式锁,还支持许多其他的高级功能,只不过用到的机会比较少(我没用到过)。

对于tryLock api有两个重要的参数:

  • waitTime:代表最多等待多久获取锁,如果设置为-1,获取锁失败后直接返回false;如果设置非-1,底层会利用redis的pubsub监听解锁消息,在未超时的情况下获取锁;

  • leaseTime:代表锁自动过期的时间,如果设置为-1,代表开启watchDog,只要业务还在进行,就会自动续期;如果设置非-1,由leaseTime决定锁过期时间,不会开启watchDog;

回过头来说,面试官问的trylock如果设置了超时时间,他会开启watchdog吗?该怎么回答?

waitTime是你所谓的超时时间吗,还是leaseTime是你所谓的超时时间?如果是前者,是两个参数的tryLock方法,那么watchDog会开启;如果是后者,面试官是你对了。

so what?这是你选拔人才的方式吗?


作者:程序猿阿越
链接:https://juejin.cn/post/7168727597180780574


文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐