消息队列与中间件应用
概述
消息队列是分布式系统中重要的中间件组件,用于系统解耦、异步处理、流量削峰等场景。本文涵盖了RabbitMQ、Kafka、RocketMQ等主流消息队列的原理和实战应用。
核心面试问题
1. 消息队列基础概念
面试问题:消息队列的作用是什么?不同MQ产品的特点和适用场景?
消息队列的作用
// 1. 系统解耦示例
@Service
public class OrderService {
@Autowired
private MessageSender messageSender;
@Autowired
private OrderRepository orderRepository;
// 同步处理方式(紧耦合)
public OrderResult createOrderSync(CreateOrderRequest request) {
try {
// 1. 创建订单
Order order = new Order(request);
orderRepository.save(order);
// 2. 扣减库存(同步调用)
inventoryService.reduceStock(request.getProductId(), request.getQuantity());
// 3. 发送短信(同步调用)
smsService.sendOrderNotification(request.getUserId(), order.getOrderNo());
// 4. 更新积分(同步调用)
pointService.addPoints(request.getUserId(), order.getAmount());
return OrderResult.success(order.getId());
} catch (Exception e) {
// 任何一个步骤失败都会影响整个流程
return OrderResult.failure("订单创建失败: " + e.getMessage());
}
}
// 异步处理方式(解耦)
public OrderResult createOrderAsync(CreateOrderRequest request) {
try {
// 1. 创建订单(核心业务逻辑)
Order order = new Order(request);
orderRepository.save(order);
// 2. 发送消息,异步处理后续步骤
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getUserId(),
order.getProductId(),
order.getQuantity(),
order.getAmount(),
order.getOrderNo()
);
messageSender.sendMessage("order.created", event);
return OrderResult.success(order.getId());
} catch (Exception e) {
return OrderResult.failure("订单创建失败: " + e.getMessage());
}
}
}
// 消息处理器
@Component
public class OrderEventHandler {
@Autowired
private InventoryService inventoryService;
@Autowired
private SmsService smsService;
@Autowired
private PointService pointService;
@RabbitListener(queues = "inventory.reduce")
public void handleInventoryReduce(OrderCreatedEvent event) {
try {
inventoryService.reduceStock(event.getProductId(), event.getQuantity());
System.out.println("库存扣减完成: " + event.getOrderId());
} catch (Exception e) {
System.err.println("库存扣减失败: " + e.getMessage());
// 发送补偿消息或重试
}
}
@RabbitListener(queues = "notification.sms")
public void handleSmsNotification(OrderCreatedEvent event) {
try {
smsService.sendOrderNotification(event.getUserId(), event.getOrderNo());
System.out.println("短信发送完成: " + event.getOrderId());
} catch (Exception e) {
System.err.println("短信发送失败: " + e.getMessage());
// 短信发送失败不影响主流程,可以重试或记录
}
}
@RabbitListener(queues = "point.add")
public void handlePointAdd(OrderCreatedEvent event) {
try {
pointService.addPoints(event.getUserId(), event.getAmount());
System.out.println("积分添加完成: " + event.getOrderId());
} catch (Exception e) {
System.err.println("积分添加失败: " + e.getMessage());
}
}
}
流量削峰应用
// 秒杀场景的流量削峰
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private MessageSender messageSender;
// 秒杀请求入队
public SeckillResult participateInSeckill(Long userId, Long productId) {
String redisKey = "seckill:stock:" + productId;
// 1. Redis预扣库存(原子操作)
Long remainingStock = redisTemplate.opsForValue().decrement(redisKey);
if (remainingStock < 0) {
// 库存不足,恢复库存
redisTemplate.opsForValue().increment(redisKey);
return SeckillResult.failure("商品已售完");
}
// 2. 生成秒杀订单号
String seckillOrderNo = generateSeckillOrderNo(userId, productId);
// 3. 发送消息到队列,异步处理真实订单创建
SeckillOrder seckillOrder = SeckillOrder.builder()
.userId(userId)
.productId(productId)
.seckillOrderNo(seckillOrderNo)
.timestamp(System.currentTimeMillis())
.build();
messageSender.sendMessage("seckill.order.create", seckillOrder);
return SeckillResult.success(seckillOrderNo, "秒杀成功,正在生成订单");
}
@RabbitListener(queues = "seckill.order.create")
public void processSeckillOrder(SeckillOrder seckillOrder) {
try {
// 创建真实订单
CreateOrderRequest request = new CreateOrderRequest();
request.setUserId(seckillOrder.getUserId());
request.setProductId(seckillOrder.getProductId());
request.setQuantity(1);
request.setSeckillOrderNo(seckillOrder.getSeckillOrderNo());
OrderResult result = orderService.createSeckillOrder(request);
if (result.isSuccess()) {
// 通知用户秒杀成功
notifyUser(seckillOrder.getUserId(), "秒杀订单创建成功", result.getOrderId());
} else {
// 订单创建失败,恢复Redis库存
String redisKey = "seckill:stock:" + seckillOrder.getProductId();
redisTemplate.opsForValue().increment(redisKey);
notifyUser(seckillOrder.getUserId(), "秒杀失败", result.getMessage());
}
} catch (Exception e) {
System.err.println("处理秒杀订单失败: " + e.getMessage());
// 恢复库存并通知用户
}
}
private String generateSeckillOrderNo(Long userId, Long productId) {
return "SK" + System.currentTimeMillis() + userId + productId;
}
private void notifyUser(Long userId, String message, Object data) {
// 通过WebSocket、短信或推送通知用户
}
}
2. RabbitMQ深入应用
面试问题:RabbitMQ的消息确认机制,如何保证消息不丢失?
RabbitMQ配置与使用
@Configuration
@EnableRabbit
public class RabbitMQConfig {
// 直连交换机配置
@Bean
public DirectExchange orderDirectExchange() {
return ExchangeBuilder.directExchange("order.direct.exchange")
.durable(true)
.build();
}
// 主题交换机配置
@Bean
public TopicExchange orderTopicExchange() {
return ExchangeBuilder.topicExchange("order.topic.exchange")
.durable(true)
.build();
}
// 死信交换机配置
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder.directExchange("order.dead.letter.exchange")
.durable(true)
.build();
}
// 延时队列配置
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("order.delay.queue")
.withArgument("x-message-ttl", 30000) // 30秒TTL
.withArgument("x-dead-letter-exchange", "order.dead.letter.exchange")
.withArgument("x-dead-letter-routing-key", "order.timeout")
.build();
}
// 死信队列配置
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dead.letter.queue").build();
}
// 普通队列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.process.queue")
.withArgument("x-dead-letter-exchange", "order.dead.letter.exchange")
.withArgument("x-dead-letter-routing-key", "order.failed")
.build();
}
// 绑定关系
@Bean
public Binding orderQueueBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderDirectExchange())
.with("order.create");
}
@Bean
public Binding deadLetterQueueBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("order.failed");
}
// RabbitTemplate配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 开启发送确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功: " + correlationData);
} else {
System.err.println("消息发送失败: " + cause);
}
});
// 开启路由失败回调
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.err.println("消息路由失败: " + replyText + ", 交换机: " + exchange +
", 路由键: " + routingKey);
});
template.setMandatory(true); // 开启路由失败回调
return template;
}
}
// 消息发送服务
@Service
public class RabbitMQMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 可靠消息发送
public void sendReliableMessage(String exchange, String routingKey, Object message) {
String messageId = UUID.randomUUID().toString();
try {
// 1. 先存储消息到Redis(消息表)
MessageRecord record = new MessageRecord(messageId, exchange, routingKey, message);
redisTemplate.opsForValue().set("message:" + messageId, record, Duration.ofHours(24));
// 2. 发送消息
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
System.out.println("消息发送: " + messageId);
} catch (Exception e) {
System.err.println("消息发送异常: " + e.getMessage());
throw e;
}
}
// 事务消息发送
@Transactional
public void sendTransactionalMessage(String exchange, String routingKey, Object message) {
// 1. 执行本地事务
performLocalTransaction();
// 2. 事务成功后发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
private void performLocalTransaction() {
// 本地事务逻辑
}
// 延时消息发送
public void sendDelayMessage(String exchange, String routingKey, Object message, int delaySeconds) {
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delaySeconds * 1000));
Message msg = new Message(JSON.toJSONBytes(message), properties);
rabbitTemplate.send(exchange, routingKey, msg);
}
}
// 消息消费者
@Component
public class OrderMessageConsumer {
@Autowired
private OrderService orderService;
// 手动确认消息
@RabbitListener(queues = "order.process.queue", ackMode = "MANUAL")
public void processOrder(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
String messageBody = new String(message.getBody());
OrderCreatedEvent event = JSON.parseObject(messageBody, OrderCreatedEvent.class);
// 处理业务逻辑
orderService.processOrderEvent(event);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
System.err.println("消息处理失败: " + e.getMessage());
try {
// 检查重试次数
Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("retry-count");
if (retryCount == null) {
retryCount = 0;
}
if (retryCount < 3) {
// 重试次数未达到上限,重新入队
channel.basicNack(deliveryTag, false, true);
} else {
// 达到重试上限,拒绝消息(进入死信队列)
channel.basicNack(deliveryTag, false, false);
}
} catch (IOException ioException) {
System.err.println("消息确认失败: " + ioException.getMessage());
}
}
}
// 死信队列处理
@RabbitListener(queues = "order.dead.letter.queue")
public void handleDeadLetter(OrderCreatedEvent event) {
System.err.println("处理死信消息: " + event.getOrderId());
// 记录到数据库,人工处理
recordFailedMessage(event);
// 或者发送告警通知
sendAlert("订单处理失败", event);
}
private void recordFailedMessage(OrderCreatedEvent event) {
// 记录失败消息到数据库
}
private void sendAlert(String title, Object data) {
// 发送告警通知
}
}
3. Kafka高性能应用
面试问题:Kafka的分区机制,如何保证消息顺序性?
Kafka配置与使用
@Configuration
@EnableKafka
public class KafkaConfig {
// Producer配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 性能优化配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 批量延迟
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 压缩算法
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
// 可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
// 设置发送回调
template.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord,
RecordMetadata recordMetadata) {
System.out.println("消息发送成功: " + recordMetadata);
}
@Override
public void onError(ProducerRecord<String, Object> producerRecord,
Exception exception) {
System.err.println("消息发送失败: " + exception.getMessage());
}
});
return template;
}
// Consumer配置
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 消费策略配置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 批量拉取
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 最小拉取字节数
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 最大等待时间
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 并发配置
factory.setConcurrency(3);
// 手动确认模式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 错误处理
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new FixedBackOff(1000L, 3), // 重试3次,间隔1秒
(record, exception) -> {
System.err.println("消息处理失败,发送到死信队列: " + record.value());
// 发送到死信队列或记录到数据库
}
));
return factory;
}
}
// Kafka消息发送服务
@Service
public class KafkaMessageSender {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息(保证顺序性)
public void sendOrderedMessage(String topic, String key, Object message) {
// 使用相同的key保证消息发送到同一分区,从而保证顺序性
kafkaTemplate.send(topic, key, message).addCallback(
result -> System.out.println("消息发送成功: " + result.getRecordMetadata()),
failure -> System.err.println("消息发送失败: " + failure.getMessage())
);
}
// 发送事务消息
@Transactional("kafkaTransactionManager")
public void sendTransactionalMessage(String topic, Object message) {
kafkaTemplate.send(topic, message);
kafkaTemplate.flush(); // 确保消息发送
}
// 批量发送消息
public void sendBatchMessages(String topic, List<Object> messages) {
for (Object message : messages) {
kafkaTemplate.send(topic, message);
}
kafkaTemplate.flush(); // 强制发送
}
}
// Kafka消息消费者
@Component
public class KafkaOrderConsumer {
@Autowired
private OrderService orderService;
// 普通消费
@KafkaListener(topics = "order-events", groupId = "order-service-group")
public void handleOrderEvent(OrderCreatedEvent event, Acknowledgment ack) {
try {
System.out.println("处理订单事件: " + event.getOrderId());
orderService.processOrderEvent(event);
// 手动确认
ack.acknowledge();
} catch (Exception e) {
System.err.println("处理订单事件失败: " + e.getMessage());
// 不确认消息,触发重试
}
}
// 批量消费
@KafkaListener(topics = "order-batch-events", groupId = "batch-order-group")
public void handleOrderEventsBatch(List<OrderCreatedEvent> events, Acknowledgment ack) {
try {
System.out.println("批量处理订单事件: " + events.size() + " 条");
orderService.batchProcessOrderEvents(events);
ack.acknowledge();
} catch (Exception e) {
System.err.println("批量处理订单事件失败: " + e.getMessage());
}
}
// 分区消费(保证顺序性)
@KafkaListener(
topicPartitions = @TopicPartition(topic = "order-sequence-events", partitions = {"0", "1", "2"}),
groupId = "sequence-order-group"
)
public void handleSequentialOrderEvents(
OrderCreatedEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment ack
) {
try {
System.out.println("顺序处理订单事件 - 分区: " + partition + ", 订单: " + event.getOrderId());
// 按分区顺序处理
orderService.processOrderEventSequentially(event, partition);
ack.acknowledge();
} catch (Exception e) {
System.err.println("顺序处理失败: " + e.getMessage());
}
}
}
4. 消息幂等性与重复消费
面试问题:如何保证消息消费的幂等性?如何处理重复消息?
幂等性保证机制
@Service
public class IdempotentMessageProcessor {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
// 基于Redis的幂等性控制
public boolean processMessageWithRedisIdempotent(String messageId, Runnable businessLogic) {
String idempotentKey = "message:idempotent:" + messageId;
// 尝试设置幂等键,过期时间24小时
Boolean success = redisTemplate.opsForValue().setIfAbsent(
idempotentKey, "processed", Duration.ofHours(24));
if (Boolean.TRUE.equals(success)) {
try {
// 第一次处理,执行业务逻辑
businessLogic.run();
return true;
} catch (Exception e) {
// 处理失败,删除幂等键,允许重试
redisTemplate.delete(idempotentKey);
throw e;
}
} else {
// 重复消息,忽略处理
System.out.println("重复消息,忽略处理: " + messageId);
return false;
}
}
// 基于数据库的幂等性控制
public boolean processMessageWithDatabaseIdempotent(String messageId, Runnable businessLogic) {
String insertSql = """
INSERT INTO message_idempotent (message_id, status, create_time)
VALUES (?, 'PROCESSING', NOW())
""";
try {
// 尝试插入记录
int inserted = jdbcTemplate.update(insertSql, messageId);
if (inserted > 0) {
try {
// 执行业务逻辑
businessLogic.run();
// 更新状态为已完成
String updateSql = "UPDATE message_idempotent SET status = 'COMPLETED' WHERE message_id = ?";
jdbcTemplate.update(updateSql, messageId);
return true;
} catch (Exception e) {
// 处理失败,删除记录或标记为失败
String deleteSql = "DELETE FROM message_idempotent WHERE message_id = ?";
jdbcTemplate.update(deleteSql, messageId);
throw e;
}
} else {
// 记录已存在,重复消息
System.out.println("重复消息,忽略处理: " + messageId);
return false;
}
} catch (DuplicateKeyException e) {
// 唯一键约束冲突,重复消息
System.out.println("重复消息检测到: " + messageId);
return false;
}
}
// 业务幂等性示例:订单支付
@RabbitListener(queues = "payment.process")
public void handlePaymentMessage(PaymentMessage message, Acknowledgment ack) {
String messageId = message.getMessageId();
boolean processed = processMessageWithRedisIdempotent(messageId, () -> {
processPayment(message);
});
if (processed) {
System.out.println("支付处理完成: " + message.getOrderId());
}
ack.acknowledge();
}
private void processPayment(PaymentMessage message) {
// 检查订单状态
String checkOrderSql = "SELECT status FROM user_order WHERE id = ?";
Integer orderStatus = jdbcTemplate.queryForObject(checkOrderSql, Integer.class, message.getOrderId());
if (orderStatus == null) {
throw new RuntimeException("订单不存在: " + message.getOrderId());
}
if (orderStatus == 2) { // 已支付
System.out.println("订单已支付,忽略重复支付: " + message.getOrderId());
return;
}
if (orderStatus != 1) { // 非待支付状态
throw new RuntimeException("订单状态不正确: " + orderStatus);
}
// 执行支付逻辑
String updateSql = "UPDATE user_order SET status = 2, pay_time = NOW() WHERE id = ? AND status = 1";
int updated = jdbcTemplate.update(updateSql, message.getOrderId());
if (updated == 0) {
throw new RuntimeException("订单状态更新失败,可能已被其他进程处理");
}
// 发送支付成功通知
sendPaymentSuccessNotification(message.getOrderId(), message.getUserId());
}
private void sendPaymentSuccessNotification(Long orderId, Long userId) {
// 发送支付成功通知
}
}
// 消息去重过滤器
@Component
public class MessageDeduplicationFilter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final BloomFilter<String> bloomFilter;
public MessageDeduplicationFilter() {
// 初始化布隆过滤器
this.bloomFilter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
1000000, // 预期插入数量
0.01 // 误判率
);
}
// 基于布隆过滤器的快速去重
public boolean isMessageDuplicate(String messageId) {
// 先检查布隆过滤器
if (!bloomFilter.mightContain(messageId)) {
// 肯定不存在
bloomFilter.put(messageId);
return false;
}
// 可能存在,进一步检查Redis
String key = "message:processed:" + messageId;
Boolean exists = redisTemplate.hasKey(key);
if (Boolean.TRUE.equals(exists)) {
return true; // 确实重复
} else {
// 布隆过滤器误判,标记为已处理
redisTemplate.opsForValue().set(key, "1", Duration.ofDays(7));
return false;
}
}
// 消息消费前置过滤
@EventListener
public void handleMessage(MessageEvent event) {
if (isMessageDuplicate(event.getMessageId())) {
System.out.println("检测到重复消息,忽略处理: " + event.getMessageId());
return;
}
// 处理消息
processMessage(event);
}
private void processMessage(MessageEvent event) {
// 实际的消息处理逻辑
}
}
5. 消息队列监控与运维
面试问题:如何监控消息队列的性能?如何处理消息积压问题?
监控指标收集
@Component
public class MessageQueueMonitor {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private KafkaAdmin kafkaAdmin;
// RabbitMQ监控
@Scheduled(fixedRate = 30000) // 每30秒收集一次
public void collectRabbitMQMetrics() {
try {
// 获取队列信息
Properties queueInfo = rabbitAdmin.getQueueProperties("order.process.queue");
if (queueInfo != null) {
Integer messageCount = (Integer) queueInfo.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
Integer consumerCount = (Integer) queueInfo.get(RabbitAdmin.QUEUE_CONSUMER_COUNT);
// 记录指标
Gauge.builder("rabbitmq.queue.messages")
.tag("queue", "order.process.queue")
.register(meterRegistry, () -> messageCount != null ? messageCount : 0);
Gauge.builder("rabbitmq.queue.consumers")
.tag("queue", "order.process.queue")
.register(meterRegistry, () -> consumerCount != null ? consumerCount : 0);
// 检查消息积压
if (messageCount != null && messageCount > 1000) {
System.err.println("队列消息积压告警: " + messageCount + " 条消息待处理");
sendAlert("RabbitMQ队列积压", "队列: order.process.queue, 积压消息: " + messageCount);
}
}
} catch (Exception e) {
System.err.println("RabbitMQ监控异常: " + e.getMessage());
}
}
// Kafka监控
@Scheduled(fixedRate = 60000) // 每分钟收集一次
public void collectKafkaMetrics() {
try (AdminClient adminClient = AdminClient.create(getKafkaAdminProps())) {
// 获取Topic信息
ListTopicsResult topicsResult = adminClient.listTopics();
Set<String> topics = topicsResult.names().get();
for (String topic : topics) {
if (topic.startsWith("order-")) {
collectTopicMetrics(adminClient, topic);
}
}
} catch (Exception e) {
System.err.println("Kafka监控异常: " + e.getMessage());
}
}
private void collectTopicMetrics(AdminClient adminClient, String topic) throws Exception {
// 获取Topic描述
DescribeTopicsResult topicResult = adminClient.describeTopics(Collections.singletonList(topic));
TopicDescription description = topicResult.values().get(topic).get();
int partitionCount = description.partitions().size();
// 获取消费者组信息
ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();
Collection<ConsumerGroupListing> groups = groupsResult.all().get();
for (ConsumerGroupListing group : groups) {
String groupId = group.groupId();
// 获取消费滞后信息
Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition partition = entry.getKey();
if (partition.topic().equals(topic)) {
long consumerOffset = entry.getValue().offset();
// 这里简化处理,实际需要获取最新offset计算lag
System.out.println("Topic: " + topic + ", Partition: " + partition.partition() +
", Consumer Offset: " + consumerOffset);
}
}
}
}
private Properties getKafkaAdminProps() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return props;
}
// 消息积压处理
@EventListener
public void handleMessageBacklog(MessageBacklogEvent event) {
if (event.getBacklogCount() > 5000) {
// 严重积压,采取紧急措施
// 1. 增加消费者
scaleUpConsumers(event.getQueueName());
// 2. 临时停止非关键消息生产
pauseNonCriticalProducers();
// 3. 发送告警
sendCriticalAlert("消息队列严重积压", event);
} else if (event.getBacklogCount() > 1000) {
// 轻微积压,发送告警
sendAlert("消息队列积压", event.toString());
}
}
private void scaleUpConsumers(String queueName) {
// 动态增加消费者实例
System.out.println("正在增加消费者实例: " + queueName);
}
private void pauseNonCriticalProducers() {
// 暂停非关键业务的消息生产
System.out.println("暂停非关键消息生产");
}
private void sendAlert(String title, String message) {
// 发送告警通知(邮件、短信、钉钉等)
System.out.println("告警: " + title + " - " + message);
}
private void sendCriticalAlert(String title, Object data) {
// 发送紧急告警
System.err.println("紧急告警: " + title + " - " + data);
}
}
// 消息消费性能统计
@Component
public class MessageConsumerMetrics {
private final Timer.Sample sample;
private final Counter successCounter;
private final Counter failureCounter;
public MessageConsumerMetrics(MeterRegistry meterRegistry) {
this.sample = Timer.start(meterRegistry);
this.successCounter = Counter.builder("message.consumer.success")
.register(meterRegistry);
this.failureCounter = Counter.builder("message.consumer.failure")
.register(meterRegistry);
}
@EventListener
public void onMessageProcessed(MessageProcessedEvent event) {
if (event.isSuccess()) {
successCounter.increment();
} else {
failureCounter.increment();
}
// 记录处理时间
Timer.Sample.stop(Timer.builder("message.processing.time")
.tag("queue", event.getQueueName())
.register(Metrics.globalRegistry), sample);
}
}
高频面试题目
1. 理论深度题目
Q: 如何保证消息的顺序性?
A: 保证消息顺序性的方法:
- 单分区发送:将需要保证顺序的消息发送到同一分区
- 消息键:使用相同的消息键确保路由到同一分区
- 单线程消费:在消费端使用单线程处理同一分区的消息
- 队列模式:RabbitMQ中使用单队列单消费者
Q: 如何处理消息丢失问题?
A: 防止消息丢失的措施:
- 生产者确认:等待broker确认消息接收
- 持久化:消息和队列都设置为持久化
- 副本机制:设置多个副本
- 消费者确认:手动确认消息处理完成
- 事务机制:使用事务保证消息发送
2. 实战应用题目
Q: 设计一个分布式事务消息方案?
答题要点:
- 本地消息表:先执行本地事务,再发送消息
- 事务消息:支持事务的MQ(如RocketMQ)
- Saga模式:事件驱动的分布式事务
- 补偿机制:失败时的补偿操作
- 幂等性保证:确保消息重复消费的幂等性
Q: 如何设计一个高可用的消息队列集群?
答题要点:
- 主从复制:master-slave架构
- 负载均衡:多个broker节点
- 故障转移:自动故障检测和切换
- 数据分片:分区/分片提高并发能力
- 监控告警:完善的监控和告警机制
总结
消息队列面试重点:
- 基础概念:消息队列的作用、使用场景
- 产品特点:不同MQ产品的优缺点和适用场景
- 可靠性保证:消息不丢失、不重复、有序性
- 性能优化:吞吐量、延迟、资源使用优化
- 运维监控:监控指标、故障处理、容量规划
建议结合实际项目中使用消息队列解决的具体问题,能够描述完整的技术方案和实现细节。