分布式锁实现:保障分布式系统数据安全的关键技术

引言

在分布式系统中,多个节点可能同时访问共享资源,如果没有适当的同步机制,就会出现数据不一致、重复处理等问题。分布式锁作为分布式系统中实现互斥访问的重要手段,能够确保在任意时刻只有一个节点能够访问特定的共享资源。本文将深入探讨分布式锁的实现原理、主流技术方案以及在实际项目中的应用实践。

分布式锁基础理论

分布式锁的核心特性

互斥性(Mutual Exclusion):在任意时刻,只能有一个客户端持有锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 分布式锁接口定义
public interface DistributedLock {

/**
* 尝试获取锁
* @param lockKey 锁的唯一标识
* @param lockValue 锁的值(用于释放时验证)
* @param expireTime 锁的过期时间
* @param timeUnit 时间单位
* @return 是否成功获取锁
*/
boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit);

/**
* 释放锁
* @param lockKey 锁的唯一标识
* @param lockValue 锁的值
* @return 是否成功释放锁
*/
boolean unlock(String lockKey, String lockValue);

/**
* 带超时的获取锁
* @param lockKey 锁的唯一标识
* @param lockValue 锁的值
* @param expireTime 锁的过期时间
* @param waitTime 等待时间
* @param timeUnit 时间单位
* @return 是否成功获取锁
*/
boolean tryLock(String lockKey, String lockValue, long expireTime, long waitTime, TimeUnit timeUnit);

/**
* 检查锁是否存在
* @param lockKey 锁的唯一标识
* @return 锁是否存在
*/
boolean isLocked(String lockKey);

/**
* 续期锁
* @param lockKey 锁的唯一标识
* @param lockValue 锁的值
* @param expireTime 新的过期时间
* @param timeUnit 时间单位
* @return 是否成功续期
*/
boolean renewLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit);
}

可重入性(Reentrant):同一个客户端可以多次获取同一把锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// 可重入分布式锁实现
@Component
public class ReentrantDistributedLock implements DistributedLock {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private static final String LOCK_PREFIX = "reentrant_lock:";
private static final String COUNT_PREFIX = "lock_count:";

// 线程本地存储,记录当前线程持有的锁
private final ThreadLocal<Map<String, Integer>> threadLocks =
ThreadLocal.withInitial(HashMap::new);

@Override
public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
String fullLockKey = LOCK_PREFIX + lockKey;
String countKey = COUNT_PREFIX + lockKey;

// 检查当前线程是否已经持有该锁
Map<String, Integer> locks = threadLocks.get();
Integer currentCount = locks.get(lockKey);

if (currentCount != null && currentCount > 0) {
// 可重入,增加计数
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" local count = redis.call('incr', KEYS[2]) " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" redis.call('expire', KEYS[2], ARGV[2]) " +
" return count " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript,
Arrays.asList(fullLockKey, countKey),
lockValue, String.valueOf(timeUnit.toSeconds(expireTime)));

if (result != null && result > 0) {
locks.put(lockKey, currentCount + 1);
log.debug("Reentrant lock acquired: {} (count: {})", lockKey, result);
return true;
}
} else {
// 首次获取锁
String script =
"if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" redis.call('set', KEYS[2], '1') " +
" redis.call('expire', KEYS[2], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript,
Arrays.asList(fullLockKey, countKey),
lockValue, String.valueOf(timeUnit.toSeconds(expireTime)));

if (result != null && result == 1L) {
locks.put(lockKey, 1);
log.debug("Lock acquired: {}", lockKey);
return true;
}
}

return false;
}

@Override
public boolean unlock(String lockKey, String lockValue) {
String fullLockKey = LOCK_PREFIX + lockKey;
String countKey = COUNT_PREFIX + lockKey;

Map<String, Integer> locks = threadLocks.get();
Integer currentCount = locks.get(lockKey);

if (currentCount == null || currentCount <= 0) {
log.warn("Attempting to unlock a lock not held by current thread: {}", lockKey);
return false;
}

if (currentCount > 1) {
// 减少重入计数
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" local count = redis.call('decr', KEYS[2]) " +
" if count > 0 then " +
" return count " +
" else " +
" redis.call('del', KEYS[1]) " +
" redis.call('del', KEYS[2]) " +
" return 0 " +
" end " +
"else " +
" return -1 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript,
Arrays.asList(fullLockKey, countKey),
lockValue);

if (result != null && result >= 0) {
locks.put(lockKey, currentCount - 1);
log.debug("Lock count decreased: {} (count: {})", lockKey, result);
return true;
}
} else {
// 完全释放锁
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]) " +
" redis.call('del', KEYS[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript,
Arrays.asList(fullLockKey, countKey),
lockValue);

if (result != null && result == 1L) {
locks.remove(lockKey);
log.debug("Lock released: {}", lockKey);
return true;
}
}

return false;
}

@Override
public boolean tryLock(String lockKey, String lockValue, long expireTime,
long waitTime, TimeUnit timeUnit) {
long startTime = System.currentTimeMillis();
long waitTimeMs = timeUnit.toMillis(waitTime);

while (System.currentTimeMillis() - startTime < waitTimeMs) {
if (tryLock(lockKey, lockValue, expireTime, timeUnit)) {
return true;
}

try {
// 随机退避,避免惊群效应
Thread.sleep(50 + (long)(Math.random() * 50));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

return false;
}

@Override
public boolean isLocked(String lockKey) {
String fullLockKey = LOCK_PREFIX + lockKey;
return Boolean.TRUE.equals(redisTemplate.hasKey(fullLockKey));
}

@Override
public boolean renewLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
String fullLockKey = LOCK_PREFIX + lockKey;
String countKey = COUNT_PREFIX + lockKey;

String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" redis.call('expire', KEYS[2], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript,
Arrays.asList(fullLockKey, countKey),
lockValue, String.valueOf(timeUnit.toSeconds(expireTime)));

return result != null && result == 1L;
}
}

容错性(Fault Tolerance):锁服务的部分节点故障不应该影响整个系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// 高可用分布式锁实现
@Component
public class HighAvailabilityDistributedLock {

private final List<RedisTemplate<String, Object>> redisTemplates;
private final int quorum; // 法定人数

public HighAvailabilityDistributedLock(List<RedisTemplate<String, Object>> redisTemplates) {
this.redisTemplates = redisTemplates;
this.quorum = redisTemplates.size() / 2 + 1; // 超过半数
}

public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
List<CompletableFuture<Boolean>> futures = new ArrayList<>();

// 并行向所有Redis实例请求锁
for (RedisTemplate<String, Object> redisTemplate : redisTemplates) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, timeUnit);
return Boolean.TRUE.equals(result);
} catch (Exception e) {
log.warn("Failed to acquire lock from Redis instance", e);
return false;
}
});
futures.add(future);
}

// 等待所有请求完成或超时
int successCount = 0;
List<Boolean> results = new ArrayList<>();

try {
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

// 设置超时时间
allFutures.get(5, TimeUnit.SECONDS);

for (CompletableFuture<Boolean> future : futures) {
Boolean result = future.get();
results.add(result);
if (Boolean.TRUE.equals(result)) {
successCount++;
}
}

} catch (TimeoutException e) {
log.warn("Lock acquisition timeout");
// 处理已完成的请求
for (CompletableFuture<Boolean> future : futures) {
if (future.isDone()) {
try {
Boolean result = future.get();
results.add(result);
if (Boolean.TRUE.equals(result)) {
successCount++;
}
} catch (Exception ex) {
results.add(false);
}
} else {
results.add(false);
}
}
} catch (Exception e) {
log.error("Error during lock acquisition", e);
return false;
}

// 检查是否达到法定人数
if (successCount >= quorum) {
log.debug("Lock acquired successfully: {} ({}/{})",
lockKey, successCount, redisTemplates.size());
return true;
} else {
// 未达到法定人数,释放已获取的锁
releaseLocks(lockKey, lockValue, results);
log.debug("Lock acquisition failed: {} ({}/{})",
lockKey, successCount, redisTemplates.size());
return false;
}
}

public boolean unlock(String lockKey, String lockValue) {
List<CompletableFuture<Boolean>> futures = new ArrayList<>();

String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

// 并行释放所有Redis实例上的锁
for (RedisTemplate<String, Object> redisTemplate : redisTemplates) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(lockKey), lockValue);
return result != null && result == 1L;
} catch (Exception e) {
log.warn("Failed to release lock from Redis instance", e);
return false;
}
});
futures.add(future);
}

// 等待所有释放操作完成
int successCount = 0;
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(3, TimeUnit.SECONDS);

for (CompletableFuture<Boolean> future : futures) {
if (Boolean.TRUE.equals(future.get())) {
successCount++;
}
}
} catch (Exception e) {
log.warn("Error during lock release", e);
}

log.debug("Lock released: {} ({}/{})",
lockKey, successCount, redisTemplates.size());

return successCount >= quorum;
}

private void releaseLocks(String lockKey, String lockValue, List<Boolean> acquisitionResults) {
for (int i = 0; i < redisTemplates.size() && i < acquisitionResults.size(); i++) {
if (Boolean.TRUE.equals(acquisitionResults.get(i))) {
try {
RedisTemplate<String, Object> redisTemplate = redisTemplates.get(i);
redisTemplate.delete(lockKey);
} catch (Exception e) {
log.warn("Failed to release lock during cleanup", e);
}
}
}
}
}

Redis分布式锁实现

基于SET命令的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Redis分布式锁的完整实现
@Component
public class RedisDistributedLock implements DistributedLock {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private static final String LOCK_PREFIX = "distributed_lock:";
private static final String UNLOCK_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

private static final String RENEW_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";

private final RedisScript<Long> unlockScript;
private final RedisScript<Long> renewScript;
private final ScheduledExecutorService renewalExecutor;
private final Map<String, ScheduledFuture<?>> renewalTasks;

public RedisDistributedLock() {
// 初始化Lua脚本
DefaultRedisScript<Long> unlockRedisScript = new DefaultRedisScript<>();
unlockRedisScript.setScriptText(UNLOCK_SCRIPT);
unlockRedisScript.setResultType(Long.class);
this.unlockScript = unlockRedisScript;

DefaultRedisScript<Long> renewRedisScript = new DefaultRedisScript<>();
renewRedisScript.setScriptText(RENEW_SCRIPT);
renewRedisScript.setResultType(Long.class);
this.renewScript = renewRedisScript;

// 初始化续期执行器
this.renewalExecutor = Executors.newScheduledThreadPool(5, r -> {
Thread thread = new Thread(r, "redis-lock-renewal");
thread.setDaemon(true);
return thread;
});

this.renewalTasks = new ConcurrentHashMap<>();
}

@Override
public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
String fullKey = LOCK_PREFIX + lockKey;

Boolean result = redisTemplate.opsForValue()
.setIfAbsent(fullKey, lockValue, expireTime, timeUnit);

if (Boolean.TRUE.equals(result)) {
log.debug("Lock acquired: {}", lockKey);

// 启动自动续期
startRenewal(lockKey, lockValue, expireTime, timeUnit);

return true;
} else {
log.debug("Lock acquisition failed: {}", lockKey);
return false;
}
}

@Override
public boolean tryLock(String lockKey, String lockValue, long expireTime,
long waitTime, TimeUnit timeUnit) {
long startTime = System.currentTimeMillis();
long waitTimeMs = timeUnit.toMillis(waitTime);

// 实现指数退避算法
long backoffTime = 10; // 初始退避时间10ms
final long maxBackoffTime = 1000; // 最大退避时间1s

while (System.currentTimeMillis() - startTime < waitTimeMs) {
if (tryLock(lockKey, lockValue, expireTime, timeUnit)) {
return true;
}

try {
// 指数退避 + 随机抖动
long jitter = (long) (Math.random() * backoffTime * 0.1);
Thread.sleep(backoffTime + jitter);

backoffTime = Math.min(backoffTime * 2, maxBackoffTime);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Lock acquisition interrupted: {}", lockKey);
return false;
}
}

log.debug("Lock acquisition timeout: {}", lockKey);
return false;
}

@Override
public boolean unlock(String lockKey, String lockValue) {
String fullKey = LOCK_PREFIX + lockKey;

// 停止自动续期
stopRenewal(lockKey);

Long result = redisTemplate.execute(unlockScript,
Collections.singletonList(fullKey), lockValue);

if (result != null && result == 1L) {
log.debug("Lock released: {}", lockKey);
return true;
} else {
log.warn("Lock release failed: {}", lockKey);
return false;
}
}

@Override
public boolean isLocked(String lockKey) {
String fullKey = LOCK_PREFIX + lockKey;
return Boolean.TRUE.equals(redisTemplate.hasKey(fullKey));
}

