Redis分布式锁
分布式锁概述
什么是分布式锁
分布式锁:分布式微服务架构,拆分后各个微服务之间避免冲突和数据故障而加入的一种锁。
一般情况下,我们使用分布式锁主要有两个场景:
- 避免不同节点重复相同的工作:比如用户执行了某个操作有可能不同节点会发送多封邮件;
- 避免破坏数据的正确性:如果两个节点在同一条数据上同时进行操作,可能会造成数据错误或不一致的情况出现;
Java 中实现的常见方式如何实现
锁的本质:同一时间只允许一个用户操作。所以理论上,能够满足这个需求的工具我们都能够使用 (就是其他应用能帮我们加锁的):
- 基于 MySQL 中的锁:MySQL 本身有自带的悲观锁
for update
关键字,也可以自己实现悲观/乐观锁来达到目的; - 基于 Zookeeper 有序节点:Zookeeper 允许临时创建有序的子节点,这样客户端获取节点列表时,就能够当前子节点列表中的序号判断是否能够获得锁;
- 基于 Redis 的单线程:由于 Redis 是单线程,所以命令会以串行的方式执行,并且本身提供了像
SETNX(set if not exists)
这样的指令,本身具有互斥性;
一般的互联网公司,大家都习惯用redis做分布式锁 (redis — redlock ===> redisson lock/unlock)
常见面试题
- Redis除了拿来做缓存,你还见过基于Redis的什么用法?
- Redis做分布式锁的时候有需要注意的问题?
- 如果是Redis是单点部署的,会带来什么问题?那你准备怎么解决单点问题呢?
- 集群模式下,比如主从模式,有没有什么问题呢?
- 那你简单的介绍一下Redlock吧?你简历上写redisson,你谈谈
- Redis分布式锁如何续期?看门狗知道吗?
前置知识
在开始提到Redis分布式锁之前,我想跟大家聊点Redis的基础知识。
说一下Redis的两个命令:
-
SETNX key value
setnx
是SET if Not Exists的简写。如果不存在set成功返回int的1,这个key存在了返回0。
-
SETEX key seconds value
-
将值
value
关联到key
,并将key
的生存时间设为seconds
(以秒为单位) -
如果
key
已经存在,setex
命令将覆写旧值 -
setex
是一个原子性(atomic)操作,关联值和设置生存时间两个动作会在同一时间内完成 -
下图设置了10秒的失效时间,ttl命令可以查看倒计时,负的说明已经到期了
-
基础案例
1)使用场景
多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)
2)建Module boot_redis01/boot_redis02
3)改POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>cn.silince</groupId>
<artifactId>boot_redis01</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--web+actuator-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--jedis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<!--springboot-aop技术-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
<!--一般通用基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
4)写YML
server.port=1111
# =========redis相关配置================
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis 服务器地址
spring.redis.host=localhost
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接默认8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接默认0
spring.redis.lettuce.pool.min-idle=0
5)主启动
package cn.silince.redis;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BootRedis01Application {
public static void main(String[] args) {
SpringApplication.run(BootRedis01Application.class, args);
}
}
6)业务类
- RedisConfig.java
package cn.silince.redis.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.Serializable;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory){
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
}
- GoodController.java
package cn.silince.redis.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key === 看看库存数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort;
}else {
System.out.println("商品已经售完/活动结束/调用超时"+"\t 服务提供端口" + serverPort);
return "商品已经售完/活动结束/调用超时"+"\t 服务提供端口" + serverPort;
}
}
}
7)测试
以上案例存在的问题
1.单机版没加锁
问题:
没有加锁,并发下数字不对,出现超卖现象
解决方案:
- 加synchronized (不见不散),适用于一定要拿到锁的业务场景,容易造成线程积压
- 加ReentrantLock (过时不候),线程等待的时间太长可以放弃等待或者给定一个时间,期间拿不到锁就放弃
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
private ReentrantLock lock = new ReentrantLock();
@GetMapping("/buy_goods")
public String buy_Goods() throws InterruptedException {
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key === 看看库存数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort;
}else {
System.out.println("商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort);
return "商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
return "error " + serverPort;
}
} else {
return "获取锁超时" + serverPort;
}
}
}
在单机环境下,可以使用synchronized或Lock来实现。
但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现,比如redis或者zookeeper来构建;
不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程。
2.nginx分布式微服务架构
问题:
分布式部署后,单机锁还是出现超卖现象,需要分布式锁
环境配置:
# redis 分布式锁测试
upstream mynginx{
server 127.0.0.1:1111 weight=1;
server 127.0.0.1:2222 weight=1;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://mynginx;
index index.html index.htm;
}
}
访问:http://localhost/buy_goods ,可以点击看到效果,一边一个,默认轮询
2)Jmeter压测
99号商品被卖了两次!
解决方案:
上redis分布式锁setnx(redis天生就是单线程的)。
Redis具有极高的性能,且其命令对分布式锁支持友好,借助SET命令即可实现加锁处理。
修改后代码:
⚠️ 出异常的话,可能无法释放锁, 必须要在代码层面finally释放锁
@RestController
public class GoodController {
public static final String REDIS_LOCK = "silince";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
private ReentrantLock lock = new ReentrantLock();
@GetMapping("/buy_goods")
public String buy_Goods() {
String value = UUID.randomUUID()+Thread.currentThread().getName(); // 唯一固定标识
try {
// ⚠️ redis天生就是单线程的
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);// 相当于setNX(set key if it doesn't exist)
if (!flag){
System.out.println("抢锁失败");
return "抢锁失败";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key === 看看库存数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort;
} else {
System.out.println("商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort);
return "商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort;
}
} finally {
stringRedisTemplate.delete(REDIS_LOCK); // 解锁
}
}
}
3.宕机了/锁超时
问题:
部署了微服务jar包的机器挂了,代码层面根本没有走到finally这块, 没办法保证解锁,这个key没有被删除,需要加入一个过期时间限定key。
解决方案:
需要对lockKey有过期时间的设定
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);// 相当于setNX
stringRedisTemplate.expire(REDIS_LOCK,10L,TimeUnit.SECONDS); // 设置过期时间 防止宕机等原因造成的死锁
4.没有考虑原子性
问题:
设置key+过期时间分开了,必须要合并成一行具备原子性。
解决方案:
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
5.可能会删除了别人的锁
问题:张冠李戴,删除了别人的锁
解决方案:
只能自己删除自己的,不许动别人的。
加一层判断:
@RestController
public class GoodController {
public static final String REDIS_LOCK = "silince";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
private ReentrantLock lock = new ReentrantLock();
@GetMapping("/buy_goods")
public String buy_Goods() {
String value = UUID.randomUUID()+Thread.currentThread().getName(); // 唯一固定标识
try {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
if (!flag){
System.out.println("抢锁失败");
return "抢锁失败";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key === 看看库存数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort;
} else {
System.out.println("商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort);
return "商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort;
}
} finally {
// ⚠️ 判断删除的是否是自己的锁
if (stringRedisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
stringRedisTemplate.delete(REDIS_LOCK);
}
}
}
}
6.finally块的判断+del删除操作不是原子性的
问题:
finally块的判断+del删除操作不是原子性的。
// 判断加锁于解锁是不是同一个客户端
if (stringRedisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
// 若在此时,这把锁突然不是这个客户端的,则会误解锁(判断完后,刚好时间到了导致锁自动被释放,别的线程抢占了该锁)
stringRedisTemplate.delete(REDIS_LOCK);
}
解决方式:
1)方式一:用redis自身的事务
事务介绍:
- Redis的事务是通过MULTI, EXEC, DISCARD和WATCH这四个命令来完成。
- Redis的单个命令都是原子性的,所以这里确保事务性的对象是命令集合。
- Redis将命令集合序列化并确保处于一事务的命令集合连续且不被打断的执行。
- Redis不支持回滚的操作。
相关命令:
- MULTI
- 用于标记事务块的开始,Redis会将后续的命令逐个放入队列中,然后使用EXEC命令原子化地执行这个命令序列。
- EXEC
- 在一个事务中执行所有先前放入队列的命令,然后恢复正常的连接状态。
- DISCARD
- 清楚所有先前在一个事务中放入队列的命令,然后恢复正常的连接状态。
- WATCH
- 当某个事务需要按条件执行时,就要使用这个命令将给定的键设置为受监控的状态。
- 语法:WATCH key [key ….]
- 该命令可以实现 redis 的乐观锁
- UNWATCH
- 清除所有先前为一个事务监控的键
finally {
while (true){
stringRedisTemplate.watch(REDIS_LOCK);
if (stringRedisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
stringRedisTemplate.setEnableTransactionSupport(true); // 开启事务支持
stringRedisTemplate.multi();
stringRedisTemplate.delete(REDIS_LOCK);
List<Object> list = stringRedisTemplate.exec();
if (list != null){ // list==null 表示删除成功;否则继续尝试删除
continue;
}
}
stringRedisTemplate.unwatch();
break;
}
2)方式二:用Lua脚本 推荐
@RestController
public class GoodController {
public static final String REDIS_LOCK = "silince";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
private ReentrantLock lock = new ReentrantLock();
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
String value = UUID.randomUUID()+Thread.currentThread().getName(); // 唯一固定标识
try {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
if (!flag){
System.out.println("抢锁失败");
return "抢锁失败";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key === 看看库存数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort;
} else {
System.out.println("商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort);
return "商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort;
}
} finally {
Jedis jedis = RedisUtils.getJedis();
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
try{
// 成功返回1,失败返回0。
Object o = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
if ("1".equals(o.toString())){
System.out.println("-----del redis lock ok");
}else {
System.out.println("-----del redis lock error");
}
}finally {
if (jedis!=null){
jedis.close();
}
}
}
}
}
- 工具类 RedisUtils.java
public class RedisUtils {
private static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
}
public static Jedis getJedis() throws Exception{
if (jedisPool!=null){
return jedisPool.getResource();
}
throw new Exception("Jedispoll is not ok");
}
}
7.确保redisLock过期时间大于业务执行时间的问题
问题:
-
Redis 分布式锁如何续期?
-
集群存在的问题+CAP,对比zookeeper:
- Redis 侧重 AP
- redis异步复制造成的锁丢失, 比如:主节点没来的及把刚刚set进来这条数据给从节点,就挂了。
- 此时如果集群模式下,就得上Redisson来解决
- Zookeeper 侧重 CP(保证一致性)
8.终极版 Redisson
redis集群环境下,我们自己写的也不OK, 直接上RedLock之Redisson落地实现(实现了可重入)。
- RedisConfig.java
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory){
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
- GoodController.java
@RestController
public class GoodController {
public static final String REDIS_LOCK = "silince";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@Autowired
private Redisson redisson;
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
String value = UUID.randomUUID() + Thread.currentThread().getName(); // 唯一固定标识
RLock redissonLock = redisson.getLock(REDIS_LOCK);
redissonLock.lock();
try {
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key === 看看库存数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t 服务提供端口" + serverPort;
} else {
System.out.println("商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort);
return "商品已经售完/活动结束/调用超时" + "\t 服务提供端口" + serverPort;
}
} finally {
redissonLock.unlock();
}
}
}
9.Redis9.0版bug及完善到9.1
出现这个错误的原因:是在并发多的时候就可能会遇到这种错误,可能会被重新抢占
不见得当前这个锁的状态还是在锁定,并且本线程持有,最好再加一层判断:
finally {
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()){
redissonLock.unlock();
}
}
延伸的讨论:GC 可能引发的安全问题
Martin Kleppmann 曾与 Redis 之父 Antirez 就 Redis 实现分布式锁的安全性问题进行过深入的讨论,其中有一个问题就涉及到 GC。
熟悉 Java 的同学肯定对 GC 不陌生,在 GC 的时候会发生 STW(Stop-The-World),这本身是为了保障垃圾回收器的正常执行,但可能会引发如下的问题:
服务 A 获取了锁并设置了超时时间,但是服务 A 出现了 STW 且时间较长,导致了分布式锁进行了超时释放,在这个期间服务 B 获取到了锁,待服务 A STW 结束之后又恢复了锁,这就导致了 服务 A 和服务 B 同时获取到了锁,这个时候分布式锁就不安全了。
不仅仅局限于 Redis,Zookeeper 和 MySQL 有同样的问题。
总结
- synchronized 单机版ok,上分布式
- nginx分布式微服务 单机锁不行
- 取消单机锁 上redis分布式锁setnx
- 只加了锁,没有释放锁, 出异常的话,可能无法释放锁, 必须要在代码层面finally释放锁
- 宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要有lockKey的过期时间设定
- 为redis的分布式锁key,增加过期时间,此外,还必须要setnx+过期时间必须同一行的原子性操作
- 必须规定只能自己删除自己的锁,你不能把别人的锁删除了,防止张冠李戴,1删2,2删3
- lua或者事务
- redis集群环境下,我们自己写的也不OK。直接上RedLock之Redisson落地实现
Redisson实现分布式锁原理
redission源码分析
加锁机制
线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
watch dog自动延期机制
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
但在实际开发中会有下面一种情况:
- 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
//设置锁1秒过去
redissonLock.lock("redisson", 1);
// 业务逻辑需要咨询2秒
...
redissonLock.release("redisson");
所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。
⚠️ 注意 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。
为啥要用lua脚本呢
主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
可重入加锁机制
Redisson可以实现可重入加锁机制的原因,主要跟两点有关:
-
Redis存储锁的数据类型是 Hash类型
-
Hash数据类型的key值包含了当前线程信息
-
下面是redis存储的数据
这里表明数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value» 类型,这里key是指 redisson
它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的组成是:guid(全局统一标识符) + 当前线程的ID。后面的value是就和可重入加锁有关。
上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。
Redis分布式锁的缺点
在Redis哨兵模式下:
-
客户端1 对某个master节点写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。
-
这时客户端2 来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
-
缺陷在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。
redission源码
redisson的锁已经实现了可重入,下面是简单的使用
public static final String REDIS_LOCK = "silince";
RLock redissonLock = redisson.getLock(REDIS_LOCK); // 获取一个锁的实例
redissonLock.lock();
try {
// 业务逻辑
} finally {
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()){
redissonLock.unlock();
}
}
总体流程图:
getLock(String name)
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//命令执行器
this.commandExecutor = commandExecutor;
//UUID字符串
this.id = commandExecutor.getConnectionManager().getId();
//内部锁过期时间
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
加锁过程
尝试加锁,拿到当前线程,还有ttl。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 当前线程ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 如果ttl为空,则证明获取锁成功
if (ttl == null) {
return;
}
// 如果获取锁失败,则订阅到对应这个锁的channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 再次尝试获取锁
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// ttl为空,说明成功获取锁,返回
if (ttl == null) {
break;
}
// ttl大于0 则等待ttl时间后继续尝试获取
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消对channel的订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
尝试获取锁
获取锁的时候,也比较简单,你可以看到,他也是不断刷新过期时间,跟上面那个不断去拿当前时间,校验过期是一个道理,只是我比较粗糙。
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
// tryAcquireAsync()
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 先按照30s的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId,RedisCommands.EVAL_LONG);
// 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
底层加锁逻辑
你可能会想这么多操作,在一起不是原子性不还是有问题么?
大佬们肯定想得到呀,所以还是LUA,他使用了Hash的数据结构。
主要是判断锁是否存在,存在就设置过期时间,如果锁已经存在了,那对比一下线程,线程是一个那就证明可以重入,锁在了,但是不是当前线程,证明别人还没释放,那就把剩余时间返回,加锁失败。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId,
RedisStrictCommand<T> command) {
// 过期时间
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 如果锁不存在,则通过hset设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁已存在,但并非本线程,则返回过期时间ttl
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
解锁
锁的释放主要是publish释放锁的信息,然后做校验,一样会判断是否当前线程,成功就释放锁,还有个hincrby递减的操作,锁的值大于0说明是可重入锁,那就刷新过期时间。
如果值小于0了,那删掉Key释放锁。
是不是又和AQS很像了?AQS就是通过一个volatile修饰status去看锁的状态,也会看数值判断是否是可重入的。
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// 解锁方法
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
// 如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
// unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 通过hincrby递减1的方式,释放一次锁
// 若剩余次数大于0 ,则刷新过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 否则证明锁已经释放,删除key并发布锁释放的消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
既已览卷至此,何不品评一二: