Java实现Redis分布式锁

背景

在实际的业务中遇到用户账户充值、提现等业务,需要防止并发来达到账户余额变动的正确性。

旧的实现是通过mysql悲观锁来完成的,在充值和提现等账户余额会变化的功能操作之前,先使用用户账户ID做for update,来锁定用户账户。但是这种实现方式把所有的压力都放在数据库上面,随着业务的不断发展,数据库压力也越来越大,因此我对分布式锁进行了改造,使用redis来实现。

具体实现

分布式锁主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package com.guozhe.core.service.impl;

import com.github.rholder.retry.*;
import com.google.common.base.Joiner;
import com.guozhe.core.exception.AssetCommonException;
import com.guozhe.core.utils.CommonPreconditions;
import com.guozhe.core.utils.CommonUtil;
import com.guozhe.core.utils.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* redis分布式锁
*
* @author guozhe
* @date 2019-09-11
*/
@Slf4j
public class RedisLock {

/**
* redis锁key的前缀
*/
static final String REDIS_LOCK_PREFIX = "CORE_LOCK";

/**
* redis默认的分割字符串
*/
static final String REDIS_DEFAULT_SPLIT = ":";

/**
* redis的ttl命令如果有key没有失效时间的返回结果
*/
private static final int TTL_NO_EXPIRE_RESPONSE = -1;

/**
* 加锁尝试retryer
*/
private final Retryer<Boolean> retryer;

/**
* redis锁的超时时长,单位秒
*/
private final int redisLockExpireSeconds;

private final RedisTemplate<String, String> redisTemplate;

RedisLock(RedisTemplate<String, String> redisTemplate, int redisLockExpireSeconds) {
this.redisTemplate = redisTemplate;
this.redisLockExpireSeconds = redisLockExpireSeconds;
this.retryer = getRetryer((this.redisLockExpireSeconds * 1000) / 100, 100L);
}

/**
* 通用分布式redis锁
* 默认每隔200毫秒尝试一次枷锁,一共尝试10次
*
* @param beforeLock 可以为null,加锁之前处理的逻辑,如果有返回值则直接返回,如果得到的是null则继续执行业务逻辑
* @param beforeBizCalled 可以为null,业务处理之前处理的逻辑,如果有返回值则直接返回,如果得到的是null则继续执行业务逻辑
* @param bizCallable 业务处理逻辑
* @param businessName 当前业务名称
* @param key redis加锁的key
* @return 业务处理结果
* @throws AssetCommonException 有两种情况会抛此异常:
* 0、如果一直获取不到锁会抛此异常
* 1、如果业务处理包括beforeLock、beforeBizCalled、bizCallable时有异常,则直接转成此异常抛出
*/
public <T> T lockAndCall(Callable<T> beforeLock, Callable<T> beforeBizCalled, Callable<T> bizCallable, String businessName, String key) {
if (CommonUtil.isNotNull(beforeLock)) {
T result = call(beforeLock);
if (CommonUtil.isNotNull(result)) {
return result;
}
}
final String uuid = UUIDUtils.getUUID();
// 此方法会阻塞线程往下运行,除非已经获得锁才会继续执行,否则一直尝试获取锁或者最终获取锁失败抛异常
retryLock(businessName, key, uuid);
return lockSuccessDoBiz(beforeBizCalled, bizCallable, businessName, key, uuid);
}

/**
* 通用分布式redis锁,只尝试加锁一次,
* 需要自定义Retryer实例
*
* @param beforeLock 可以为null,加锁之前处理的逻辑,如果有返回值则直接返回,如果得到的是null则继续执行业务逻辑
* @param beforeBizCalled 可以为null,业务处理之前处理的逻辑,如果有返回值则直接返回,如果得到的是null则继续执行业务逻辑
* @param bizCallable 业务处理逻辑
* @param businessName 当前业务名称
* @param key redis加锁的key
* @return 业务处理结果
* @throws AssetCommonException 有两种情况会抛此异常:
* 0、如果一直获取不到锁会抛此异常
* 1、如果业务处理包括beforeLock、beforeBizCalled、bizCallable时有异常,则直接转成此异常抛出
*/
<T> T lockAndCallNoRetry(Callable<T> beforeLock, Callable<T> beforeBizCalled, Callable<T> bizCallable, String businessName, String key) {
if (CommonUtil.isNotNull(beforeLock)) {
T result = call(beforeLock);
if (CommonUtil.isNotNull(result)) {
return result;
}
}
final String uuid = UUIDUtils.getUUID();
// 此方法不会阻塞线程,如果加锁成功则处理业务,否则直接返回null
if (tryLock(businessName, key, uuid)) {
return lockSuccessDoBiz(beforeBizCalled, bizCallable, businessName, key, uuid);
}
return null;
}

/**
* 加锁成功之后的业务处理
*
* @param beforeBizCalled 可以为null,业务处理之前处理的逻辑,如果有返回值则直接返回,如果得到的是null则继续执行业务逻辑
* @param bizCallable 业务处理逻辑
* @param businessName 当前业务名称
* @param key redis加锁的key
* @param value redis锁的value值,为了解锁
* @param <T> 自定义返回值
* @return 业务处理结果
*/
private <T> T lockSuccessDoBiz(Callable<T> beforeBizCalled, Callable<T> bizCallable, String businessName, String key, String value) {
try {
if (CommonUtil.isNotNull(beforeBizCalled)) {
T call = beforeBizCalled.call();
if (CommonUtil.isNotNull(call)) {
return call;
}
}
CommonPreconditions.checkArgument(CommonUtil.isNotNull(bizCallable), "businessName=%s,key=%s业务处理逻辑为null", businessName, key);
return bizCallable.call();
} catch (Exception e) {
throw new AssetCommonException(e);
} finally {
log.info("{}业务处理完成,解锁key={},value={}", businessName, key, value);
unlocked(key, value);
}
}

/**
* 执行,如果成功则返回结果;如果有异常则抛出AssetCommonException
*
* @param callable callable
* @param <T> 自定义返回值
* @return 业务处理结果
*/
private <T> T call(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new AssetCommonException(e);
}
}