@Override
public boolean renewLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
String fullKey = LOCK_PREFIX + lockKey;

Long result = redisTemplate.execute(renewScript,
Collections.singletonList(fullKey),
lockValue, String.valueOf(timeUnit.toSeconds(expireTime)));

return result != null && result == 1L;
}

private void startRenewal(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
// 在锁过期时间的1/3处开始续期
long renewalInterval = timeUnit.toMillis(expireTime) / 3;

ScheduledFuture<?> renewalTask = renewalExecutor.scheduleAtFixedRate(() -> {
try {
if (renewLock(lockKey, lockValue, expireTime, timeUnit)) {
log.debug("Lock renewed: {}", lockKey);
} else {
log.warn("Lock renewal failed: {}", lockKey);
stopRenewal(lockKey);
}
} catch (Exception e) {
log.error("Error during lock renewal: {}", lockKey, e);
stopRenewal(lockKey);
}
}, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);

renewalTasks.put(lockKey, renewalTask);
}

private void stopRenewal(String lockKey) {
ScheduledFuture<?> renewalTask = renewalTasks.remove(lockKey);
if (renewalTask != null) {
renewalTask.cancel(false);
log.debug("Lock renewal stopped: {}", lockKey);
}
}

@PreDestroy
public void shutdown() {
// 停止所有续期任务
renewalTasks.values().forEach(task -> task.cancel(false));
renewalTasks.clear();

// 关闭续期执行器
renewalExecutor.shutdown();
try {
if (!renewalExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
renewalExecutor.shutdownNow();
}
} catch (InterruptedException e) {
renewalExecutor.shutdownNow();
}
}
}

分布式锁的使用模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// 分布式锁使用工具类
@Component
public class DistributedLockTemplate {

@Autowired
private DistributedLock distributedLock;

/**
* 执行带锁的操作
*/
public <T> T executeWithLock(String lockKey, long expireTime, TimeUnit timeUnit,
Supplier<T> action) {
String lockValue = generateLockValue();

if (distributedLock.tryLock(lockKey, lockValue, expireTime, timeUnit)) {
try {
return action.get();
} finally {
distributedLock.unlock(lockKey, lockValue);
}
} else {
throw new LockAcquisitionException("Failed to acquire lock: " + lockKey);
}
}

/**
* 执行带锁的操作(带超时)
*/
public <T> T executeWithLock(String lockKey, long expireTime, long waitTime,
TimeUnit timeUnit, Supplier<T> action) {
String lockValue = generateLockValue();

if (distributedLock.tryLock(lockKey, lockValue, expireTime, waitTime, timeUnit)) {
try {
return action.get();
} finally {
distributedLock.unlock(lockKey, lockValue);
}
} else {
throw new LockAcquisitionException(
"Failed to acquire lock within timeout: " + lockKey);
}
}

/**
* 执行带锁的操作(无返回值)
*/
public void executeWithLock(String lockKey, long expireTime, TimeUnit timeUnit,
Runnable action) {
executeWithLock(lockKey, expireTime, timeUnit, () -> {
action.run();
return null;
});
}

/**
* 尝试执行带锁的操作
*/
public <T> Optional<T> tryExecuteWithLock(String lockKey, long expireTime,
TimeUnit timeUnit, Supplier<T> action) {
String lockValue = generateLockValue();

if (distributedLock.tryLock(lockKey, lockValue, expireTime, timeUnit)) {
try {
return Optional.of(action.get());
} finally {
distributedLock.unlock(lockKey, lockValue);
}
} else {
return Optional.empty();
}
}

private String generateLockValue() {
return Thread.currentThread().getName() + ":" +
System.currentTimeMillis() + ":" +
UUID.randomUUID().toString();
}

// 自定义异常
public static class LockAcquisitionException extends RuntimeException {
public LockAcquisitionException(String message) {
super(message);
}
}
}

