Distributed Transaction Processing: Core Techniques for Ensuring Data Consistency

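In microservice architectures and distributed systems, data consistency is a persistent challenge. When a single business operation spans multiple services or databases, ensuring that all of its steps either succeed together or fail together becomes a central design problem. Traditional ACID transactions work well inside a monolithic application, but in a distributed environment they run into network latency, node failures, and data sharding. This article covers the core concepts of distributed transactions, the main solutions, and how they are applied in real projects.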

Fundamentals of Distributed Transactions

The CAP Theorem and BASE

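The CAP theorem states that a distributed system cannot guarantee consistency, availability, and partition tolerance all at once; at most two of the three can hold at the same time. Because network partitions cannot be ruled out in practice, the real choice during a partition is between consistency and availability.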

// The CAP trade-off in practice
public class CAPExample {

// Strong consistency example (gives up availability)
public class StrongConsistencyService {
private final AtomicBoolean isUpdating = new AtomicBoolean(false);

public void updateData(String data) throws ServiceUnavailableException {
// Fail fast instead of queuing: callers are rejected while an update is in flight.
// (A flag checked inside a synchronized block would never be seen as true by a second caller.)
if (!isUpdating.compareAndSet(false, true)) {
throw new ServiceUnavailableException("Service is updating, please try later");
}

try {
// Synchronously update every node before returning
updateAllNodes(data);
} finally {
isUpdating.set(false);
}
}

private void updateAllNodes(String data) {
List<Node> nodes = clusterManager.getAllNodes();
for (Node node : nodes) {
if (!node.update(data)) {
throw new RuntimeException("Failed to update node: " + node.getId());
}
}
}
}

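// Eventual consistency example (gives up strong consistency)
public class EventualConsistencyService {

public void updateData(String data) {
// Update the local node immediately
localDataStore.update(data);

// Propagate to the other nodes asynchronously; readers may see stale data until this finishes
CompletableFuture.runAsync(() -> {
propagateToOtherNodes(data);
});
}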

private void propagateToOtherNodes(String data) {
List<Node> nodes = clusterManager.getOtherNodes();
for (Node node : nodes) {
try {
node.updateAsync(data);
} catch (Exception e) {
// Record the failure and retry later
retryQueue.add(new UpdateTask(node, data));
}
}
}
}
}

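BASE offers a consistency model better suited to distributed systems:

  • Basically Available: when failures occur, the system may give up part of its availability rather than fail entirely
  • Soft state: the system is allowed to hold intermediate states
  • Eventual consistency: after some period of time, all replicas of the data converge to the same state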
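
To make soft state and eventual consistency concrete, here is a minimal sketch of two replicas that accept writes independently and later converge by exchanging state. It assumes a last-writer-wins rule with a logical timestamp and a writer ID as tie-breaker; the class and method names (LwwRegister, mergeFrom) are hypothetical and not part of any library mentioned in this article.

```java
/** Hypothetical last-writer-wins register; names are illustrative only. */
final class LwwRegister {
    private String value;
    private long timestamp = -1;   // logical timestamp of the current value
    private String writerId = "";  // tie-breaker so that merges are deterministic

    /** Local write: accepted immediately, without contacting other replicas. */
    void write(String newValue, long ts, String writer) {
        apply(newValue, ts, writer);
    }

    /** Anti-entropy step: look at another replica's state and keep the newer write. */
    void mergeFrom(LwwRegister other) {
        apply(other.value, other.timestamp, other.writerId);
    }

    private void apply(String v, long ts, String writer) {
        boolean newer = ts > timestamp
                || (ts == timestamp && writer.compareTo(writerId) > 0);
        if (newer) {
            value = v;
            timestamp = ts;
            writerId = writer;
        }
    }

    String read() {
        return value;
    }
}

final class LwwDemo {
    public static void main(String[] args) {
        LwwRegister a = new LwwRegister();
        LwwRegister b = new LwwRegister();
        a.write("balance=100", 1, "node-a");
        b.write("balance=80", 2, "node-b");
        // Soft state: the replicas disagree until they exchange state.
        System.out.println(a.read() + " / " + b.read());
        a.mergeFrom(b);
        b.mergeFrom(a);
        // After merging in both directions, both hold the write with the higher timestamp.
        System.out.println(a.read() + " / " + b.read());
    }
}
```

Last-writer-wins is only one possible convergence rule, and it silently discards the older write; whether that is acceptable depends on the data being replicated.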

ACID vs BASE

// ACID transaction example (traditional single database)
@Transactional
public class ACIDTransactionExample {

public void transferMoney(Long fromAccount, Long toAccount, BigDecimal amount) {
// Atomicity: everything in this method commits or rolls back together
Account from = accountRepository.findById(fromAccount);
Account to = accountRepository.findById(toAccount);

if (from.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException();
}

// Consistency: the business rules are satisfied
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));

// Isolation: concurrent transactions do not interfere with each other
accountRepository.save(from);
accountRepository.save(to);

// Durability: once the transaction commits, the data is stored permanently
auditService.logTransfer(fromAccount, toAccount, amount);
}
}

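// BASE transaction example (distributed system)
public class BASETransactionExample {

public void transferMoneyDistributed(Long fromAccount, Long toAccount, BigDecimal amount) {
String transactionId = UUID.randomUUID().toString();

try {
// Basically available: the core function still works even if some services are unavailable
TransferRequest request = new TransferRequest(transactionId, fromAccount, toAccount, amount);

// Soft state: an intermediate "in progress" state is allowed to exist
transferEventPublisher.publishTransferInitiated(request);

// Asynchronous processing with eventual consistency.
// Debit first and credit only after the debit succeeded; running both in parallel
// could credit the target account even though the debit failed.
debitAccountAsync(fromAccount, amount, transactionId)
.thenCompose(ignored -> creditAccountAsync(toAccount, amount, transactionId))
.thenRun(() -> {
transferEventPublisher.publishTransferCompleted(transactionId);
}).exceptionally(throwable -> {
// If the credit failed after a successful debit, a consumer of this event must refund the debit
transferEventPublisher.publishTransferFailed(transactionId, throwable.getMessage());
return null;
});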

} catch (Exception e) {
transferEventPublisher.publishTransferFailed(transactionId, e.getMessage());
}
}

private CompletableFuture<Void> debitAccountAsync(Long accountId, BigDecimal amount, String transactionId) {
return CompletableFuture.runAsync(() -> {
accountService.debit(accountId, amount, transactionId);
});
}

private CompletableFuture<Void> creditAccountAsync(Long accountId, BigDecimal amount, String transactionId) {
return CompletableFuture.runAsync(() -> {
accountService.credit(accountId, amount, transactionId);
});
}
}

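Distributed Transaction Solutions

1. Two-Phase Commit (2PC)

2PC is the classic solution for distributed transactions. It consists of a prepare phase, in which every participant votes, and a commit phase, in which the coordinator applies the outcome.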

// 2PC coordinator implementation
@Component
public class TwoPhaseCommitCoordinator {

private final Map<String, TransactionContext> activeTransactions = new ConcurrentHashMap<>();

public boolean executeTransaction(String transactionId, List<TransactionParticipant> participants) {
TransactionContext context = new TransactionContext(transactionId, participants);
activeTransactions.put(transactionId, context);

try {
// Phase 1: prepare
if (!preparePhase(context)) {
abortTransaction(context);
return false;
}

// Phase 2: commit
return commitPhase(context);

} catch (Exception e) {
log.error("Transaction {} failed", transactionId, e);
abortTransaction(context);
return false;
} finally {
activeTransactions.remove(transactionId);
}
}

private boolean preparePhase(TransactionContext context) {
log.info("Starting prepare phase for transaction: {}", context.getTransactionId());

List<CompletableFuture<Boolean>> prepareFutures = context.getParticipants().stream()
.map(participant -> CompletableFuture.supplyAsync(() -> {
try {
boolean prepared = participant.prepare(context.getTransactionId());
log.info("Participant {} prepare result: {}", participant.getId(), prepared);
return prepared;
} catch (Exception e) {
log.error("Participant {} prepare failed", participant.getId(), e);
return false;
}
}))
.collect(Collectors.toList());

try {
// Wait for every participant to finish preparing
CompletableFuture<Void> allPrepared = CompletableFuture.allOf(
prepareFutures.toArray(new CompletableFuture[0])
);

allPrepared.get(30, TimeUnit.SECONDS); // 30-second timeout

// Check that every participant prepared successfully
boolean allSuccess = prepareFutures.stream()
.allMatch(future -> {
try {
return future.get();
} catch (Exception e) {
return false;
}
});

context.setPrepareResult(allSuccess);
return allSuccess;

} catch (TimeoutException e) {
log.error("Prepare phase timeout for transaction: {}", context.getTransactionId());
return false;
} catch (Exception e) {
log.error("Prepare phase failed for transaction: {}", context.getTransactionId(), e);
return false;
}
}

private boolean commitPhase(TransactionContext context) {
log.info("Starting commit phase for transaction: {}", context.getTransactionId());

List<CompletableFuture<Boolean>> commitFutures = context.getParticipants().stream()
.map(participant -> CompletableFuture.supplyAsync(() -> {
try {
boolean committed = participant.commit(context.getTransactionId());
log.info("Participant {} commit result: {}", participant.getId(), committed);
return committed;
} catch (Exception e) {
log.error("Participant {} commit failed", participant.getId(), e);
return false;
}
}))
.collect(Collectors.toList());

try {
CompletableFuture<Void> allCommitted = CompletableFuture.allOf(
commitFutures.toArray(new CompletableFuture[0])
);

allCommitted.get(60, TimeUnit.SECONDS); // 60-second timeout

boolean allSuccess = commitFutures.stream()
.allMatch(future -> {
try {
return future.get();
} catch (Exception e) {
return false;
}
});

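if (!allSuccess) {
log.error("Some participants failed to commit transaction: {}", context.getTransactionId());
// A real implementation has to handle this partial commit.
// The commit decision is final at this point and must not be turned into an abort;
// the usual approach is to log the decision durably and keep retrying the failed participants.
}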

return allSuccess;

} catch (Exception e) {
log.error("Commit phase failed for transaction: {}", context.getTransactionId(), e);
return false;
}
}

private void abortTransaction(TransactionContext context) {
log.info("Aborting transaction: {}", context.getTransactionId());

context.getParticipants().parallelStream().forEach(participant -> {
try {
participant.abort(context.getTransactionId());
log.info("Participant {} aborted successfully", participant.getId());
} catch (Exception e) {
log.error("Failed to abort participant {}", participant.getId(), e);
}
});
}
}

// Transaction participant interface
public interface TransactionParticipant {
String getId();
boolean prepare(String transactionId);
boolean commit(String transactionId);
void abort(String transactionId);
}

// Concrete participant implementation
@Component
public class DatabaseTransactionParticipant implements TransactionParticipant {

private final Map<String, Connection> transactionConnections = new ConcurrentHashMap<>();

@Override
public String getId() {
return "database-participant";
}

@Override
public boolean prepare(String transactionId) {
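Connection connection = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);

// Execute the business operation without committing
executeBusinessLogic(connection, transactionId);

// Keep the connection for the later commit or rollback.
// Note: an open local transaction is not a durable prepare; if this process crashes, the vote is lost.
transactionConnections.put(transactionId, connection);

log.info("Database participant prepared for transaction: {}", transactionId);
return true;

} catch (Exception e) {
log.error("Database participant prepare failed for transaction: {}", transactionId, e);
if (connection != null) {
// Roll back and close so that a failed prepare does not leak the connection
try (Connection failed = connection) {
failed.rollback();
} catch (SQLException releaseError) {
log.warn("Failed to release connection for transaction: {}", transactionId, releaseError);
}
}
return false;
}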
}

@Override
public boolean commit(String transactionId) {
Connection connection = transactionConnections.remove(transactionId);
if (connection == null) {
log.error("No connection found for transaction: {}", transactionId);
return false;
}

try {
connection.commit();
log.info("Database participant committed transaction: {}", transactionId);
return true;
} catch (Exception e) {
log.error("Database participant commit failed for transaction: {}", transactionId, e);
return false;
} finally {
try {
connection.close();
} catch (SQLException e) {
log.warn("Failed to close connection for transaction: {}", transactionId, e);
}
}
}

@Override
public void abort(String transactionId) {
Connection connection = transactionConnections.remove(transactionId);
if (connection == null) {
log.warn("No connection found for transaction: {}", transactionId);
return;
}

try {
connection.rollback();
log.info("Database participant aborted transaction: {}", transactionId);
} catch (Exception e) {
log.error("Database participant abort failed for transaction: {}", transactionId, e);
} finally {
try {
connection.close();
} catch (SQLException e) {
log.warn("Failed to close connection for transaction: {}", transactionId, e);
}
}
}

private void executeBusinessLogic(Connection connection, String transactionId) throws SQLException {
// Run the actual database operation (the values are hard-coded for illustration)
try (PreparedStatement stmt = connection.prepareStatement(
"UPDATE accounts SET balance = balance - ? WHERE id = ?"
)) {
stmt.setBigDecimal(1, new BigDecimal("100.00"));
stmt.setLong(2, 12345L);
stmt.executeUpdate();
}
}
}

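2. Three-Phase Commit (3PC)

3PC extends 2PC with a pre-commit phase and participant-side timeouts, which reduces the time participants spend blocked when the coordinator fails.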

// 3PC coordinator implementation
@Component
public class ThreePhaseCommitCoordinator {

public boolean executeTransaction(String transactionId, List<TransactionParticipant3PC> participants) {
TransactionContext context = new TransactionContext(transactionId, participants);

try {
// Phase 1: CanCommit
if (!canCommitPhase(context)) {
return false;
}

// Phase 2: PreCommit
if (!preCommitPhase(context)) {
abortTransaction(context);
return false;
}

// Phase 3: DoCommit
return doCommitPhase(context);

} catch (Exception e) {
log.error("3PC transaction {} failed", transactionId, e);
abortTransaction(context);
return false;
}
}

private boolean canCommitPhase(TransactionContext context) {
log.info("Starting CanCommit phase for transaction: {}", context.getTransactionId());

return context.getParticipants().parallelStream()
.allMatch(participant -> {
try {
return ((TransactionParticipant3PC) participant).canCommit(context.getTransactionId());
} catch (Exception e) {
log.error("CanCommit failed for participant: {}", participant.getId(), e);
return false;
}
});
}

private boolean preCommitPhase(TransactionContext context) {
log.info("Starting PreCommit phase for transaction: {}", context.getTransactionId());

return context.getParticipants().parallelStream()
.allMatch(participant -> {
try {
return ((TransactionParticipant3PC) participant).preCommit(context.getTransactionId());
} catch (Exception e) {
log.error("PreCommit failed for participant: {}", participant.getId(), e);
return false;
}
});
}

private boolean doCommitPhase(TransactionContext context) {
log.info("Starting DoCommit phase for transaction: {}", context.getTransactionId());

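// Send DoCommit to every participant and collect all results first;
// allMatch on its own short-circuits and could skip participants after the first failure
List<Boolean> results = context.getParticipants().parallelStream()
.map(participant -> {
try {
return ((TransactionParticipant3PC) participant).doCommit(context.getTransactionId());
} catch (Exception e) {
log.error("DoCommit failed for participant: {}", participant.getId(), e);
return false;
}
})
.collect(Collectors.toList());
return results.stream().allMatch(Boolean::booleanValue);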
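}

// executeTransaction calls this method, so the coordinator needs its own abort step
private void abortTransaction(TransactionContext context) {
log.info("Aborting 3PC transaction: {}", context.getTransactionId());
context.getParticipants().forEach(participant -> {
try {
participant.abort(context.getTransactionId());
} catch (Exception e) {
log.error("Failed to abort participant {}", participant.getId(), e);
}
});
}
}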

// 3PC participant interface
public interface TransactionParticipant3PC extends TransactionParticipant {
boolean canCommit(String transactionId);
boolean preCommit(String transactionId);
boolean doCommit(String transactionId);
}
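
The coordinator above does not show what lets 3PC block less than 2PC: each participant can decide on its own when the coordinator goes silent. The following is a minimal sketch of that participant-side rule, assuming the textbook behavior (abort on timeout before PreCommit, commit on timeout after PreCommit). The class ThreePhaseParticipantState and its method names are hypothetical and not part of the interfaces above.

```java
/** Hypothetical participant-side state machine for 3PC timeouts. */
final class ThreePhaseParticipantState {
    enum State { INITIAL, READY, PRE_COMMITTED, COMMITTED, ABORTED }

    private State state = State.INITIAL;

    synchronized boolean canCommit() {
        if (state != State.INITIAL) return false;
        state = State.READY;            // voted yes, nothing is locked in yet
        return true;
    }

    synchronized boolean preCommit() {
        if (state != State.READY) return false;
        state = State.PRE_COMMITTED;    // now knows that every participant voted yes
        return true;
    }

    synchronized boolean doCommit() {
        if (state == State.COMMITTED) return true;   // repeated DoCommit is harmless
        if (state != State.PRE_COMMITTED) return false;
        state = State.COMMITTED;
        return true;
    }

    /** Called by a local timer when the coordinator has been silent for too long. */
    synchronized State onCoordinatorTimeout() {
        switch (state) {
            case INITIAL:
            case READY:
                state = State.ABORTED;    // no PreCommit seen: aborting is safe
                break;
            case PRE_COMMITTED:
                state = State.COMMITTED;  // everyone voted yes: proceed to commit
                break;
            default:
                break;                    // COMMITTED and ABORTED are final
        }
        return state;
    }
}

final class ThreePhaseTimeoutDemo {
    public static void main(String[] args) {
        ThreePhaseParticipantState early = new ThreePhaseParticipantState();
        early.canCommit();
        System.out.println(early.onCoordinatorTimeout());

        ThreePhaseParticipantState late = new ThreePhaseParticipantState();
        late.canCommit();
        late.preCommit();
        System.out.println(late.onCoordinatorTimeout());
    }
}
```

The default commit after PreCommit is what removes the indefinite blocking, but it can still produce inconsistent outcomes under a network partition, for example when the coordinator decided to abort and the message never arrived.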

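3. Saga Pattern

The Saga pattern breaks a long-running transaction into a sequence of short local transactions, each of which has a corresponding compensating operation.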

// Saga orchestrator
@Component
public class SagaOrchestrator {

private final Map<String, SagaExecution> activeSagas = new ConcurrentHashMap<>();

public void executeSaga(SagaDefinition sagaDefinition) {
String sagaId = UUID.randomUUID().toString();
SagaExecution execution = new SagaExecution(sagaId, sagaDefinition);
activeSagas.put(sagaId, execution);

try {
executeSteps(execution);
} catch (Exception e) {
log.error("Saga {} execution failed", sagaId, e);
compensate(execution);
} finally {
activeSagas.remove(sagaId);
}
}

private void executeSteps(SagaExecution execution) {
List<SagaStep> steps = execution.getDefinition().getSteps();

for (int i = 0; i < steps.size(); i++) {
SagaStep step = steps.get(i);

try {
log.info("Executing step {} of saga {}", step.getName(), execution.getSagaId());

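StepResult result = step.execute(execution.getContext());
// The step is recorded even if it failed, so compensation can clean up partial work;
// compensate() must therefore tolerate a step that changed nothing
execution.addExecutedStep(step, result);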

if (!result.isSuccess()) {
log.error("Step {} failed: {}", step.getName(), result.getErrorMessage());
throw new SagaStepException("Step execution failed: " + step.getName());
}

log.info("Step {} completed successfully", step.getName());

} catch (Exception e) {
log.error("Step {} execution failed", step.getName(), e);
throw new SagaExecutionException("Failed to execute step: " + step.getName(), e);
}
}

log.info("Saga {} completed successfully", execution.getSagaId());
}

private void compensate(SagaExecution execution) {
log.info("Starting compensation for saga: {}", execution.getSagaId());

List<ExecutedStep> executedSteps = execution.getExecutedSteps();

// Compensate in reverse order of execution
for (int i = executedSteps.size() - 1; i >= 0; i--) {
ExecutedStep executedStep = executedSteps.get(i);
SagaStep step = executedStep.getStep();

try {
log.info("Compensating step {} of saga {}", step.getName(), execution.getSagaId());

CompensationResult result = step.compensate(execution.getContext(), executedStep.getResult());

if (!result.isSuccess()) {
log.error("Compensation for step {} failed: {}", step.getName(), result.getErrorMessage());
// A failed compensation requires manual intervention
alertService.sendCompensationFailureAlert(execution.getSagaId(), step.getName());
} else {
log.info("Step {} compensated successfully", step.getName());
}

} catch (Exception e) {
log.error("Compensation for step {} failed", step.getName(), e);
alertService.sendCompensationFailureAlert(execution.getSagaId(), step.getName());
}
}

log.info("Compensation for saga {} completed", execution.getSagaId());
}
}

// Saga step definition
public abstract class SagaStep {
private final String name;

public SagaStep(String name) {
this.name = name;
}

public abstract StepResult execute(SagaContext context);
public abstract CompensationResult compensate(SagaContext context, StepResult stepResult);

public String getName() {
return name;
}
}

// Concrete Saga step implementations
public class CreateOrderStep extends SagaStep {

private final OrderService orderService;

public CreateOrderStep(OrderService orderService) {
super("CreateOrder");
this.orderService = orderService;
}

@Override
public StepResult execute(SagaContext context) {
try {
CreateOrderRequest request = context.get("orderRequest", CreateOrderRequest.class);
Order order = orderService.createOrder(request);

context.put("orderId", order.getId());
context.put("order", order);

return StepResult.success(Map.of("orderId", order.getId()));
} catch (Exception e) {
return StepResult.failure("Failed to create order: " + e.getMessage());
}
}

@Override
public CompensationResult compensate(SagaContext context, StepResult stepResult) {
try {
Long orderId = context.get("orderId", Long.class);
if (orderId != null) {
orderService.cancelOrder(orderId);
log.info("Order {} cancelled successfully", orderId);
}
return CompensationResult.success();
} catch (Exception e) {
return CompensationResult.failure("Failed to cancel order: " + e.getMessage());
}
}
}

public class ReserveInventoryStep extends SagaStep {

private final InventoryService inventoryService;

public ReserveInventoryStep(InventoryService inventoryService) {
super("ReserveInventory");
this.inventoryService = inventoryService;
}

@Override
public StepResult execute(SagaContext context) {
try {
Order order = context.get("order", Order.class);

List<InventoryReservation> reservations = new ArrayList<>();
// Register the list before reserving, so that a failure midway still leaves
// the reservations made so far visible to compensate()
context.put("inventoryReservations", reservations);
for (OrderItem item : order.getItems()) {
InventoryReservation reservation = inventoryService.reserveInventory(
item.getProductId(), item.getQuantity()
);
reservations.add(reservation);
}

return StepResult.success(Map.of("reservations", reservations));
} catch (InsufficientInventoryException e) {
return StepResult.failure("Insufficient inventory: " + e.getMessage());
} catch (Exception e) {
return StepResult.failure("Failed to reserve inventory: " + e.getMessage());
}
}

@Override
public CompensationResult compensate(SagaContext context, StepResult stepResult) {
try {
List<InventoryReservation> reservations = context.get("inventoryReservations", List.class);
if (reservations != null) {
for (InventoryReservation reservation : reservations) {
inventoryService.releaseReservation(reservation.getId());
}
log.info("Inventory reservations released successfully");
}
return CompensationResult.success();
} catch (Exception e) {
return CompensationResult.failure("Failed to release inventory reservations: " + e.getMessage());
}
}
}

public class ProcessPaymentStep extends SagaStep {

private final PaymentService paymentService;

public ProcessPaymentStep(PaymentService paymentService) {
super("ProcessPayment");
this.paymentService = paymentService;
}

@Override
public StepResult execute(SagaContext context) {
try {
Order order = context.get("order", Order.class);
PaymentRequest paymentRequest = context.get("paymentRequest", PaymentRequest.class);

Payment payment = paymentService.processPayment(
order.getId(),
order.getTotalAmount(),
paymentRequest.getPaymentMethod()
);

context.put("paymentId", payment.getId());
context.put("payment", payment);

return StepResult.success(Map.of("paymentId", payment.getId()));
} catch (PaymentDeclinedException e) {
return StepResult.failure("Payment declined: " + e.getMessage());
} catch (Exception e) {
return StepResult.failure("Failed to process payment: " + e.getMessage());
}
}

@Override
public CompensationResult compensate(SagaContext context, StepResult stepResult) {
try {
Long paymentId = context.get("paymentId", Long.class);
if (paymentId != null) {
paymentService.refundPayment(paymentId);
log.info("Payment {} refunded successfully", paymentId);
}
return CompensationResult.success();
} catch (Exception e) {
return CompensationResult.failure("Failed to refund payment: " + e.getMessage());
}
}
}
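
To trace the compensation order without the surrounding services, here is a compact, self-contained sketch of the same idea. It is a simplified illustration, not the orchestrator above: MiniSaga and Step are hypothetical names, steps are plain Runnable pairs, and only steps that completed are compensated.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical, minimal saga runner used only to trace compensation order. */
final class MiniSaga {

    /** A forward action paired with the action that semantically undoes it. */
    static final class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs steps in order; on the first failure, undoes completed steps newest-first. */
    static List<String> run(List<Step> steps) {
        List<String> trace = new ArrayList<>();
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step);   // push keeps the newest step on top
                trace.add("done:" + step.name);
            } catch (RuntimeException e) {
                trace.add("failed:" + step.name);
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    done.compensation.run();
                    trace.add("undo:" + done.name);
                }
                break;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(
            new Step("CreateOrder", () -> {}, () -> {}),
            new Step("ReserveInventory", () -> {}, () -> {}),
            new Step("ProcessPayment",
                () -> { throw new IllegalStateException("payment declined"); },
                () -> {}));
        System.out.println(run(steps));
    }
}
```

With the payment step failing, the trace shows ReserveInventory being undone before CreateOrder, which is the reverse of the order in which they ran.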

4. TCC Pattern

TCC (Try-Confirm-Cancel) is a compensation-based transaction pattern.

// TCC transaction manager
@Component
public class TCCTransactionManager {

private final Map<String, TCCTransaction> activeTransactions = new ConcurrentHashMap<>();

public boolean executeTransaction(String transactionId, List<TCCParticipant> participants) {
TCCTransaction transaction = new TCCTransaction(transactionId, participants);
activeTransactions.put(transactionId, transaction);

try {
// Try phase
if (!tryPhase(transaction)) {
cancelPhase(transaction);
return false;
}

// Confirm phase
return confirmPhase(transaction);

} catch (Exception e) {
log.error("TCC transaction {} failed", transactionId, e);
cancelPhase(transaction);
return false;
} finally {
activeTransactions.remove(transactionId);
}
}

private boolean tryPhase(TCCTransaction transaction) {
log.info("Starting Try phase for transaction: {}", transaction.getTransactionId());

for (TCCParticipant participant : transaction.getParticipants()) {
try {
boolean tryResult = participant.tryExecute(transaction.getTransactionId());
transaction.addTryResult(participant, tryResult);

if (!tryResult) {
log.error("Try failed for participant: {}", participant.getId());
return false;
}
} catch (Exception e) {
log.error("Try phase failed for participant: {}", participant.getId(), e);
transaction.addTryResult(participant, false);
return false;
}
}

return true;
}

private boolean confirmPhase(TCCTransaction transaction) {
log.info("Starting Confirm phase for transaction: {}", transaction.getTransactionId());

boolean allConfirmed = true;

for (TCCParticipant participant : transaction.getParticipants()) {
try {
boolean confirmResult = participant.confirmExecute(transaction.getTransactionId());
if (!confirmResult) {
log.error("Confirm failed for participant: {}", participant.getId());
allConfirmed = false;
}
} catch (Exception e) {
log.error("Confirm phase failed for participant: {}", participant.getId(), e);
allConfirmed = false;
}
}

return allConfirmed;
}

private void cancelPhase(TCCTransaction transaction) {
log.info("Starting Cancel phase for transaction: {}", transaction.getTransactionId());

for (TCCParticipant participant : transaction.getParticipants()) {
Boolean tryResult = transaction.getTryResult(participant);
if (tryResult != null && tryResult) {
try {
participant.cancelExecute(transaction.getTransactionId());
log.info("Cancel completed for participant: {}", participant.getId());
} catch (Exception e) {
log.error("Cancel failed for participant: {}", participant.getId(), e);
// A failed cancel requires manual intervention
alertService.sendCancelFailureAlert(transaction.getTransactionId(), participant.getId());
}
}
}
}
}

// TCC participant interface
public interface TCCParticipant {
String getId();
boolean tryExecute(String transactionId);
boolean confirmExecute(String transactionId);
void cancelExecute(String transactionId);
}

// Account service TCC implementation
@Component
public class AccountServiceTCC implements TCCParticipant {

private final Map<String, AccountReservation> reservations = new ConcurrentHashMap<>();

@Override
public String getId() {
return "account-service";
}

@Override
public boolean tryExecute(String transactionId) {
try {
// Load the transfer details from the transaction context
TransferContext context = getTransferContext(transactionId);

// Check that the balance is sufficient
Account fromAccount = accountRepository.findById(context.getFromAccountId());
if (fromAccount.getBalance().compareTo(context.getAmount()) < 0) {
return false;
}

// Freeze the funds
AccountReservation reservation = new AccountReservation(
transactionId,
context.getFromAccountId(),
context.getAmount()
);

boolean frozen = accountRepository.freezeAmount(
context.getFromAccountId(),
context.getAmount(),
transactionId
);

if (frozen) {
reservations.put(transactionId, reservation);
log.info("Amount {} frozen for account {} in transaction {}",
context.getAmount(), context.getFromAccountId(), transactionId);
return true;
}

return false;

} catch (Exception e) {
log.error("Try execute failed for transaction: {}", transactionId, e);
return false;
}
}

@Override
public boolean confirmExecute(String transactionId) {
try {
AccountReservation reservation = reservations.get(transactionId);
if (reservation == null) {
log.error("No reservation found for transaction: {}", transactionId);
return false;
}

TransferContext context = getTransferContext(transactionId);

// Perform the actual transfer
boolean transferred = accountRepository.transferAmount(
context.getFromAccountId(),
context.getToAccountId(),
context.getAmount(),
transactionId
);

if (transferred) {
reservations.remove(transactionId);
log.info("Transfer confirmed for transaction: {}", transactionId);
return true;
}

return false;

} catch (Exception e) {
log.error("Confirm execute failed for transaction: {}", transactionId, e);
return false;
}
}

@Override
public void cancelExecute(String transactionId) {
try {
AccountReservation reservation = reservations.remove(transactionId);
if (reservation != null) {
// Unfreeze the funds
accountRepository.unfreezeAmount(
reservation.getAccountId(),
reservation.getAmount(),
transactionId
);

log.info("Amount {} unfrozen for account {} in transaction {}",
reservation.getAmount(), reservation.getAccountId(), transactionId);
}
} catch (Exception e) {
log.error("Cancel execute failed for transaction: {}", transactionId, e);
}
}

private TransferContext getTransferContext(String transactionId) {
// Load the transfer details from the transaction context store
return transactionContextStore.getContext(transactionId, TransferContext.class);
}
}
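
Because the transaction manager may retry Confirm or Cancel, and a Cancel can arrive before a delayed Try, TCC participants usually need a guard that makes each phase idempotent. The sketch below shows one possible shape for such a guard. It is an assumption-laden illustration: TccStateGuard and its methods are hypothetical, and the state is kept in memory, whereas a real participant would persist it together with its business data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-transaction phase guard for a TCC participant. */
final class TccStateGuard {
    enum Phase { TRIED, CONFIRMED, CANCELLED }

    private final Map<String, Phase> phases = new ConcurrentHashMap<>();

    /** True only for the first Try; false if the transaction was already tried or cancelled. */
    boolean beginTry(String txId) {
        return phases.putIfAbsent(txId, Phase.TRIED) == null;
    }

    /** True only when this call moves the transaction from TRIED to CONFIRMED. */
    boolean beginConfirm(String txId) {
        return phases.replace(txId, Phase.TRIED, Phase.CONFIRMED);
    }

    /**
     * True when reserved resources must be released now.
     * A Cancel without a prior Try is recorded as CANCELLED (an "empty rollback"),
     * so that a late Try for the same transaction is rejected by beginTry.
     */
    boolean beginCancel(String txId) {
        boolean[] mustRelease = {false};
        phases.compute(txId, (id, current) -> {
            if (current == null) {
                return Phase.CANCELLED;   // nothing was reserved, just remember the cancel
            }
            if (current == Phase.TRIED) {
                mustRelease[0] = true;
                return Phase.CANCELLED;
            }
            return current;               // CONFIRMED or CANCELLED stay unchanged
        });
        return mustRelease[0];
    }
}
```

A participant such as AccountServiceTCC could consult a guard like this at the start of each phase and skip the business operation whenever the guard returns false.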

Distributed Transaction Middleware

1. Seata Integration

// Seata configuration
@Configuration
public class SeataConfiguration {

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("order-service", "my_test_tx_group");
}
}

// Distributed transaction using Seata
@Service
public class OrderService {

@Autowired
private AccountServiceClient accountServiceClient;

@Autowired
private InventoryServiceClient inventoryServiceClient;

@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public Order createOrder(CreateOrderRequest request) {
try {
// 1. Create the order
Order order = new Order();
order.setUserId(request.getUserId());
order.setTotalAmount(request.getTotalAmount());
order.setStatus(OrderStatus.PENDING);
order = orderRepository.save(order);

// 2. Decrease the stock
for (OrderItem item : request.getItems()) {
inventoryServiceClient.decreaseStock(
item.getProductId(),
item.getQuantity()
);
}

// 3. Debit the account balance
accountServiceClient.debit(
request.getUserId(),
request.getTotalAmount()
);

// 4. Update the order status
order.setStatus(OrderStatus.COMPLETED);
order = orderRepository.save(order);

log.info("Order {} created successfully", order.getId());
return order;

} catch (Exception e) {
log.error("Failed to create order", e);
throw new OrderCreationException("Failed to create order: " + e.getMessage(), e);
}
}
}

// Inventory service
@Service
public class InventoryService {

@Transactional
public void decreaseStock(Long productId, Integer quantity) {
Product product = productRepository.findById(productId)
.orElseThrow(() -> new ProductNotFoundException("Product not found: " + productId));

if (product.getStock() < quantity) {
throw new InsufficientStockException(
"Insufficient stock for product: " + productId +
", available: " + product.getStock() +
", required: " + quantity
);
}

product.setStock(product.getStock() - quantity);
productRepository.save(product);

log.info("Stock decreased for product {}: {} -> {}",
productId, product.getStock() + quantity, product.getStock());
}
}

// Account service
@Service
public class AccountService {

@Transactional
public void debit(Long userId, BigDecimal amount) {
Account account = accountRepository.findByUserId(userId)
.orElseThrow(() -> new AccountNotFoundException("Account not found for user: " + userId));

if (account.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException(
"Insufficient funds for user: " + userId +
", balance: " + account.getBalance() +
", required: " + amount
);
}

account.setBalance(account.getBalance().subtract(amount));
accountRepository.save(account);

log.info("Account debited for user {}: amount {}", userId, amount);
}
}

2. Transactional Messages

// RocketMQ transactional message producer
@Component
public class TransactionalMessageProducer {

private final TransactionMQProducer producer;

// The listener is injected so that Spring can wire its dependencies;
// creating it with `new` would leave its @Autowired fields null
public TransactionalMessageProducer(OrderTransactionListener transactionListener) {
this.producer = new TransactionMQProducer("order_producer_group");
this.producer.setNamesrvAddr("localhost:9876");
this.producer.setTransactionListener(transactionListener);

try {
this.producer.start();
} catch (MQClientException e) {
throw new RuntimeException("Failed to start transaction producer", e);
}
}

public void sendTransactionalMessage(String topic, String tag, Object message, Object localTransactionArg) {
try {
Message msg = new Message(topic, tag, JSON.toJSONBytes(message));

SendResult sendResult = producer.sendMessageInTransaction(msg, localTransactionArg);

// The message ID is assigned on send, so it is read from the send result
log.info("Transactional message sent: {}, result: {}",
sendResult.getMsgId(), sendResult.getSendStatus());

} catch (MQClientException e) {
log.error("Failed to send transactional message", e);
throw new RuntimeException("Failed to send transactional message", e);
}
}
}

// Transaction listener
@Component
public class OrderTransactionListener implements TransactionListener {

@Autowired
private OrderService orderService;

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
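// Execute the local transaction
CreateOrderRequest request = (CreateOrderRequest) arg;
Order order = orderService.createOrderLocal(request);

// Store the order ID in the message properties for the later check-back.
// Caveat: the half message has already been sent at this point, so the broker's copy
// may not carry this property; setting a business key before sending is the safer design.
msg.putUserProperty("orderId", String.valueOf(order.getId()));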

log.info("Local transaction executed successfully for order: {}", order.getId());
return LocalTransactionState.COMMIT_MESSAGE;

} catch (Exception e) {
log.error("Local transaction execution failed", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
try {
String orderIdStr = msg.getUserProperty("orderId");
if (orderIdStr == null) {
log.warn("No orderId found in message properties");
return LocalTransactionState.ROLLBACK_MESSAGE;
}

Long orderId = Long.valueOf(orderIdStr);
Order order = orderService.getOrder(orderId);

if (order != null && order.getStatus() == OrderStatus.COMPLETED) {
log.info("Local transaction confirmed for order: {}", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.info("Local transaction not found or incomplete for order: {}", orderId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}

} catch (Exception e) {
log.error("Failed to check local transaction", e);
return LocalTransactionState.UNKNOW;
}
}
}

// Transactional message consumers.
// Each listener is declared as its own Spring bean: a non-static inner class
// cannot be instantiated by the container, so a listener nested that way
// would never be registered.
@Component
@RocketMQMessageListener(
        topic = "ORDER_TOPIC",
        consumerGroup = "inventory_consumer_group",
        messageModel = MessageModel.CLUSTERING
)
public class InventoryConsumer implements RocketMQListener<String> {

    @Autowired
    private InventoryService inventoryService;

    @Override
    public void onMessage(String message) {
        try {
            OrderCreatedEvent event = JSON.parseObject(message, OrderCreatedEvent.class);

            // Deduct stock for every item in the order
            for (OrderItem item : event.getItems()) {
                inventoryService.decreaseStock(item.getProductId(), item.getQuantity());
            }

            log.info("Inventory updated for order: {}", event.getOrderId());

        } catch (Exception e) {
            log.error("Failed to process inventory update", e);
            // Rethrow so that the message is redelivered
            throw new RuntimeException("Inventory update failed", e);
        }
    }
}

@Component
@RocketMQMessageListener(
        topic = "ORDER_TOPIC",
        consumerGroup = "account_consumer_group",
        messageModel = MessageModel.CLUSTERING
)
public class AccountConsumer implements RocketMQListener<String> {

    @Autowired
    private AccountService accountService;

    @Override
    public void onMessage(String message) {
        try {
            OrderCreatedEvent event = JSON.parseObject(message, OrderCreatedEvent.class);

            // Debit the user's account
            accountService.debit(event.getUserId(), event.getTotalAmount());

            log.info("Account debited for order: {}", event.getOrderId());

        } catch (Exception e) {
            log.error("Failed to process account debit", e);
            // Rethrow so that the message is redelivered
            throw new RuntimeException("Account debit failed", e);
        }
    }
}
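
Because both consumers rethrow on failure so that the message is delivered again, a handler can see the same order event more than once, and stock deduction or account debit must not be applied twice. The following is a minimal sketch of an idempotent wrapper, not part of the original code: the class name `IdempotentConsumerSketch`, the nested `Handler`, and the use of an in-memory map are all assumptions made for illustration. A real deployment would keep the processed keys in a durable store, ideally updated in the same local transaction as the business change.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class IdempotentConsumerSketch {

    /** Wraps a handler so that each business key is applied at most once. */
    static final class Handler<T> {
        // In-memory stand-in for a durable deduplication table (assumption).
        private final Map<String, Boolean> processed = new ConcurrentHashMap<>();
        private final Consumer<T> delegate;

        Handler(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        /** Returns true if the delegate ran, false if the key was already handled. */
        boolean handle(String businessKey, T payload) {
            // putIfAbsent is atomic: only the first caller for a key sees null.
            if (processed.putIfAbsent(businessKey, Boolean.TRUE) != null) {
                return false;
            }
            try {
                delegate.accept(payload);
                return true;
            } catch (RuntimeException e) {
                // Release the key so that a redelivery can retry the work.
                processed.remove(businessKey);
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger stock = new AtomicInteger(10);
        Handler<Integer> handler = new Handler<>(qty -> stock.addAndGet(-qty));

        handler.handle("order-1001", 3);
        handler.handle("order-1001", 3); // duplicate delivery, skipped

        System.out.println(stock.get()); // prints 7
    }
}
```

In this sketch the order ID serves as the business key, so the second delivery of `order-1001` leaves the stock untouched.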

Performance Optimization and Best Practices

1. Transaction Timeout Management

// Timeout manager
@Component
public class TransactionTimeoutManager {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
    private final Map<String, ScheduledFuture<?>> timeoutTasks = new ConcurrentHashMap<>();

    public void scheduleTimeout(String transactionId, long timeoutMillis, Runnable timeoutAction) {
        ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
            try {
                log.warn("Transaction {} timed out after {} ms", transactionId, timeoutMillis);
                timeoutAction.run();
            } catch (Exception e) {
                log.error("Error executing timeout action for transaction: {}", transactionId, e);
            } finally {
                timeoutTasks.remove(transactionId);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);

        timeoutTasks.put(transactionId, timeoutTask);
        // If the task already ran before it was registered, drop the stale entry
        if (timeoutTask.isDone()) {
            timeoutTasks.remove(transactionId, timeoutTask);
        }
    }

    public void cancelTimeout(String transactionId) {
        ScheduledFuture<?> timeoutTask = timeoutTasks.remove(transactionId);
        if (timeoutTask != null && !timeoutTask.isDone()) {
            // false: do not interrupt a timeout action that is already running
            timeoutTask.cancel(false);
            log.debug("Timeout cancelled for transaction: {}", transactionId);
        }
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdown();
        try {
            // Give pending timeout tasks up to 60 seconds before forcing shutdown
            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

// Timeout-aware transaction executor
@Component
public class TimeoutAwareTransactionExecutor {

    @Autowired
    private TransactionTimeoutManager timeoutManager;

    public <T> CompletableFuture<T> executeWithTimeout(
            String transactionId,
            Supplier<T> transactionLogic,
            long timeoutMillis) {

        CompletableFuture<T> future = new CompletableFuture<>();

        // Arm the timeout: fail the future if the logic has not finished in time.
        // completeExceptionally is a no-op once the future is already complete.
        timeoutManager.scheduleTimeout(transactionId, timeoutMillis, () ->
                future.completeExceptionally(new TransactionTimeoutException(
                        "Transaction " + transactionId + " timed out after " + timeoutMillis + " ms"
                )));

        // Run the transaction logic asynchronously. A timeout only fails the
        // returned future; it does not interrupt logic that is already running.
        CompletableFuture.runAsync(() -> {
            try {
                future.complete(transactionLogic.get());
            } catch (Exception e) {
                future.completeExceptionally(e);
            } finally {
                timeoutManager.cancelTimeout(transactionId);
            }
        });

        return future;
    }
}
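
For cases that need only the "fail the future after a deadline" behaviour, without a custom timeout action, the JDK offers a shorter route. The sketch below uses `CompletableFuture.orTimeout`, available since Java 9; the class and method names (`OrTimeoutSketch`, `runWithTimeout`, `timedOut`) are hypothetical. As with the executor above, the timeout fails the future but does not stop the logic that is already running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class OrTimeoutSketch {

    /** Runs the logic asynchronously; the future fails if it takes longer than timeoutMillis. */
    static <T> CompletableFuture<T> runWithTimeout(Supplier<T> logic, long timeoutMillis) {
        return CompletableFuture.supplyAsync(logic)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits for the future and reports whether it failed with a TimeoutException. */
    static boolean timedOut(CompletableFuture<?> future) {
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = runWithTimeout(() -> "committed", 1_000);
        CompletableFuture<String> slow = runWithTimeout(() -> {
            sleep(2_000); // simulates a participant that responds too slowly
            return "too late";
        }, 100);

        System.out.println(fast.join());
        System.out.println("slow timed out: " + timedOut(slow));
    }
}
```

The custom manager shown earlier remains the better fit when a timeout has to trigger extra work, such as rolling back participants or updating persisted state.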

2. Transaction State Persistence

// Transaction state store
@Repository
public class TransactionStateRepository {

    private static final String SELECT_STATE = """
            SELECT transaction_id, status, participants, created_at, updated_at, timeout_at, retry_count
            FROM transaction_states
            """;

    // Shared row mapper, so that every query builds TransactionState the same way
    private static final RowMapper<TransactionState> STATE_MAPPER = (rs, rowNum) -> {
        TransactionState state = new TransactionState();
        state.setTransactionId(rs.getString("transaction_id"));
        state.setStatus(TransactionStatus.valueOf(rs.getString("status")));

        String participantsJson = rs.getString("participants");
        List<String> participants = JSON.parseArray(participantsJson, String.class);
        state.setParticipants(participants);

        state.setCreatedAt(rs.getTimestamp("created_at").toInstant());
        state.setUpdatedAt(rs.getTimestamp("updated_at").toInstant());
        state.setTimeoutAt(rs.getTimestamp("timeout_at").toInstant());
        state.setRetryCount(rs.getInt("retry_count"));

        return state;
    };

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void saveTransactionState(TransactionState state) {
        // Upsert (MySQL syntax); created_at is deliberately left unchanged on update
        String sql = """
                INSERT INTO transaction_states
                (transaction_id, status, participants, created_at, updated_at, timeout_at, retry_count)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON DUPLICATE KEY UPDATE
                status = VALUES(status),
                participants = VALUES(participants),
                updated_at = VALUES(updated_at),
                timeout_at = VALUES(timeout_at),
                retry_count = VALUES(retry_count)
                """;

        // Instants are converted to java.sql.Timestamp, which JDBC drivers accept directly
        jdbcTemplate.update(sql,
                state.getTransactionId(),
                state.getStatus().name(),
                JSON.toJSONString(state.getParticipants()),
                Timestamp.from(state.getCreatedAt()),
                Timestamp.from(state.getUpdatedAt()),
                Timestamp.from(state.getTimeoutAt()),
                state.getRetryCount()
        );
    }

    // Returns null when no state is stored for the given transaction ID
    public TransactionState getTransactionState(String transactionId) {
        String sql = SELECT_STATE + "WHERE transaction_id = ?";

        try {
            return jdbcTemplate.queryForObject(sql, STATE_MAPPER, transactionId);
        } catch (EmptyResultDataAccessException e) {
            return null;
        }
    }

    // Transactions past their deadline that are still in an intermediate state
    public List<TransactionState> getTimeoutTransactions() {
        String sql = SELECT_STATE
                + "WHERE timeout_at < NOW() AND status IN ('PREPARING', 'COMMITTING')";

        return jdbcTemplate.query(sql, STATE_MAPPER);
    }

    public void deleteTransactionState(String transactionId) {
        String sql = "DELETE FROM transaction_states WHERE transaction_id = ?";
        jdbcTemplate.update(sql, transactionId);
    }
}

// Transaction recovery service
@Service
public class TransactionRecoveryService {

    @Autowired
    private TransactionStateRepository stateRepository;

    @Autowired
    private TransactionCoordinator coordinator;

    // Used below to raise alerts; the AlertService type name is assumed,
    // since the article does not show its declaration.
    @Autowired
    private AlertService alertService;

    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void recoverTimeoutTransactions() {
        try {
            List<TransactionState> timeoutTransactions = stateRepository.getTimeoutTransactions();

            for (TransactionState state : timeoutTransactions) {
                if (state.getRetryCount() < 3) {
                    log.info("Recovering timeout transaction: {}", state.getTransactionId());
                    recoverTransaction(state);
                } else {
                    log.error("Transaction {} exceeded max retry count, marking as failed",
                            state.getTransactionId());
                    markTransactionAsFailed(state);
                }
            }
        } catch (Exception e) {
            log.error("Error during transaction recovery", e);
        }
    }

    private void recoverTransaction(TransactionState state) {
        try {
            state.setRetryCount(state.getRetryCount() + 1);
            state.setUpdatedAt(Instant.now());
            state.setTimeoutAt(Instant.now().plusSeconds(60)); // push the deadline out by 60 seconds

            stateRepository.saveTransactionState(state);

            // Choose the recovery strategy based on the transaction status
            switch (state.getStatus()) {
                case PREPARING:
                    coordinator.retryPrepare(state.getTransactionId());
                    break;
                case COMMITTING:
                    coordinator.retryCommit(state.getTransactionId());
                    break;
                default:
                    log.warn("Unknown transaction status for recovery: {}", state.getStatus());
            }

        } catch (Exception e) {
            log.error("Failed to recover transaction: {}", state.getTransactionId(), e);
        }
    }

    private void markTransactionAsFailed(TransactionState state) {
        state.setStatus(TransactionStatus.FAILED);
        state.setUpdatedAt(Instant.now());
        stateRepository.saveTransactionState(state);

        // Send an alert
        alertService.sendTransactionFailureAlert(state.getTransactionId());
    }
}
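
The recovery service pushes every retried transaction's deadline out by a fixed 60 seconds. A common variation, shown here only as a sketch and not as the article's method, is to grow the delay with each retry so that a struggling participant is not hit at a constant rate. The names `RecoveryBackoffSketch`, `delayFor`, and `nextTimeoutAt`, along with the 60-second base and 10-minute cap, are assumptions chosen for illustration.

```java
import java.time.Duration;
import java.time.Instant;

final class RecoveryBackoffSketch {

    /** Exponential backoff: base * 2^retryCount, never longer than max. */
    static Duration delayFor(int retryCount, Duration base, Duration max) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retryCount must not be negative");
        }
        // Cap the exponent so that the multiplication cannot overflow.
        long factor = 1L << Math.min(retryCount, 20);
        Duration delay = base.multipliedBy(factor);
        return delay.compareTo(max) > 0 ? max : delay;
    }

    /** Next deadline for a transaction that has already been retried retryCount times. */
    static Instant nextTimeoutAt(Instant now, int retryCount) {
        return now.plus(delayFor(retryCount, Duration.ofSeconds(60), Duration.ofMinutes(10)));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(60);
        Duration max = Duration.ofMinutes(10);
        for (int retry = 0; retry <= 4; retry++) {
            // prints 60, 120, 240, 480, then 600 (capped)
            System.out.println("retry " + retry + ": " + delayFor(retry, base, max).getSeconds() + " s");
        }
    }
}
```

With such a helper, the fixed `plusSeconds(60)` in the recovery step could be swapped for a call like `nextTimeoutAt(Instant.now(), state.getRetryCount())`.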

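Summary

Distributed transaction processing is one of the core techniques for building reliable distributed systems. In practice, the right solution depends on the business characteristics and system requirements:

  1. Strong consistency required: choose 2PC or 3PC, but weigh the impact on performance and availability
  2. Eventual consistency acceptable: prefer the Saga pattern or transactional messages
  3. High performance required: use the TCC pattern or asynchronous messaging
  4. Complex business workflows: the Saga pattern offers the most flexibility

When designing distributed transactions, also pay close attention to:

  • Idempotent transaction operations
  • Timeout and retry mechanisms
  • Persistence of transaction state
  • Handling of and recovery from failures
  • Monitoring and alerting

With sound architecture and technology choices, a system can preserve data consistency while keeping performance and availability as high as possible.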