/**
* 获取加锁的redis的key
*
* @param key 需要加锁的原始key
* @return 最终处理后的key字节数组
*/
static byte[] getRedisKey(String key) {
return getRedisKeyString(key).getBytes();
}

/**
* 获取加锁的redis的key字符串,使用:把传入的字符串拼接起来
*
* @param first 第一个字符串
* @param rest 其他的字符串
* @return 最终处理后的key字节数组
*/
static String getRedisKeyString(String first, String... rest) {
return Joiner.on(REDIS_DEFAULT_SPLIT).join(REDIS_LOCK_PREFIX, first, rest);
}

/**
* 解锁,删除redis中的key如果value的值和此实例的值一样的话
*
* @param key redis锁的key
* @param value 锁的值
*/
private void unlocked(String key, String value) {
final byte[] redisKey = getRedisKey(key);
redisTemplate.execute((RedisCallback<Boolean>) connection -> {
byte[] bytes = connection.get(redisKey);
if (Arrays.equals(value.getBytes(), bytes)) {
connection.del(redisKey);
}
return null;
});
}

/**
* 重试的多次尝试加锁,如果获取到锁则继续往下执行,否则会阻断线程直到获得锁或者抛异常
*
* @param businessName 业务名称
* @param key redis锁的key
* @param value 锁的值
* @throws AssetCommonException 如果加锁失败或者重复次数达到最大的尝试次数,则抛此异常
*/
private void retryLock(String businessName, String key, String value) {
try {
retryer.call(() -> tryLock(businessName, key, value));
} catch (RetryException e) {
// 重试失败说明没有获取到锁,所以直接抛异常不再往下执行
throw new AssetCommonException(String.format("业务%s,key=%s正在处理中,请稍后重试", businessName, key));
} catch (ExecutionException e) {
// 执行加锁时失败,有些未知原因,如redis连不上之类的,此时为了不影响业务接着往下执行
log.error("businessName={},key={}执行redis加锁异常 errorMsg={}", businessName, key, e.getMessage(), e);
}
}