// 分布式锁应用示例
@Service
public class OrderService {

@Autowired
private DistributedLockTemplate lockTemplate;

@Autowired
private OrderRepository orderRepository;

@Autowired
private InventoryService inventoryService;

@Autowired
private PaymentService paymentService;

/**
* 创建订单(防止重复提交)
*/
public Order createOrder(CreateOrderRequest request) {
String lockKey = "create_order:" + request.getUserId() + ":" + request.getRequestId();

return lockTemplate.executeWithLock(lockKey, 30, TimeUnit.SECONDS, () -> {
// 检查是否已存在相同的订单
Order existingOrder = orderRepository.findByUserIdAndRequestId(
request.getUserId(), request.getRequestId());

if (existingOrder != null) {
log.info("Order already exists: {}", existingOrder.getId());
return existingOrder;
}

// 检查库存
if (!inventoryService.checkStock(request.getProductId(), request.getQuantity())) {
throw new InsufficientStockException("Insufficient stock for product: " +
request.getProductId());
}

// 扣减库存
inventoryService.reduceStock(request.getProductId(), request.getQuantity());

// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setRequestId(request.getRequestId());
order.setStatus(OrderStatus.PENDING);
order.setCreateTime(LocalDateTime.now());

Order savedOrder = orderRepository.save(order);

log.info("Order created successfully: {}", savedOrder.getId());
return savedOrder;
});
}

/**
* 处理订单支付
*/
public boolean processPayment(Long orderId) {
String lockKey = "process_payment:" + orderId;

return lockTemplate.executeWithLock(lockKey, 60, TimeUnit.SECONDS, () -> {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));

if (order.getStatus() != OrderStatus.PENDING) {
log.warn("Order is not in pending status: {} (status: {})",
orderId, order.getStatus());
return false;
}

try {
// 调用支付服务
PaymentResult result = paymentService.processPayment(
order.getUserId(), order.getAmount());

if (result.isSuccess()) {
order.setStatus(OrderStatus.PAID);
order.setPaymentId(result.getPaymentId());
order.setPaymentTime(LocalDateTime.now());
orderRepository.save(order);

log.info("Payment processed successfully for order: {}", orderId);
return true;
} else {
order.setStatus(OrderStatus.PAYMENT_FAILED);
orderRepository.save(order);

// 恢复库存
inventoryService.restoreStock(order.getProductId(), order.getQuantity());

log.warn("Payment failed for order: {} (reason: {})",
orderId, result.getFailureReason());
return false;
}
} catch (Exception e) {
order.setStatus(OrderStatus.PAYMENT_ERROR);
orderRepository.save(order);

log.error("Payment error for order: {}", orderId, e);
throw e;
}
});
}

/**
* 批量处理订单
*/
public void batchProcessOrders(List<Long> orderIds) {
// 对订单ID排序,避免死锁
List<Long> sortedOrderIds = orderIds.stream()
.sorted()
.collect(Collectors.toList());

for (Long orderId : sortedOrderIds) {
String lockKey = "batch_process:" + orderId;

lockTemplate.tryExecuteWithLock(lockKey, 10, TimeUnit.SECONDS, () -> {
try {
processOrder(orderId);
return true;
} catch (Exception e) {
log.error("Failed to process order in batch: {}", orderId, e);
return false;
}
}).ifPresentOrElse(
success -> log.debug("Order processed in batch: {}", orderId),
() -> log.warn("Failed to acquire lock for order: {}", orderId)
);
}
}

private void processOrder(Long orderId) {
// 订单处理逻辑
log.info("Processing order: {}", orderId);
}
}

Zookeeper分布式锁实现

