单机版Redis环境下的Redis分布式锁(上篇)
一、Redis分布式锁的原理
Redis分布式锁的原理就是在Redis里面占住一个“坑位”,当有”其他人“也要占这个坑位的时候,发现已经”坑位“已经被占了,那就加锁失败,就能放弃或者重试。
“占坑”一般使用setex(set if not exists)命令,只允许一个客户端来设置键-值(占坑)。先到先得,用完之后,再使用del命令删除这个key。
关于setnx命令:
一般来说我们使用Redis分布式锁,设置了一个Key之后,需要给它设置一个过期时间。保证出现异常异常的话,可以通过过期时间来删除Key。
setnx lock:foo true # 抢锁 expire lock:foo 5 # 加过期时间 复制代码
但是很明显设置键值和加过期时间是两条命令,不是原子命令,如果在这两条命令中间的时间里Redis发生了异常,导致expire
命令没有被执行,那么就会造成死锁,在Redis2.8开始,便在set命令添加了扩展参数。使得setnx命令和expire命令可以一起执行。
set命令的扩展参数:
EX seconds – Set the specified expire time, in seconds. PX milliseconds – Set the specified expire time, in milliseconds. NX – Only set the key if it does not already exist. XX – Only set the key if it already exist. EX seconds – 设置键key的过期时间,单位时秒 PX milliseconds – 设置键key的过期时间,单位时毫秒 NX – 只有键key不存在的时候才会设置key的值 XX – 只有键key存在的时候才会设置key的值 复制代码
通过在set命令添加上述的参数,已经可以完全取代setnx,setex,psetex命令。
综上所述:
由于setnx命令的缺陷,所以使用set key value [EX seconds] [PX milliseconds] [NX]
来取代setnx命令。
二、分布式锁必备的条件
独占性
既然是锁,那就必须是独占性,任何时刻只能有且仅有一个线程持有。
高可用
如果Redis是集群的环境,不能因为某一个节点的的不可用而导致获取锁和释放失败。
防止死锁
必须要超时控制机制或者撤销锁的操作,防止不能异常情况不能正确释放锁而导致的死锁。
不可乱抢
防止”张冠李戴“,一个客户端设置的锁只能由客户端来释放,不能被其他的客户端释放掉。
可重入
锁一般都是具有可重入性,同一个节点的同一个线程获得了锁之后,可以再次去获取这个锁。例如我们常用的synchronizd和Lock,只不过这两个都是本地锁。
三、基于Spring Boot实现Redis分布式锁案例
这里使用Spring Boot进行快速开发,Redis使用6.08版本。关于Redis的安装可自行去搜索安装。
用到的依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> </dependencies> 复制代码
启动类
@SpringBootApplication public class LockApplication { public static void main(String[] args) { SpringApplication.run(LockApplication.class, args); } } 复制代码
Redis的配置类
@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) { RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>(); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.setConnectionFactory(connectionFactory); return redisTemplate; } } 复制代码
配置文件的一些配置
server.port=8080 # Redis数据库索引(默认为0) spring.redis.database=0 # Redis服务器地址 spring.redis.host=192.168.244.10 # Redis服务器连接端口 spring.redis.port=6379 # Redis服务器连接密码(默认为空) spring.redis.password= # 连接池最大连接数(使用负值表示没有限制) 默认 8 spring.redis.lettuce.pool.max-active=8 # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1 spring.redis.lettuce.pool.max-wait=-1 # 连接池中的最大空闲连接 默认 8 spring.redis.lettuce.pool.max-idle=8 # 连接池中的最小空闲连接 默认 0 spring.redis.lettuce.pool.min-idle=0 复制代码
定义controller,因为重点关注的是Redis分布式锁,这里代码便全都在controller里面编写。
@RestController @RequestMapping("lock") public class LockTestController { private final static String lockKey = "distributed-lock:goods:1"; private final Logger logger = LoggerFactory.getLogger(LockTestController.class); private final int timeout = 10; @Autowired private StringRedisTemplate stringRedisTemplate; @GetMapping("test") public Object distributedLockV1() { String value = UUID.randomUUID() + Thread.currentThread().getName(); // 抢锁成功的话,并设置key的过期时间 try { Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS); if (gotLock) { logger.info(Thread.currentThread().getName() + ":抢锁成功"); try { // 这里模拟业务代码的执行时间 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "抢锁成功!"; } } catch (Exception e) { e.printStackTrace(); } finally { // 释放资源,即解锁 stringRedisTemplate.delete(lockKey); } // 对于抢锁失败的,可以重试抢锁,也可以直接返回友好的失败操作。 return "抢锁失败"; } } 复制代码
四、Redis分布式锁超时问题
4.1 正确释放锁
在开始的时候,说到为保证Redis异常,没能正确释放锁,应该给Key设置一个过期时间,让这个Key过期了而被删除。
现在讨论上一章节实现的分布式锁:
加入在抢锁成功之后,**在执行业务逻辑的时候时间过长,此时该线程(线程1)设置的Key过期被删除了,其他线程(线程2)会抢锁成功又重新设置了Key,线程1执行完,释放锁的时候,由于线程1设置的Key过期了,此时删除的线程2设置的Key。*这样线程2岂不是就懵!
解决方案:
所以在删除Key的时候,判断一下这个Key是不是该线程抢锁时设置的Key,所以在设置Key的时候也把Value也设置,在删除的时候,判断一下Value时候一致。自己释放自己加的锁。
改进代码:
@GetMapping("test2") public Object distributedLockV2() { // 生成随机值,在删除Key时判断 String value = UUID.randomUUID() + Thread.currentThread().getName(); // 抢锁成功的话,并设置key的过期时间 try { Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS); if (gotLock) { logger.info(Thread.currentThread().getName() + ": 抢锁成功"); try { // 这里模拟业务代码的执行时间 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "抢锁成功!"; } } catch (Exception e) { e.printStackTrace(); } finally { if (value.equals(stringRedisTemplate.opsForValue().get(lockKey))){ // 释放资源,即解锁 stringRedisTemplate.delete(lockKey); } } // 对于抢锁失败的,可以重试抢锁,也可以直接返回友好的失败操作。 return "抢锁失败"; } 复制代码
4.2 原子性保证
在这里呢,又仔细想想,上一小节distributedLockV2
方法里,finally代码里判断和删除会被分成两条命令,此时这两条命令会被分开执行,这就不是原子命令。如果看过Redis官网的同学,应该知道,官网其实是给出了解决方法:使用Lua脚本执行,Redis在解析Lua脚本时是原子的。
解决这个判断和删除原子性问题的Lua脚本:
@RestController @RequestMapping("lock") public class LockTestController { private final static String lockKey = "distributed-lock:goods:1"; // 命令执行成功,返回1,失败返回0 private final static String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] " + "then " + "return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; private final Logger logger = LoggerFactory.getLogger(LockTestController.class); private final int timeout = 50; @Autowired private StringRedisTemplate stringRedisTemplate; @GetMapping("test3") public Object distributedLockV3() { String value = UUID.randomUUID() + Thread.currentThread().getName(); // 抢锁成功的话,并设置key的过期时间 try { Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS); if (gotLock) { logger.info(Thread.currentThread().getName() + ": 抢锁成功"); try { // 这里模拟业务代码的执行时间 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "抢锁成功!"; } } catch (Exception e) { e.printStackTrace(); } finally { DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class); Long result = stringRedisTemplate.execute(script, Collections.singletonList(lockKey), value); if (result != null) { if (result.equals(1L)) { logger.info("释放锁---" + lockKey + "---成功"); } else { logger.error("释放Redis分布式锁---" + lockKey + "---失败"); } } logger.error("执行lua脚本异常!"); } // 对于抢锁失败的,可以重试抢锁,也可以直接返回友好的失败操作。 return "抢锁失败"; } } 复制代码
4.3 小总结
1、加了Redis分布式锁,如果出现异常的话,可能无法释放锁,所以在代码层面在finally释放锁。
2、防止因为异常导致代码没有执行到finally,设置的Key需要添加过期时间。
3、释放锁时,需要给Key设置Value值,在释放锁的时候时候,校验一下Value是否一致,防止是放错别人设置的锁。
4、确保判断和删除两条Redis命令是原子的,通过Lua脚本完成。
到这里关于单机Redis下的分布式锁的实现,到这就了。其实还是会有问题:1、Key过期了,但是业务还没执行完,这就要对Key进行续期。2、单机Redis是CP的,集群Redis是AP。主从异步复制可能会导致锁丢失。
作者:LoveLifeSuper
链接:https://juejin.cn/post/7027145032230502431