/**
* 单次尝试加锁
*
* @param businessName 业务名称
* @param key redis锁的key
* @param value 锁的值
* @return 如果加锁成功返回true,否则返回false
*/
private boolean tryLock(String businessName, String key, String value) {
return tryLock(businessName, key, value, this.redisTemplate, this.redisLockExpireSeconds);
}

/**
* 单次尝试加锁
*
* @param businessName 业务名称
* @param key redis锁的key
* @param value 锁的值
* @return 如果加锁成功返回true,否则返回false
*/
private static boolean tryLock(String businessName, String key, String value, RedisTemplate<String, String> redisTemplate, long redisLockExpireSeconds) {
final byte[] redisKey = getRedisKey(key);
return redisTemplate.execute((RedisCallback<Boolean>) connection -> {
// 加锁,只有在key不存在的情况下才能加锁成功
boolean result = connection.setNX(redisKey, value.getBytes());
if (result) {
log.info("businessName={} key={} value={},设置锁的失效时间={}s", businessName, key, value, redisLockExpireSeconds);
// 如果加锁成功设置锁的超时时间
connection.expire(redisKey, redisLockExpireSeconds);
} else {
/*
如果没有加锁成功,检查这个key是否有超时时间,如果没有超时时间则设置超时时间
ttl的官方文档如下:
Returns the remaining time to live of a key that has a timeout.
In Redis 2.6 or older the command returns -1 if the key does not exist or if the key exist but has no associated expire.
Starting with Redis 2.8 the return value in case of error changed:
The command returns -2 if the key does not exist.
The command returns -1 if the key exists but has no associated expire.
测试环境redis_version:4.0.1 生产环境redis_version:4.0.13 @2019-09-20
*/
if (TTL_NO_EXPIRE_RESPONSE == connection.ttl(redisKey)) {
connection.expire(redisKey, redisLockExpireSeconds);
}
}
return result;
});
}

/**
* 有些业务不需要一直等待重试,如果第一次获取锁不成功则马上不处理即可;
* 所以本方法即是
*
* @param retryTimes 尝试枷锁次数 必须大于0
* @param fixedWaitTime 固定等待时长,单位MILLISECONDS
* @return Retryer实例
*/
public Retryer<Boolean> getRetryer(int retryTimes, long fixedWaitTime) {
/*
* 加锁尝试retryer
* 最多尝试10次,每次不成功等待100ms,所以最多等待一秒钟如果获取不到锁就不再尝试
* 2019-09-11查询生产环境日志,放款接口处理时间几乎没有超过500ms,所以时间是够用的
*/
return RetryerBuilder.<Boolean>newBuilder()
// 每次尝试加锁失败后等待100ms
.withWaitStrategy(WaitStrategies.fixedWait(fixedWaitTime, TimeUnit.MILLISECONDS))
// 最多尝试10次
.withStopStrategy(StopStrategies.stopAfterAttempt(retryTimes))
// 如果返回false则继续重试
.retryIfResult(aBoolean -> aBoolean == null || !aBoolean).build();
}

}

构建分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.guozhe.core.service.impl;

import com.guozhe.core.utils.CommonPreconditions;
import com.guozhe.core.utils.CommonUtil;
import org.springframework.data.redis.core.RedisTemplate;