基于临时顺序节点的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Zookeeper分布式锁实现
@Component
public class ZookeeperDistributedLock {

private final CuratorFramework client;
private static final String LOCK_ROOT_PATH = "/distributed-locks";

public ZookeeperDistributedLock(CuratorFramework client) {
this.client = client;
try {
// 确保根路径存在
if (client.checkExists().forPath(LOCK_ROOT_PATH) == null) {
client.create()
.creatingParentsIfNeeded()
.forPath(LOCK_ROOT_PATH);
}
} catch (Exception e) {
throw new RuntimeException("Failed to initialize ZooKeeper lock root path", e);
}
}

public boolean tryLock(String lockKey, long waitTime, TimeUnit timeUnit) {
String lockPath = LOCK_ROOT_PATH + "/" + lockKey;

InterProcessMutex mutex = new InterProcessMutex(client, lockPath);

try {
return mutex.acquire(waitTime, timeUnit);
} catch (Exception e) {
log.error("Failed to acquire ZooKeeper lock: {}", lockKey, e);
return false;
}
}

public void unlock(String lockKey) {
String lockPath = LOCK_ROOT_PATH + "/" + lockKey;

InterProcessMutex mutex = new InterProcessMutex(client, lockPath);

try {
mutex.release();
} catch (Exception e) {
log.error("Failed to release ZooKeeper lock: {}", lockKey, e);
}
}

public <T> T executeWithLock(String lockKey, long waitTime, TimeUnit timeUnit,
Supplier<T> action) {
String lockPath = LOCK_ROOT_PATH + "/" + lockKey;

InterProcessMutex mutex = new InterProcessMutex(client, lockPath);

try {
if (mutex.acquire(waitTime, timeUnit)) {
try {
return action.get();
} finally {
mutex.release();
}
} else {
throw new LockAcquisitionException(
"Failed to acquire ZooKeeper lock within timeout: " + lockKey);
}
} catch (Exception e) {
if (e instanceof LockAcquisitionException) {
throw e;
}
throw new RuntimeException("Error executing with ZooKeeper lock: " + lockKey, e);
}
}

// 自定义异常
public static class LockAcquisitionException extends RuntimeException {
public LockAcquisitionException(String message) {
super(message);
}
}
}

// Zookeeper配置
@Configuration
public class ZookeeperConfiguration {

@Value("${zookeeper.connection-string}")
private String connectionString;

@Value("${zookeeper.session-timeout:5000}")
private int sessionTimeout;

@Value("${zookeeper.connection-timeout:3000}")
private int connectionTimeout;

@Bean
public CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectionString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.build();

client.start();

try {
// 等待连接建立
client.blockUntilConnected(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to connect to ZooKeeper", e);
}

return client;
}
}

分布式锁性能优化

锁粒度优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 细粒度锁管理器
@Component
public class FineGrainedLockManager {

@Autowired
private DistributedLock distributedLock;

/**
* 用户级别的锁
*/
public <T> T executeWithUserLock(Long userId, Supplier<T> action) {
String lockKey = "user:" + userId;
return executeWithLock(lockKey, action);
}

/**
* 商品级别的锁
*/
public <T> T executeWithProductLock(Long productId, Supplier<T> action) {
String lockKey = "product:" + productId;
return executeWithLock(lockKey, action);
}

/**
* 订单级别的锁
*/
public <T> T executeWithOrderLock(Long orderId, Supplier<T> action) {
String lockKey = "order:" + orderId;
return executeWithLock(lockKey, action);
}

/**
* 库存分片锁(减少锁竞争)
*/
public <T> T executeWithInventoryShardLock(Long productId, int shardCount,
Supplier<T> action) {
// 基于产品ID计算分片
int shard = (int) (productId % shardCount);
String lockKey = "inventory_shard:" + shard;
return executeWithLock(lockKey, action);
}

/**
* 组合锁(多个资源的原子操作)
*/
public <T> T executeWithMultipleLocks(List<String> lockKeys, Supplier<T> action) {
// 对锁键排序,避免死锁
List<String> sortedKeys = lockKeys.stream()
.sorted()
.collect(Collectors.toList());

return executeWithSortedLocks(sortedKeys, action);
}

private <T> T executeWithSortedLocks(List<String> sortedKeys, Supplier<T> action) {
if (sortedKeys.isEmpty()) {
return action.get();
}

String firstKey = sortedKeys.get(0);
List<String> remainingKeys = sortedKeys.subList(1, sortedKeys.size());

return executeWithLock(firstKey, () ->
executeWithSortedLocks(remainingKeys, action));
}

private <T> T executeWithLock(String lockKey, Supplier<T> action) {
String lockValue = generateLockValue();

if (distributedLock.tryLock(lockKey, lockValue, 30, TimeUnit.SECONDS)) {
try {
return action.get();
} finally {
distributedLock.unlock(lockKey, lockValue);
}
} else {
throw new RuntimeException("Failed to acquire lock: " + lockKey);
}
}

private String generateLockValue() {
return Thread.currentThread().getName() + ":" +
System.currentTimeMillis() + ":" +
UUID.randomUUID().toString();
}
}

