Distributed Message Queues: Building a Highly Available Asynchronous Communication Architecture

In modern distributed systems, the way services communicate with each other directly affects performance, reliability, and scalability. As microservice architectures have become widespread, traditional synchronous calls increasingly expose problems such as tight coupling, poor fault tolerance, and performance bottlenecks. The distributed message queue, as an asynchronous communication mechanism, offers an elegant solution to these problems. This article takes a close look at the core concepts behind distributed message queues, the mainstream implementations, and how they are applied in real projects.

Message Queue Fundamentals

The Core Value of Message Queues

Decoupling: producers and consumers communicate indirectly through the queue, with no direct dependency on each other.

// Traditional synchronous invocation
@Service
public class OrderService {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private NotificationService notificationService;

    @Autowired
    private OrderRepository orderRepository;

    public Order createOrder(CreateOrderRequest request) {
        // Tight coupling: direct dependency on multiple services
        Order order = new Order(request);
        orderRepository.save(order);

        // Synchronous calls: a failure in any one service breaks the whole flow
        inventoryService.reserveStock(request.getItems());
        paymentService.processPayment(request.getPaymentInfo());
        notificationService.sendOrderConfirmation(order);

        return order;
    }
}

// Asynchronous, message-queue-based alternative
@Service
public class OrderService {

    @Autowired
    private MessageProducer messageProducer;

    @Autowired
    private OrderRepository orderRepository;

    public Order createOrder(CreateOrderRequest request) {
        // Loose coupling: only the core business logic lives here
        Order order = new Order(request);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // Publish an event and let other services process it asynchronously
        OrderCreatedEvent event = new OrderCreatedEvent(order);
        messageProducer.publish("order.created", event);

        return order;
    }
}

Asynchronous processing: improves system responsiveness and throughput.

// Message producer
@Component
public class EventPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private FailedEventRepository failedEventRepository;

    public void publishOrderEvent(OrderEvent event) {
        try {
            // Send asynchronously without blocking the main flow
            rabbitTemplate.convertAndSend(
                    "order.exchange",
                    event.getRoutingKey(),
                    event,
                    message -> {
                        // Set message properties
                        message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
                        message.getMessageProperties().setTimestamp(new Date());
                        message.getMessageProperties().setExpiration("300000"); // expires after 5 minutes
                        return message;
                    }
            );

            log.info("Event published: {}", event.getEventType());
        } catch (Exception e) {
            log.error("Failed to publish event: {}", event.getEventType(), e);
            // Retry, or record the failure for a dead-letter store
            handlePublishFailure(event, e);
        }
    }

    private void handlePublishFailure(OrderEvent event, Exception e) {
        // Retry, or persist the failed event for later replay
        failedEventRepository.save(new FailedEvent(event, e.getMessage()));
    }
}

// Message consumer
@Component
public class InventoryEventConsumer {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private EventPublisher eventPublisher;

    @RabbitListener(queues = "inventory.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        try {
            log.info("Processing inventory for order: {}", event.getOrderId());

            // Reserve stock asynchronously
            inventoryService.reserveStock(event.getItems());

            log.info("Inventory reserved for order: {}", event.getOrderId());
        } catch (InsufficientStockException e) {
            log.error("Insufficient stock for order: {}", event.getOrderId(), e);
            // Publish an out-of-stock event
            publishStockInsufficientEvent(event);
        } catch (Exception e) {
            log.error("Failed to process inventory for order: {}", event.getOrderId(), e);
            throw new AmqpRejectAndDontRequeueException("Processing failed", e);
        }
    }

    private void publishStockInsufficientEvent(OrderCreatedEvent event) {
        StockInsufficientEvent stockEvent = new StockInsufficientEvent(
                event.getOrderId(),
                event.getItems()
        );
        eventPublisher.publishOrderEvent(stockEvent);
    }
}

Load leveling (peak shaving): absorbs bursts of traffic and protects downstream systems.

// Load-leveling configuration
@Configuration
public class MessageQueueConfiguration {

    @Bean
    public Queue highTrafficQueue() {
        return QueueBuilder.durable("high.traffic.queue")
                .withArgument("x-max-length", 10000) // maximum queue length
                .withArgument("x-overflow", "reject-publish") // reject publishes when the queue is full
                .withArgument("x-message-ttl", 300000) // 5-minute message TTL
                .build();
    }

    @Bean
    public RabbitListenerContainerFactory<?> rateLimitedContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Cap the number of concurrent consumers
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);

        // Prefetch count throttles the consumption rate
        factory.setPrefetchCount(1);

        return factory;
    }
}

// Rate-limited consumer
@Component
public class RateLimitedConsumer {

    @Autowired
    private BusinessService businessService;

    private final RateLimiter rateLimiter = RateLimiter.create(100.0); // 100 permits per second

    @RabbitListener(
            queues = "high.traffic.queue",
            containerFactory = "rateLimitedContainerFactory"
    )
    public void processHighTrafficMessage(HighTrafficMessage message) {
        // Acquire a permit to throttle processing
        rateLimiter.acquire();

        try {
            log.info("Processing high traffic message: {}", message.getId());

            // Business logic
            businessService.processMessage(message);

            log.info("High traffic message processed: {}", message.getId());
        } catch (Exception e) {
            log.error("Failed to process high traffic message: {}", message.getId(), e);
            throw e;
        }
    }
}
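The Guava `RateLimiter` used above is, at heart, a token bucket. As a framework-free illustration of the idea (a simplified sketch with hypothetical class and parameter names, not Guava's actual implementation, which also supports blocking `acquire()` and warm-up):

```java
// Minimal token-bucket sketch: tokens refill continuously at a fixed rate,
// and each message consumes one token; bursts are capped by the bucket size.
class TokenBucket {
    private final double permitsPerSecond;
    private final double maxTokens;
    private double availableTokens;
    private long lastRefillNanos;

    TokenBucket(double permitsPerSecond, double burstCapacity) {
        this.permitsPerSecond = permitsPerSecond;
        this.maxTokens = burstCapacity;
        this.availableTokens = burstCapacity; // start full
        this.lastRefillNanos = System.nanoTime();
    }

    // Non-blocking variant: returns true if a permit was available.
    synchronized boolean tryAcquire() {
        refill();
        if (availableTokens >= 1.0) {
            availableTokens -= 1.0;
            return true;
        }
        return false;
    }

    private void refill() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
        availableTokens = Math.min(maxTokens, availableTokens + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = now;
    }
}
```

A consumer that calls `tryAcquire()` and requeues (or delays) on `false` gets the same smoothing effect as the blocking `rateLimiter.acquire()` above, without tying up a listener thread.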

Core Messaging Patterns

Point-to-Point (Queue)

// Work-queue pattern
@Component
public class TaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void submitTask(Task task) {
        rabbitTemplate.convertAndSend("task.queue", task);
        log.info("Task submitted: {}", task.getId());
    }
}

@Component
public class TaskWorker {

    @Autowired
    private TaskService taskService;

    @RabbitListener(queues = "task.queue")
    public void processTask(Task task) {
        try {
            log.info("Worker {} processing task: {}",
                    Thread.currentThread().getName(), task.getId());

            // Simulate the work
            Thread.sleep(task.getProcessingTime());

            taskService.completeTask(task);

            log.info("Task completed: {}", task.getId());
        } catch (Exception e) {
            log.error("Task processing failed: {}", task.getId(), e);
            throw new AmqpRejectAndDontRequeueException("Task processing failed", e);
        }
    }
}

Publish/Subscribe (Topic/Fanout)

// Publish/subscribe configuration
@Configuration
public class PubSubConfiguration {

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order.exchange");
    }

    @Bean
    public Queue inventoryQueue() {
        return QueueBuilder.durable("inventory.queue").build();
    }

    @Bean
    public Queue paymentQueue() {
        return QueueBuilder.durable("payment.queue").build();
    }

    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable("notification.queue").build();
    }

    @Bean
    public Binding inventoryBinding() {
        return BindingBuilder.bind(inventoryQueue())
                .to(orderExchange())
                .with("order.created");
    }

    @Bean
    public Binding paymentBinding() {
        return BindingBuilder.bind(paymentQueue())
                .to(orderExchange())
                .with("order.created");
    }

    @Bean
    public Binding notificationBinding() {
        return BindingBuilder.bind(notificationQueue())
                .to(orderExchange())
                .with("order.*"); // subscribe to all order events
    }
}

// Event publisher
@Component
public class OrderEventPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent(order);
        rabbitTemplate.convertAndSend(
                "order.exchange",
                "order.created",
                event
        );
        log.info("Order created event published: {}", order.getId());
    }

    public void publishOrderCancelled(Order order) {
        OrderCancelledEvent event = new OrderCancelledEvent(order);
        rabbitTemplate.convertAndSend(
                "order.exchange",
                "order.cancelled",
                event
        );
        log.info("Order cancelled event published: {}", order.getId());
    }
}

Comparing Mainstream Message Queue Technologies

RabbitMQ Implementation

// Advanced RabbitMQ configuration
@Configuration
public class RabbitMQAdvancedConfiguration {

    // Dead-letter queue setup
    @Bean
    public Queue mainQueue() {
        // Note: RabbitMQ has no "x-max-retries" queue argument;
        // retry counting must be done by the consumer (e.g. via the x-death header)
        return QueueBuilder.durable("main.queue")
                .withArgument("x-dead-letter-exchange", "dlx.exchange")
                .withArgument("x-dead-letter-routing-key", "dlx.routing.key")
                .withArgument("x-message-ttl", 300000) // 5-minute TTL
                .build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx.exchange");
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dlx.queue").build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dlx.routing.key");
    }

    // Delay queue setup: messages wait here until their per-message TTL expires,
    // then dead-letter into the processing exchange
    @Bean
    public Queue delayQueue() {
        // No queue-level x-message-ttl: each message sets its own expiration
        // (a queue-level TTL of 0 would expire every message immediately)
        return QueueBuilder.durable("delay.queue")
                .withArgument("x-dead-letter-exchange", "process.exchange")
                .withArgument("x-dead-letter-routing-key", "process.routing.key")
                .build();
    }

    // Priority queue setup
    @Bean
    public Queue priorityQueue() {
        return QueueBuilder.durable("priority.queue")
                .withArgument("x-max-priority", 10)
                .build();
    }
}

// RabbitMQ publisher confirms
@Component
public class ReliableMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private FailureRecoveryService failureRecoveryService;

    @PostConstruct
    public void setupCallbacks() {
        // Publisher confirm callback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message published successfully: {}", correlationData.getId());
            } else {
                log.error("Message publish failed: {}, cause: {}", correlationData.getId(), cause);
                handlePublishFailure(correlationData);
            }
        });

        // Callback for unroutable (returned) messages
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("Message returned: {}, reply code: {}, reply text: {}",
                    returned.getMessage(), returned.getReplyCode(), returned.getReplyText());
            handleReturnedMessage(returned);
        });
    }

    public void sendReliableMessage(String exchange, String routingKey, Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            log.info("Message sent with correlation ID: {}", correlationData.getId());
        } catch (Exception e) {
            log.error("Failed to send message", e);
            handleSendException(correlationData, e);
        }
    }

    private void handlePublishFailure(CorrelationData correlationData) {
        // Retry, or record the failure for later recovery
        failureRecoveryService.recordFailure(correlationData.getId());
    }

    private void handleReturnedMessage(ReturnedMessage returned) {
        // Handle messages the broker could not route
        String messageId = returned.getMessage().getMessageProperties().getMessageId();
        failureRecoveryService.handleReturnedMessage(messageId, returned);
    }

    private void handleSendException(CorrelationData correlationData, Exception e) {
        // Handle send-time exceptions
        failureRecoveryService.recordSendException(correlationData.getId(), e);
    }
}

// RabbitMQ consumer acknowledgements
@Component
public class ReliableMessageConsumer {

    @Autowired
    private BusinessService businessService;

    @RabbitListener(
            queues = "reliable.queue",
            ackMode = "MANUAL"
    )
    public void processMessage(
            @Payload ReliableMessage message,
            @Header Map<String, Object> headers,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

        try {
            log.info("Processing message: {}", message.getId());

            // Business logic
            businessService.processMessage(message);

            // Manual ack
            channel.basicAck(deliveryTag, false);

            log.info("Message processed successfully: {}", message.getId());

        } catch (BusinessException e) {
            log.error("Business logic failed for message: {}", message.getId(), e);

            try {
                // Business exception: reject and requeue
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }

        } catch (Exception e) {
            log.error("Unexpected error processing message: {}", message.getId(), e);

            try {
                // System exception: reject without requeue (dead-letters if a DLX is configured)
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
}

Apache Kafka Implementation

// Kafka producer configuration
@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability settings
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Throughput settings
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Idempotence
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(configProps);
        // Required for KafkaTemplate.executeInTransaction(...) used below
        factory.setTransactionIdPrefix("order-tx-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());

        // Default topic
        template.setDefaultTopic("default-topic");

        // Send result listener
        template.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord,
                                  RecordMetadata recordMetadata) {
                log.info("Message sent successfully: topic={}, partition={}, offset={}",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord,
                                RecordMetadata recordMetadata, Exception exception) {
                log.error("Failed to send message: {}", producerRecord.value(), exception);
            }
        });

        return template;
    }
}

// Kafka producer with headers, async callbacks, and transactions
@Component
public class KafkaEventProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private FailedMessageRepository failedMessageRepository;

    public CompletableFuture<SendResult<String, Object>> sendEventAsync(
            String topic, String key, Object event) {

        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, event);

        // Attach message headers
        record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
        record.headers().add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
        record.headers().add("source", "order-service".getBytes());

        // Since spring-kafka 3.x, send() returns a CompletableFuture
        return kafkaTemplate.send(record).whenComplete((result, failure) -> {
            if (failure != null) {
                log.error("Failed to send event: topic={}, key={}", topic, key, failure);
                handleSendFailure(topic, key, event, failure);
            } else {
                log.info("Event sent successfully: topic={}, partition={}, offset={}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }
        });
    }

    public void sendEventInTransaction(String topic, String key, Object event) {
        try {
            // executeInTransaction manages the Kafka transaction itself
            kafkaTemplate.executeInTransaction(operations -> {
                operations.send(topic, key, event);

                // Multiple sends can share the same transaction
                operations.send("audit-topic", key, new AuditEvent(event));

                return true;
            });

            log.info("Transactional event sent: topic={}, key={}", topic, key);
        } catch (Exception e) {
            log.error("Failed to send transactional event", e);
            throw e;
        }
    }

    private void handleSendFailure(String topic, String key, Object event, Throwable failure) {
        // Failure handling: retry, or persist the message for later inspection
        FailedMessage failedMessage = new FailedMessage(topic, key, event, failure.getMessage());
        failedMessageRepository.save(failedMessage);
    }
}

// Kafka consumer configuration
@Configuration
public class KafkaConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Consumption strategy
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual commit
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

        // Deserialization
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.events.BaseEvent");

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Concurrency
        factory.setConcurrency(3);

        // Manual acknowledgement
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Error handling: retry 3 times at 1-second intervals
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new FixedBackOff(1000L, 3L)
        ));

        return factory;
    }
}

// Kafka consumer implementation
@Component
public class KafkaEventConsumer {

    @Autowired
    private OrderService orderService;

    @Autowired
    private MessageProcessingService messageProcessingService;

    @Autowired
    private ErrorRecordService errorRecordService;

    @Autowired
    private AlertService alertService;

    @KafkaListener(
            topics = "order-events",
            groupId = "order-processing-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header Map<String, Object> headers,
            Acknowledgment acknowledgment,
            ConsumerRecord<String, OrderEvent> record) {

        try {
            log.info("Received order event: type={}, orderId={}, partition={}, offset={}",
                    event.getEventType(), event.getOrderId(),
                    record.partition(), record.offset());

            // Idempotency check
            String messageId = new String((byte[]) headers.get("messageId"));
            if (messageProcessingService.isProcessed(messageId)) {
                log.info("Message already processed: {}", messageId);
                acknowledgment.acknowledge();
                return;
            }

            // Business logic
            switch (event.getEventType()) {
                case ORDER_CREATED:
                    orderService.processOrderCreated((OrderCreatedEvent) event);
                    break;
                case ORDER_CANCELLED:
                    orderService.processOrderCancelled((OrderCancelledEvent) event);
                    break;
                case ORDER_COMPLETED:
                    orderService.processOrderCompleted((OrderCompletedEvent) event);
                    break;
                default:
                    log.warn("Unknown event type: {}", event.getEventType());
            }

            // Record processing state
            messageProcessingService.markAsProcessed(messageId);

            // Manual ack
            acknowledgment.acknowledge();

            log.info("Order event processed successfully: {}", event.getOrderId());

        } catch (Exception e) {
            log.error("Failed to process order event: {}", event.getOrderId(), e);

            // Decide whether to retry based on the exception type
            if (isRetryableException(e)) {
                throw e; // let the error handler drive the retry
            } else {
                // Non-retryable: ack to avoid an endless redelivery loop
                acknowledgment.acknowledge();
                handleNonRetryableError(event, e);
            }
        }
    }

    private boolean isRetryableException(Exception e) {
        return e instanceof TransientDataAccessException ||
                e instanceof ConnectException ||
                e instanceof SocketTimeoutException;
    }

    private void handleNonRetryableError(OrderEvent event, Exception e) {
        // Persist the error and raise an alert
        errorRecordService.recordError(event, e);
        alertService.sendProcessingErrorAlert(event.getOrderId(), e.getMessage());
    }
}

RocketMQ Implementation

// RocketMQ producer configuration
@Configuration
public class RocketMQProducerConfiguration {

    @Bean
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);
        producer.setSendMsgTimeout(10000);
        producer.setMaxMessageSize(4 * 1024 * 1024); // 4 MB

        try {
            producer.start();
            log.info("RocketMQ producer started successfully");
        } catch (MQClientException e) {
            log.error("Failed to start RocketMQ producer", e);
            throw new RuntimeException("Failed to start RocketMQ producer", e);
        }

        return producer;
    }

    @Bean
    public TransactionMQProducer transactionMQProducer() {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(new OrderTransactionListener());

        try {
            producer.start();
            log.info("RocketMQ transaction producer started successfully");
        } catch (MQClientException e) {
            log.error("Failed to start RocketMQ transaction producer", e);
            throw new RuntimeException("Failed to start RocketMQ transaction producer", e);
        }

        return producer;
    }
}

// RocketMQ message producer
@Component
public class RocketMQEventProducer {

    @Autowired
    private DefaultMQProducer producer;

    @Autowired
    private TransactionMQProducer transactionProducer;

    @Autowired
    private FailedMessageRepository failedMessageRepository;

    public SendResult sendOrderEvent(OrderEvent event) {
        try {
            Message message = new Message(
                    "ORDER_TOPIC",
                    event.getEventType().name(),
                    JSON.toJSONBytes(event)
            );

            // Set message properties
            message.putUserProperty("messageId", UUID.randomUUID().toString());
            message.putUserProperty("timestamp", String.valueOf(System.currentTimeMillis()));
            message.putUserProperty("source", "order-service");

            // Synchronous send
            SendResult result = producer.send(message);

            log.info("Message sent successfully: msgId={}, queueId={}, queueOffset={}",
                    result.getMsgId(), result.getMessageQueue().getQueueId(), result.getQueueOffset());

            return result;

        } catch (Exception e) {
            log.error("Failed to send order event", e);
            throw new RuntimeException("Failed to send order event", e);
        }
    }

    public void sendOrderEventAsync(OrderEvent event, SendCallback callback) {
        try {
            Message message = new Message(
                    "ORDER_TOPIC",
                    event.getEventType().name(),
                    JSON.toJSONBytes(event)
            );

            message.putUserProperty("messageId", UUID.randomUUID().toString());
            message.putUserProperty("timestamp", String.valueOf(System.currentTimeMillis()));

            // Asynchronous send
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("Async message sent successfully: msgId={}", sendResult.getMsgId());
                    if (callback != null) {
                        callback.onSuccess(sendResult);
                    }
                }

                @Override
                public void onException(Throwable e) {
                    log.error("Failed to send async message", e);
                    if (callback != null) {
                        callback.onException(e);
                    }
                    handleSendFailure(event, e);
                }
            });

        } catch (Exception e) {
            log.error("Failed to send async order event", e);
            throw new RuntimeException("Failed to send async order event", e);
        }
    }

    public void sendTransactionMessage(OrderEvent event, Object localTransactionArg) {
        try {
            Message message = new Message(
                    "ORDER_TRANSACTION_TOPIC",
                    event.getEventType().name(),
                    JSON.toJSONBytes(event)
            );

            message.putUserProperty("orderId", String.valueOf(event.getOrderId()));
            message.putUserProperty("messageId", UUID.randomUUID().toString());

            // Send a transactional message
            SendResult result = transactionProducer.sendMessageInTransaction(message, localTransactionArg);

            log.info("Transaction message sent: msgId={}, status={}",
                    result.getMsgId(), result.getSendStatus());

        } catch (Exception e) {
            log.error("Failed to send transaction message", e);
            throw new RuntimeException("Failed to send transaction message", e);
        }
    }

    // Delayed message
    public void sendDelayedMessage(OrderEvent event, int delayLevel) {
        try {
            Message message = new Message(
                    "ORDER_DELAY_TOPIC",
                    event.getEventType().name(),
                    JSON.toJSONBytes(event)
            );

            // Delay levels: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            message.setDelayTimeLevel(delayLevel);

            SendResult result = producer.send(message);

            log.info("Delayed message sent: msgId={}, delayLevel={}",
                    result.getMsgId(), delayLevel);

        } catch (Exception e) {
            log.error("Failed to send delayed message", e);
            throw new RuntimeException("Failed to send delayed message", e);
        }
    }

    private void handleSendFailure(OrderEvent event, Throwable e) {
        // Record messages that failed to send
        FailedMessage failedMessage = new FailedMessage(
                "ORDER_TOPIC",
                event.getEventType().name(),
                event,
                e.getMessage()
        );
        failedMessageRepository.save(failedMessage);
    }
}
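RocketMQ's delayed delivery is limited to the fixed 18-level table listed in the comment above, so the caller must map an arbitrary delay onto the nearest level. A hypothetical helper (the `DelayLevels` class and its round-down policy are illustrative assumptions, not part of the RocketMQ client API):

```java
// Maps a requested delay in seconds onto RocketMQ's default delay-level table
// (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),
// choosing the largest level that does not exceed the requested delay.
class DelayLevels {
    private static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360,
        420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // Returns a delay level in 1..18, or 0 when no delay applies.
    static int levelFor(long delaySeconds) {
        int level = 0;
        for (int i = 0; i < LEVEL_SECONDS.length; i++) {
            if (LEVEL_SECONDS[i] <= delaySeconds) {
                level = i + 1; // levels are 1-based
            } else {
                break;
            }
        }
        return level;
    }
}
```

A caller would then write `sendDelayedMessage(event, DelayLevels.levelFor(90))` instead of hard-coding a level; rounding down means the message is never delivered later than requested by more than one level's gap.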

// RocketMQ consumer
@Component
public class RocketMQEventConsumer {

    @Autowired
    private OrderService orderService;

    @Autowired
    private MessageProcessingService messageProcessingService;

    @Autowired
    private DeadLetterRepository deadLetterRepository;

    @Autowired
    private AlertService alertService;

    @PostConstruct
    public void startConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(10);

        try {
            // Subscribe to the topic
            consumer.subscribe("ORDER_TOPIC", "*");

            // Register the message listener
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> messages,
                        ConsumeConcurrentlyContext context) {

                    for (MessageExt message : messages) {
                        try {
                            processMessage(message);
                        } catch (Exception e) {
                            log.error("Failed to process message: {}", message.getMsgId(), e);

                            // Decide whether to retry based on the redelivery count
                            if (message.getReconsumeTimes() < 3) {
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            } else {
                                // Retries exhausted: record the error and ack the message
                                handleMaxRetryExceeded(message, e);
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        }
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            consumer.start();
            log.info("RocketMQ consumer started successfully");

        } catch (Exception e) {
            log.error("Failed to start RocketMQ consumer", e);
            throw new RuntimeException("Failed to start RocketMQ consumer", e);
        }
    }

    private void processMessage(MessageExt message) {
        String messageId = message.getUserProperty("messageId");

        // Idempotency check
        if (messageProcessingService.isProcessed(messageId)) {
            log.info("Message already processed: {}", messageId);
            return;
        }

        try {
            // The tag carries the event type; deserialize to the matching concrete class
            String eventType = message.getTags();

            log.info("Processing message: msgId={}, eventType={}",
                    message.getMsgId(), eventType);

            // Business logic
            switch (OrderEventType.valueOf(eventType)) {
                case ORDER_CREATED:
                    orderService.processOrderCreated(
                            JSON.parseObject(message.getBody(), OrderCreatedEvent.class));
                    break;
                case ORDER_PAID:
                    orderService.processOrderPaid(
                            JSON.parseObject(message.getBody(), OrderPaidEvent.class));
                    break;
                case ORDER_SHIPPED:
                    orderService.processOrderShipped(
                            JSON.parseObject(message.getBody(), OrderShippedEvent.class));
                    break;
                default:
                    log.warn("Unknown event type: {}", eventType);
            }

            // Mark as processed
            messageProcessingService.markAsProcessed(messageId);

            log.info("Message processed successfully: {}", message.getMsgId());

        } catch (Exception e) {
            log.error("Error processing message: {}", message.getMsgId(), e);
            throw e;
        }
    }

    private void handleMaxRetryExceeded(MessageExt message, Exception e) {
        // Record to a dead-letter table and raise an alert
        DeadLetterMessage deadLetter = new DeadLetterMessage(
                message.getMsgId(),
                message.getTopic(),
                message.getTags(),
                new String(message.getBody()),
                e.getMessage(),
                message.getReconsumeTimes()
        );

        deadLetterRepository.save(deadLetter);
        alertService.sendDeadLetterAlert(message.getMsgId());
    }
}

Advanced Message Queue Features

1. Message Ordering Guarantees

// Ordered-message producer
@Component
public class OrderedMessageProducer {

    @Autowired
    private DefaultMQProducer producer;

    public void sendOrderedMessage(OrderEvent event) {
        try {
            Message message = new Message(
                    "ORDERED_TOPIC",
                    event.getEventType().name(),
                    JSON.toJSONBytes(event)
            );

            // Use the order ID as the sharding key so that all messages for the
            // same order are routed to the same queue
            String orderKey = String.valueOf(event.getOrderId());

            SendResult result = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    String key = (String) arg;
                    // floorMod avoids a negative index when hashCode() is Integer.MIN_VALUE
                    int index = Math.floorMod(key.hashCode(), mqs.size());
                    return mqs.get(index);
                }
            }, orderKey);

            log.info("Ordered message sent: msgId={}, queueId={}, orderKey={}",
                    result.getMsgId(), result.getMessageQueue().getQueueId(), orderKey);

        } catch (Exception e) {
            log.error("Failed to send ordered message", e);
            throw new RuntimeException("Failed to send ordered message", e);
        }
    }
}
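The sharding logic inside the `MessageQueueSelector` above can be checked in isolation. A minimal sketch (the `QueueSelector` helper is hypothetical; `Math.floorMod` is used because `Math.abs(hashCode()) % n` produces a negative index when `hashCode()` returns `Integer.MIN_VALUE`):

```java
// Deterministic key-to-queue mapping: the same sharding key always selects
// the same queue index, which is what preserves per-key ordering.
class QueueSelector {
    static int selectQueue(String shardingKey, int queueCount) {
        // floorMod always yields a value in [0, queueCount)
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }
}
```

The guarantee this buys is per-key only: two different orders may still interleave, but all events for one order stay in one queue and are therefore consumed in publish order.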

// Ordered-message consumer
@Component
public class OrderedMessageConsumer {

    @Autowired
    private OrderEventProcessor orderEventProcessor;

    @PostConstruct
    public void startOrderedConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        try {
            consumer.subscribe("ORDERED_TOPIC", "*");

            // Register an orderly message listener
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(
                        List<MessageExt> messages,
                        ConsumeOrderlyContext context) {

                    // Commit offsets automatically
                    context.setAutoCommit(true);

                    for (MessageExt message : messages) {
                        try {
                            processOrderedMessage(message);
                        } catch (Exception e) {
                            log.error("Failed to process ordered message: {}", message.getMsgId(), e);

                            // On failure, suspend this queue briefly to preserve ordering
                            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });

            consumer.start();
            log.info("Ordered message consumer started successfully");

        } catch (Exception e) {
            log.error("Failed to start ordered message consumer", e);
            throw new RuntimeException("Failed to start ordered message consumer", e);
        }
    }

    private void processOrderedMessage(MessageExt message) {
        OrderEvent event = JSON.parseObject(message.getBody(), OrderEvent.class);

        log.info("Processing ordered message: orderId={}, eventType={}, queueId={}",
                event.getOrderId(), event.getEventType(), message.getQueueId());

        // Process events for the same order strictly in order
        orderEventProcessor.processInOrder(event);
    }
}
}

2. Message Deduplication and Idempotency

// Message deduplication service
@Service
public class MessageDeduplicationService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String DEDUP_KEY_PREFIX = "msg:dedup:";
    private static final int DEDUP_EXPIRE_SECONDS = 3600; // expires after 1 hour

    public boolean isDuplicate(String messageId) {
        String key = DEDUP_KEY_PREFIX + messageId;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    public void markAsProcessed(String messageId) {
        String key = DEDUP_KEY_PREFIX + messageId;
        redisTemplate.opsForValue().set(key, "processed", DEDUP_EXPIRE_SECONDS, TimeUnit.SECONDS);
    }

    public boolean tryLock(String messageId) {
        String lockKey = DEDUP_KEY_PREFIX + "lock:" + messageId;
        Boolean acquired = redisTemplate.opsForValue().setIfAbsent(
                lockKey, "locked", 300, TimeUnit.SECONDS
        );
        return Boolean.TRUE.equals(acquired);
    }

    public void releaseLock(String messageId) {
        String lockKey = DEDUP_KEY_PREFIX + "lock:" + messageId;
        redisTemplate.delete(lockKey);
    }
}

// Idempotent message processor
@Component
public class IdempotentMessageProcessor {

@Autowired
private MessageDeduplicationService deduplicationService;

@Autowired
private OrderService orderService;

public void processMessage(String messageId, OrderEvent event) {
// Skip if already processed
if (deduplicationService.isDuplicate(messageId)) {
log.info("Message already processed: {}", messageId);
return;
}

// Try to acquire the processing lock
if (!deduplicationService.tryLock(messageId)) {
log.info("Message is being processed by another instance: {}", messageId);
return;
}

try {
// Check again after acquiring the lock (double-check)
if (deduplicationService.isDuplicate(messageId)) {
log.info("Message already processed during lock acquisition: {}", messageId);
return;
}

// Execute the business logic
processBusinessLogic(event);

// Mark as processed
deduplicationService.markAsProcessed(messageId);

log.info("Message processed successfully: {}", messageId);

} catch (Exception e) {
log.error("Failed to process message: {}", messageId, e);
throw e;
} finally {
// Release the lock
deduplicationService.releaseLock(messageId);
}
}

private void processBusinessLogic(OrderEvent event) {
switch (event.getEventType()) {
case ORDER_CREATED:
orderService.handleOrderCreated((OrderCreatedEvent) event);
break;
case ORDER_PAID:
orderService.handleOrderPaid((OrderPaidEvent) event);
break;
case ORDER_CANCELLED:
orderService.handleOrderCancelled((OrderCancelledEvent) event);
break;
default:
throw new IllegalArgumentException("Unknown event type: " + event.getEventType());
}
}
}
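The check / lock / double-check flow above can be condensed into a framework-free sketch, with a `ConcurrentHashMap` standing in for Redis (an assumption that only holds within a single JVM; the Redis version above is what makes it work across instances):

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch: putIfAbsent performs the "check + lock" step atomically in one call.
public class InMemoryIdempotentProcessor {

    private final ConcurrentHashMap<String, Boolean> processed = new ConcurrentHashMap<>();

    // Returns true if the handler ran, false if the message was a duplicate.
    public boolean process(String messageId, Runnable handler) {
        if (processed.putIfAbsent(messageId, Boolean.TRUE) != null) {
            return false; // already processed (or currently in flight)
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            // Roll the marker back so a redelivery can retry
            processed.remove(messageId);
            throw e;
        }
    }
}
```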

3. Message Retry and Dead-Letter Handling

// Retry policy handler
@Component
public class MessageRetryHandler {

private static final int MAX_RETRY_TIMES = 3;
private static final long[] RETRY_DELAYS = {1000, 5000, 15000}; // 1s, 5s, 15s

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private MessageProducer messageProducer;

@Autowired
private AlertService alertService; // used by handleMaxRetryExceeded below

public boolean shouldRetry(String messageId, Exception exception) {
int retryCount = getRetryCount(messageId);

// Give up once the maximum retry count is reached
if (retryCount >= MAX_RETRY_TIMES) {
return false;
}

// Only retry exceptions that are likely transient
return isRetryableException(exception);
}

public void scheduleRetry(String messageId, Object message, Exception exception) {
int retryCount = incrementRetryCount(messageId);

if (retryCount > MAX_RETRY_TIMES) {
handleMaxRetryExceeded(messageId, message, exception);
return;
}

long delay = RETRY_DELAYS[Math.min(retryCount - 1, RETRY_DELAYS.length - 1)];

log.info("Scheduling retry {} for message {} after {} ms",
retryCount, messageId, delay);

// Implement the retry via a delayed message
RetryMessage retryMessage = new RetryMessage(messageId, message, retryCount);
messageProducer.sendDelayedMessage("RETRY_TOPIC", retryMessage, delay);
}

private int getRetryCount(String messageId) {
String key = "retry:count:" + messageId;
String count = redisTemplate.opsForValue().get(key);
return count != null ? Integer.parseInt(count) : 0;
}

private int incrementRetryCount(String messageId) {
String key = "retry:count:" + messageId;
Long count = redisTemplate.opsForValue().increment(key);
redisTemplate.expire(key, 1, TimeUnit.HOURS);
return count.intValue();
}

private boolean isRetryableException(Exception exception) {
return exception instanceof ConnectException ||
exception instanceof SocketTimeoutException ||
exception instanceof TransientDataAccessException ||
exception instanceof OptimisticLockingFailureException;
}

private void handleMaxRetryExceeded(String messageId, Object message, Exception exception) {
log.error("Message {} exceeded max retry times, sending to dead letter queue", messageId);

// Route to the dead-letter queue
DeadLetterMessage deadLetter = new DeadLetterMessage(
messageId,
message,
exception.getMessage(),
MAX_RETRY_TIMES,
System.currentTimeMillis()
);

messageProducer.sendToDeadLetterQueue(deadLetter);

// Clean up the retry counter
redisTemplate.delete("retry:count:" + messageId);

// Raise an alert
alertService.sendDeadLetterAlert(messageId, exception.getMessage());
}
}
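The staged backoff used by `scheduleRetry` boils down to a small lookup that clamps at the last delay. Extracted as a standalone sketch (`RetryBackoff` is an illustrative name) it is easy to verify:

```java
// Sketch: staged backoff schedule that clamps at the final delay,
// mirroring RETRY_DELAYS[Math.min(retryCount - 1, RETRY_DELAYS.length - 1)].
public class RetryBackoff {

    private static final long[] RETRY_DELAYS = {1000L, 5000L, 15000L}; // 1s, 5s, 15s

    public static long delayForAttempt(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt starts at 1");
        }
        // Attempts beyond the table keep using the last (largest) delay
        return RETRY_DELAYS[Math.min(attempt - 1, RETRY_DELAYS.length - 1)];
    }
}
```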

// Dead-letter queue processor
@Component
public class DeadLetterQueueProcessor {

@Autowired
private DeadLetterRepository deadLetterRepository;

@Autowired
private AlertService alertService;

@Autowired
private MessageProducer messageProducer; // used by reprocessMessage below

@RabbitListener(queues = "dead.letter.queue")
public void processDeadLetter(DeadLetterMessage deadLetter) {
try {
log.info("Processing dead letter message: {}", deadLetter.getMessageId());

// Persist the record
DeadLetterRecord record = new DeadLetterRecord(
deadLetter.getMessageId(),
JSON.toJSONString(deadLetter.getOriginalMessage()),
deadLetter.getErrorMessage(),
deadLetter.getRetryCount(),
new Date(deadLetter.getTimestamp())
);

deadLetterRepository.save(record);

// Raise an alert
alertService.sendDeadLetterProcessedAlert(deadLetter.getMessageId());

log.info("Dead letter message processed: {}", deadLetter.getMessageId());

} catch (Exception e) {
log.error("Failed to process dead letter message: {}",
deadLetter.getMessageId(), e);
throw e;
}
}

@Scheduled(fixedRate = 300000) // check every 5 minutes
public void processDeadLetterRecords() {
try {
List<DeadLetterRecord> unprocessedRecords =
deadLetterRepository.findUnprocessedRecords();

for (DeadLetterRecord record : unprocessedRecords) {
try {
// Redeliver automatically, or flag for manual handling
if (canReprocess(record)) {
reprocessMessage(record);
} else {
markAsManualProcessingRequired(record);
}
} catch (Exception e) {
log.error("Failed to reprocess dead letter record: {}",
record.getMessageId(), e);
}
}
} catch (Exception e) {
log.error("Error during dead letter records processing", e);
}
}

private boolean canReprocess(DeadLetterRecord record) {
// Reprocessable if old enough (the underlying issue may be fixed) and not a validation error
return record.getCreatedAt().before(
Date.from(Instant.now().minus(1, ChronoUnit.HOURS))
) && !record.getErrorMessage().contains("ValidationException");
}

private void reprocessMessage(DeadLetterRecord record) {
try {
Object originalMessage = JSON.parseObject(
record.getOriginalMessage(), Object.class
);

// Republish to the original queue
messageProducer.republishMessage(originalMessage);

// Mark as reprocessed
record.setReprocessed(true);
record.setReprocessedAt(new Date());
deadLetterRepository.save(record);

log.info("Dead letter message reprocessed: {}", record.getMessageId());

} catch (Exception e) {
log.error("Failed to reprocess message: {}", record.getMessageId(), e);
throw e;
}
}

private void markAsManualProcessingRequired(DeadLetterRecord record) {
record.setRequiresManualProcessing(true);
deadLetterRepository.save(record);

alertService.sendManualProcessingRequiredAlert(record.getMessageId());
}
}
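The reprocessing rule in `canReprocess` can be isolated as a pure function, which makes the cooldown window and the validation-error exclusion easy to test in isolation (`ReprocessPolicy` is an illustrative name):

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: a dead letter is eligible for automatic redelivery once it is
// older than a cooldown window and its failure was not a validation error
// (validation failures will simply fail again on redelivery).
public class ReprocessPolicy {

    public static boolean canReprocess(Instant createdAt, String errorMessage,
                                       Instant now, Duration cooldown) {
        boolean oldEnough = createdAt.isBefore(now.minus(cooldown));
        boolean notValidation = !errorMessage.contains("ValidationException");
        return oldEnough && notValidation;
    }
}
```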

Performance Optimization and Monitoring

1. Performance Optimization Strategies

// Batch message processing
@Component
public class BatchMessageProcessor {

private final List<OrderEvent> eventBatch = new ArrayList<>();
private final Object batchLock = new Object();
private static final int BATCH_SIZE = 100;

@Autowired
private OrderService orderService;

@Scheduled(fixedRate = 1000)
public void processBatch() {
List<OrderEvent> currentBatch;

synchronized (batchLock) {
if (eventBatch.isEmpty()) {
return;
}

currentBatch = new ArrayList<>(eventBatch);
eventBatch.clear();
}

if (!currentBatch.isEmpty()) {
processBatchEvents(currentBatch);
}
}

public void addEvent(OrderEvent event) {
synchronized (batchLock) {
eventBatch.add(event);

// Flush immediately once the batch is full
if (eventBatch.size() >= BATCH_SIZE) {
List<OrderEvent> batchToProcess = new ArrayList<>(eventBatch);
eventBatch.clear();

// Process the batch asynchronously
CompletableFuture.runAsync(() -> processBatchEvents(batchToProcess));
}
}
}

private void processBatchEvents(List<OrderEvent> events) {
try {
log.info("Processing batch of {} events", events.size());

// Group events by type
Map<OrderEventType, List<OrderEvent>> groupedEvents = events.stream()
.collect(Collectors.groupingBy(OrderEvent::getEventType));

// Process each event type as a batch
for (Map.Entry<OrderEventType, List<OrderEvent>> entry : groupedEvents.entrySet()) {
processBatchByType(entry.getKey(), entry.getValue());
}

log.info("Batch processing completed for {} events", events.size());

} catch (Exception e) {
log.error("Failed to process event batch", e);

// If batch processing fails, fall back to one-by-one processing
for (OrderEvent event : events) {
try {
orderService.processEvent(event);
} catch (Exception ex) {
log.error("Failed to process individual event: {}", event.getOrderId(), ex);
}
}
}
}

private void processBatchByType(OrderEventType eventType, List<OrderEvent> events) {
switch (eventType) {
case ORDER_CREATED:
orderService.batchProcessOrderCreated(
events.stream()
.map(e -> (OrderCreatedEvent) e)
.collect(Collectors.toList())
);
break;
case ORDER_PAID:
orderService.batchProcessOrderPaid(
events.stream()
.map(e -> (OrderPaidEvent) e)
.collect(Collectors.toList())
);
break;
default:
// Process event types without batch support one by one
for (OrderEvent event : events) {
orderService.processEvent(event);
}
}
}
}
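Stripped of Spring and the scheduler, the size-triggered flush above reduces to a small generic buffer; `flush()` plays the role of the `@Scheduled` method, and `BatchBuffer` is an illustrative name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: accumulate items and hand off a snapshot either when the
// batch is full or when a periodic flush() fires.
public class BatchBuffer<T> {

    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> sink;

    public BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    public synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flushLocked();
        }
    }

    // Called periodically, like the @Scheduled processBatch() above
    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            flushLocked();
        }
    }

    private void flushLocked() {
        // Hand the sink a copy so it can run outside the lock safely
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```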

// Connection pool tuning
@Configuration
public class MessageQueueConnectionConfiguration {

@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");

// Connection pool settings
factory.setConnectionCacheSize(10);
factory.setChannelCacheSize(100);
factory.setChannelCheckoutTimeout(30000);

// Connection timeout settings
factory.getRabbitConnectionFactory().setConnectionTimeout(30000);
factory.getRabbitConnectionFactory().setHandshakeTimeout(10000);
factory.getRabbitConnectionFactory().setShutdownTimeout(5000);

// Heartbeat settings
factory.getRabbitConnectionFactory().setRequestedHeartBeat(60);

return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);

// Performance-related settings
template.setMandatory(true);
template.setChannelTransacted(false);
template.setReceiveTimeout(5000);
template.setReplyTimeout(5000);

// Message converter
template.setMessageConverter(new Jackson2JsonMessageConverter());

return template;
}
}

2. Collecting Monitoring Metrics

// Message queue monitoring service
@Service
public class MessageQueueMonitoringService {

private final MeterRegistry meterRegistry;

@Autowired
private QueueSizeService queueSizeService;

public MessageQueueMonitoringService(MeterRegistry meterRegistry) {
    this.meterRegistry = meterRegistry;

    // Gauge.builder() takes the state object and value function up front;
    // the registry is supplied in register()
    Gauge.builder("queue.size", this, MessageQueueMonitoringService::getCurrentQueueSize)
            .description("Current queue size")
            .register(meterRegistry);
}

public void recordMessageSent(String topic, String eventType) {
    // Counter.increment() does not accept tags; resolve a tagged counter
    // from the registry instead (Micrometer caches meters, so this is cheap)
    meterRegistry.counter("messages.sent",
            "topic", topic, "event_type", eventType).increment();
}

public void recordMessageReceived(String topic, String eventType) {
    meterRegistry.counter("messages.received",
            "topic", topic, "event_type", eventType).increment();
}

public void recordMessageFailed(String topic, String eventType, String errorType) {
    meterRegistry.counter("messages.failed",
            "topic", topic, "event_type", eventType, "error_type", errorType).increment();
}

public Timer.Sample startProcessingTimer() {
    return Timer.start(meterRegistry);
}

public void recordProcessingTime(Timer.Sample sample, String topic, String eventType) {
    sample.stop(Timer.builder("message.processing.time")
            .description("Message processing time")
            .tags("topic", topic, "event_type", eventType)
            .register(meterRegistry));
}

private double getCurrentQueueSize() {
    try {
        // Could query the RabbitMQ Management API (or similar) for the queue depth
        return queueSizeService.getQueueSize("main.queue");
    } catch (Exception e) {
        log.error("Failed to get queue size", e);
        return 0;
    }
}
}

// Monitoring aspect
@Aspect
@Component
public class MessageProcessingMonitoringAspect {

@Autowired
private MessageQueueMonitoringService monitoringService;

@Around("@annotation(MessageProcessingMonitored)")
public Object monitorMessageProcessing(ProceedingJoinPoint joinPoint) throws Throwable {
Timer.Sample sample = monitoringService.startProcessingTimer();

try {
Object result = joinPoint.proceed();

// Record successful processing
recordSuccessMetrics(joinPoint);

return result;
} catch (Exception e) {
// Record failure metrics
recordFailureMetrics(joinPoint, e);
throw e;
} finally {
// Record the processing time
recordProcessingTime(sample, joinPoint);
}
}

private void recordSuccessMetrics(ProceedingJoinPoint joinPoint) {
// Extract topic and eventType from the method arguments or the annotation
String topic = extractTopic(joinPoint);
String eventType = extractEventType(joinPoint);

monitoringService.recordMessageReceived(topic, eventType);
}

private void recordFailureMetrics(ProceedingJoinPoint joinPoint, Exception e) {
String topic = extractTopic(joinPoint);
String eventType = extractEventType(joinPoint);
String errorType = e.getClass().getSimpleName();

monitoringService.recordMessageFailed(topic, eventType, errorType);
}

private void recordProcessingTime(Timer.Sample sample, ProceedingJoinPoint joinPoint) {
String topic = extractTopic(joinPoint);
String eventType = extractEventType(joinPoint);

monitoringService.recordProcessingTime(sample, topic, eventType);
}

private String extractTopic(ProceedingJoinPoint joinPoint) {
// Extract the topic from the method arguments or the annotation
return "default-topic";
}

private String extractEventType(ProceedingJoinPoint joinPoint) {
// Extract the eventType from the method arguments or the annotation
return "default-event";
}
}

// Monitoring annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageProcessingMonitored {
String topic() default "";
String eventType() default "";
}

3. Health Checks and Alerting

// Message queue health check
@Component
public class MessageQueueHealthIndicator implements HealthIndicator {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private QueueSizeService queueSizeService;

@Autowired
private ConsumerStatusService consumerStatusService; // used by checkConsumers below

@Override
public Health health() {
try {
// Check the connection
boolean connectionHealthy = checkConnection();

// Check queue sizes
Map<String, Integer> queueSizes = checkQueueSizes();

// Check consumer status
boolean consumersHealthy = checkConsumers();

if (connectionHealthy && consumersHealthy && !hasQueueBacklog(queueSizes)) {
return Health.up()
.withDetail("connection", "healthy")
.withDetail("consumers", "healthy")
.withDetail("queueSizes", queueSizes)
.build();
} else {
return Health.down()
.withDetail("connection", connectionHealthy ? "healthy" : "unhealthy")
.withDetail("consumers", consumersHealthy ? "healthy" : "unhealthy")
.withDetail("queueSizes", queueSizes)
.withDetail("hasBacklog", hasQueueBacklog(queueSizes))
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
}

private boolean checkConnection() {
try {
rabbitTemplate.execute(channel -> {
channel.queueDeclarePassive("health-check-queue");
return true;
});
return true;
} catch (Exception e) {
log.error("RabbitMQ connection check failed", e);
return false;
}
}

private Map<String, Integer> checkQueueSizes() {
Map<String, Integer> queueSizes = new HashMap<>();

List<String> queues = Arrays.asList(
"order.queue",
"inventory.queue",
"payment.queue",
"notification.queue"
);

for (String queue : queues) {
try {
int size = queueSizeService.getQueueSize(queue);
queueSizes.put(queue, size);
} catch (Exception e) {
log.error("Failed to get size for queue: {}", queue, e);
queueSizes.put(queue, -1); // -1 means the size could not be determined
}
}

return queueSizes;
}

private boolean checkConsumers() {
try {
// Verify the consumers are running normally
return consumerStatusService.areConsumersHealthy();
} catch (Exception e) {
log.error("Consumer health check failed", e);
return false;
}
}

private boolean hasQueueBacklog(Map<String, Integer> queueSizes) {
return queueSizes.values().stream()
.anyMatch(size -> size > 1000); // backlog threshold: 1000 messages
}
}
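The backlog rule in `hasQueueBacklog` is worth keeping as a small pure function so the threshold can be tuned and tested in isolation (`BacklogCheck` is an illustrative name):

```java
import java.util.Map;

// Sketch: a set of queues is backlogged once any queue's depth
// crosses the given threshold.
public class BacklogCheck {

    public static boolean hasBacklog(Map<String, Integer> queueSizes, int threshold) {
        return queueSizes.values().stream()
                .anyMatch(size -> size > threshold);
    }
}
```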

// Alerting service
@Service
public class MessageQueueAlertService {

@Autowired
private NotificationService notificationService;

@EventListener
public void handleQueueBacklogAlert(QueueBacklogEvent event) {
    String message = String.format(
            "Queue backlog alert: queue %s currently holds %d messages, exceeding threshold %d",
            event.getQueueName(),
            event.getCurrentSize(),
            event.getThreshold()
    );

    AlertMessage alert = AlertMessage.builder()
            .title("Message queue backlog alert")
            .content(message)
            .level(AlertLevel.WARNING)
            .timestamp(LocalDateTime.now())
            .build();

    notificationService.sendAlert(alert);
}

@EventListener
public void handleConsumerDownAlert(ConsumerDownEvent event) {
    String message = String.format(
            "Consumer down alert: consumer %s is offline; queue %s may no longer be consumed",
            event.getConsumerName(),
            event.getQueueName()
    );

    AlertMessage alert = AlertMessage.builder()
            .title("Consumer down alert")
            .content(message)
            .level(AlertLevel.CRITICAL)
            .timestamp(LocalDateTime.now())
            .build();

    notificationService.sendAlert(alert);
}

@EventListener
public void handleMessageProcessingFailureAlert(MessageProcessingFailureEvent event) {
    if (event.getFailureCount() > 10) { // more than 10 consecutive failures
        String message = String.format(
                "Message processing failure alert: event type %s has failed %d times in a row; latest error: %s",
                event.getEventType(),
                event.getFailureCount(),
                event.getLastError()
        );

        AlertMessage alert = AlertMessage.builder()
                .title("Message processing failure alert")
                .content(message)
                .level(AlertLevel.CRITICAL)
                .timestamp(LocalDateTime.now())
                .build();

        notificationService.sendAlert(alert);
    }
}
}

Best Practices and Summary

1. Design Principles

Message design principles

  • Messages should be self-contained, carrying all the information needed to process them
  • Messages should be immutable, so they cannot be altered in transit
  • Message formats should be backward compatible to support system evolution
  • Message size should stay reasonable; oversized messages hurt performance

Queue design principles

  • Partition queues by business domain; avoid a single queue carrying too many responsibilities
  • Configure durability, TTL, dead-letter routing, and other queue properties deliberately
  • Account for message priority and ordering requirements
  • Design an appropriate routing strategy

2. Operational Recommendations

Capacity planning

  • Estimate message throughput from expected business volume
  • Plan for peak-traffic handling capacity
  • Reserve sufficient storage
  • Review and adjust the configuration regularly

Monitoring and alerting

  • Monitor key metrics such as queue depth, consumption rate, and error rate
  • Set sensible alert thresholds
  • Build thorough logging and tracing
  • Run health checks regularly

Failure handling

  • Build solid retry and degradation mechanisms
  • Design a sound dead-letter queue workflow
  • Prepare contingency and rollback plans
  • Run failure drills regularly

3. Technology Selection Guide

When RabbitMQ fits

  • High message-reliability requirements
  • Complex routing rules
  • Moderate message volume
  • A rich management UI is needed

When Kafka fits

  • High-throughput scenarios
  • Message persistence and replay are needed
  • Stream processing
  • Big-data scenarios

When RocketMQ fits

  • Transactional messages are needed
  • Ordered messages are required
  • Delayed messages are needed
  • Finance and other domains with extreme reliability requirements

Conclusion

As a core component of modern microservice architectures, the distributed message queue provides a reliable foundation for asynchronous communication between systems. With sound design and implementation, a message queue can significantly improve a system's scalability, reliability, and performance.

In practice, choose the message queue technology that fits your business scenario, and combine it with monitoring, alerting, and retry mechanisms to build a robust message-processing system. Keep watching the system's performance and tune it over time so the queue continues to support the business stably and efficiently.

As cloud-native technology evolves, message queues are moving toward greater intelligence and automation. Going forward we can expect features such as AI-driven routing, automatic scaling, and self-healing, which will further simplify operations and increase system autonomy.