/**
* 通用的redis分布式锁构建器
* 必须设置RedisTemplate
*
* @author guozhe
*/
public class RedisLockBuilder {

private RedisTemplate<String, String> redisTemplate;

/**
* redis锁的超时时长,单位秒;默认1s,客户端可以根据自身业务自己设置
*/
private int redisLockExpireSeconds = 1;


private RedisLockBuilder() {
}

public static RedisLockBuilder newBuilder() {
return new RedisLockBuilder();
}

public RedisLockBuilder redisTemplate(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
return this;
}

/**
* 设置加锁时长,默认1s,客户端可以根据自身业务自己设置
*
* @param seconds 超时时间,单位秒
* @return 当前RedisLockBuilder
*/
public RedisLockBuilder redisLockExpireSeconds(int seconds) {
this.redisLockExpireSeconds = seconds;
return this;
}

public RedisLock build() {
CommonPreconditions.checkArgument(CommonUtil.isNotNull(redisTemplate), "构建redis分布式锁时,redisTemplate不允许为null");
CommonPreconditions.checkArgument(redisLockExpireSeconds > 0, "构建redis分布式锁时,超时时间必须大于0seconds=%s", this.redisLockExpireSeconds);
return new RedisLock(redisTemplate, redisLockExpireSeconds);
}

}

如何使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.guozhe.core.service.impl;

import com.google.common.collect.Sets;
import com.guozhe.core.manager.CoreCommonApiNotifyRecordManagerService;
import com.guozhe.core.model.CoreCommonApiNotifyRecord;
import com.guozhe.core.remote.BaseRemoteApiNotifyService;
import com.guozhe.core.service.CommonApiNotifyService;
import com.guozhe.core.service.ServiceLocator;
import com.guozhe.core.utils.CommonPreconditions;
import com.guozhe.core.utils.CommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* @author guozhe
* @date 2019-12-06
*/
@Slf4j
@Service
public class CommonApiNotifyServiceImpl implements CommonApiNotifyService {

/**
* redis加锁时key标示的业务名称
*/
static final String REDIS_LOCK_BUSINESS_NAME = "COMMON_API_NOTIFY";

/**
* redis加锁时的超时时长,为了防止业务方的接口处理时间过长导致并发,此处设置的时间比较长
*/
private static final int REDIS_LOCK_EXPIRE_SECONDS = 20;

/**
* redis分布式锁
*/
private final RedisLock redisLockService;
private final CoreCommonApiNotifyRecordManagerService coreCommonApiNotifyRecordManagerService;
private final RedisTemplate<String, String> redisTemplate;

public CommonApiNotifyServiceImpl(RedisTemplate<String, String> redisTemplate, CoreCommonApiNotifyRecordManagerService coreCommonApiNotifyRecordManagerService) {
this.coreCommonApiNotifyRecordManagerService = coreCommonApiNotifyRecordManagerService;
this.redisTemplate = redisTemplate;
this.redisLockService = RedisLockBuilder.newBuilder().redisTemplate(redisTemplate).redisLockExpireSeconds(REDIS_LOCK_EXPIRE_SECONDS).build();
}


@Override
public void notifyWithLock(CoreCommonApiNotifyRecord record) {
BaseRemoteApiNotifyService remoteApiNotifyService = getBaseRemoteApiNotifyService(record);
// 此处是为了防止并发,如果本次通知成功或已经通知成功result是true;如果本次通知失败则返回false;如果正在处理则返回null
// 此处不只是为了防并发,此处也有如果已经通知过了就不再通知的逻辑,因此在加锁之前和加锁之后业务处理之前都做检查
Boolean result = redisLockService.lockAndCallNoRetry(
() -> checkIfNotified(record),
() -> checkIfNotified(record),
() -> remoteApiNotifyService.bizNotify(record),
REDIS_LOCK_BUSINESS_NAME,
String.valueOf(record.getId()));
if (CommonUtil.isNull(result)) {
log.info("API通知recordId={}, coreLendRequestId={}, businessType={} 正在通知无需重复", record.getId(), record.getCoreLendRequestId(), record.getBusinessType());
}
executeResult(record, result);
}
}