锁监控与告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// 分布式锁监控
@Component
public class DistributedLockMonitor {

private final MeterRegistry meterRegistry;
private final Counter lockAcquisitionCounter;
private final Counter lockReleaseCounter;
private final Timer lockHoldTimer;
private final Gauge activeLockGauge;

private final AtomicLong activeLockCount = new AtomicLong(0);
private final Map<String, LockMetrics> lockMetricsMap = new ConcurrentHashMap<>();

public DistributedLockMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;

this.lockAcquisitionCounter = Counter.builder("distributed_lock_acquisition")
.description("Number of lock acquisitions")
.register(meterRegistry);

this.lockReleaseCounter = Counter.builder("distributed_lock_release")
.description("Number of lock releases")
.register(meterRegistry);

this.lockHoldTimer = Timer.builder("distributed_lock_hold_time")
.description("Time locks are held")
.register(meterRegistry);

this.activeLockGauge = Gauge.builder("distributed_lock_active")
.description("Number of active locks")
.register(meterRegistry, this, monitor -> monitor.activeLockCount.get());
}

public void recordLockAcquisition(String lockKey, boolean success) {
lockAcquisitionCounter.increment(
Tags.of(
"lock_key", lockKey,
"success", String.valueOf(success)
)
);

if (success) {
activeLockCount.incrementAndGet();
lockMetricsMap.put(lockKey, new LockMetrics(System.currentTimeMillis()));
}
}

public void recordLockRelease(String lockKey) {
lockReleaseCounter.increment(Tags.of("lock_key", lockKey));

activeLockCount.decrementAndGet();

LockMetrics metrics = lockMetricsMap.remove(lockKey);
if (metrics != null) {
long holdTime = System.currentTimeMillis() - metrics.getAcquisitionTime();
lockHoldTimer.record(holdTime, TimeUnit.MILLISECONDS);
}
}

@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkLongHeldLocks() {
long currentTime = System.currentTimeMillis();
long warningThreshold = 5 * 60 * 1000; // 5分钟

lockMetricsMap.entrySet().stream()
.filter(entry -> currentTime - entry.getValue().getAcquisitionTime() > warningThreshold)
.forEach(entry -> {
String lockKey = entry.getKey();
long holdTime = currentTime - entry.getValue().getAcquisitionTime();

log.warn("Lock held for too long: {} ({}ms)", lockKey, holdTime);

// 发送告警
sendLockAlert(lockKey, holdTime);
});
}

private void sendLockAlert(String lockKey, long holdTime) {
// 实现告警逻辑,如发送邮件、短信或推送到监控系统
log.error("ALERT: Distributed lock held for {} ms: {}", holdTime, lockKey);
}

@EventListener
public void handleApplicationShutdown(ContextClosedEvent event) {
// 应用关闭时,记录未释放的锁
if (!lockMetricsMap.isEmpty()) {
log.warn("Application shutting down with {} unreleased locks",
lockMetricsMap.size());

lockMetricsMap.keySet().forEach(lockKey ->
log.warn("Unreleased lock: {}", lockKey));
}
}

private static class LockMetrics {
private final long acquisitionTime;

public LockMetrics(long acquisitionTime) {
this.acquisitionTime = acquisitionTime;
}

public long getAcquisitionTime() {
return acquisitionTime;
}
}
}

最佳实践与总结

分布式锁使用原则

  1. 锁粒度要合适:既要保证数据一致性,又要避免过度串行化
  2. 设置合理的超时时间:防止死锁,但要确保业务操作能够完成
  3. 实现锁的自动续期:对于长时间运行的操作
  4. 使用唯一的锁值:确保只有持有锁的客户端才能释放锁
  5. 处理锁获取失败:实现合适的重试和降级策略

性能优化建议

  1. 减少锁的持有时间:优化业务逻辑,尽快释放锁
  2. 使用分片锁:将热点资源分片,减少锁竞争
  3. 实现锁的批量操作:减少网络开销
  4. 监控锁的使用情况:及时发现性能瓶颈

故障处理策略

  1. 锁服务高可用:使用集群部署,避免单点故障
  2. 锁的自动过期:防止客户端异常导致的死锁
  3. 锁状态监控:实时监控锁的获取和释放情况
  4. 降级方案:在锁服务不可用时的备选方案

分布式锁是构建可靠分布式系统的重要组件,正确的实现和使用能够有效保障数据的一致性和系统的稳定性。在实际应用中,需要根据具体的业务场景选择合适的实现方案,并结合监控和告警机制,确保分布式锁的可靠运行。

版权所有,如有侵权请联系我