Message Queues and Middleware in Practice

Overview

Message queues are a key middleware component in distributed systems, used for decoupling, asynchronous processing, peak shaving, and similar scenarios. This article covers the principles and practical use of the mainstream message queues RabbitMQ, Kafka, and RocketMQ.

Core Interview Questions

1. Message Queue Fundamentals

Interview question: What problems do message queues solve? What are the characteristics and typical use cases of the different MQ products?

What message queues are used for

// 1. Decoupling example
@Service
public class OrderService {

    @Autowired
    private MessageSender messageSender;

    @Autowired
    private OrderRepository orderRepository;

    // Collaborator services used by the synchronous path below
    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private SmsService smsService;

    @Autowired
    private PointService pointService;

    // Synchronous processing (tight coupling)
    public OrderResult createOrderSync(CreateOrderRequest request) {
        try {
            // 1. Create the order
            Order order = new Order(request);
            orderRepository.save(order);

            // 2. Reduce stock (synchronous call)
            inventoryService.reduceStock(request.getProductId(), request.getQuantity());

            // 3. Send SMS (synchronous call)
            smsService.sendOrderNotification(request.getUserId(), order.getOrderNo());

            // 4. Add loyalty points (synchronous call)
            pointService.addPoints(request.getUserId(), order.getAmount());

            return OrderResult.success(order.getId());
        } catch (Exception e) {
            // A failure in any single step breaks the whole flow
            return OrderResult.failure("Order creation failed: " + e.getMessage());
        }
    }

    // Asynchronous processing (decoupled)
    public OrderResult createOrderAsync(CreateOrderRequest request) {
        try {
            // 1. Create the order (core business logic)
            Order order = new Order(request);
            orderRepository.save(order);

            // 2. Publish an event; downstream steps run asynchronously
            OrderCreatedEvent event = new OrderCreatedEvent(
                order.getId(),
                order.getUserId(),
                order.getProductId(),
                order.getQuantity(),
                order.getAmount(),
                order.getOrderNo()
            );

            messageSender.sendMessage("order.created", event);

            return OrderResult.success(order.getId());
        } catch (Exception e) {
            return OrderResult.failure("Order creation failed: " + e.getMessage());
        }
    }
}

// Message handlers
@Component
public class OrderEventHandler {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private SmsService smsService;

    @Autowired
    private PointService pointService;

    @RabbitListener(queues = "inventory.reduce")
    public void handleInventoryReduce(OrderCreatedEvent event) {
        try {
            inventoryService.reduceStock(event.getProductId(), event.getQuantity());
            System.out.println("Stock reduced: " + event.getOrderId());
        } catch (Exception e) {
            System.err.println("Stock reduction failed: " + e.getMessage());
            // Send a compensating message or retry
        }
    }

    @RabbitListener(queues = "notification.sms")
    public void handleSmsNotification(OrderCreatedEvent event) {
        try {
            smsService.sendOrderNotification(event.getUserId(), event.getOrderNo());
            System.out.println("SMS sent: " + event.getOrderId());
        } catch (Exception e) {
            System.err.println("SMS sending failed: " + e.getMessage());
            // An SMS failure does not affect the main flow; retry or log it
        }
    }

    @RabbitListener(queues = "point.add")
    public void handlePointAdd(OrderCreatedEvent event) {
        try {
            pointService.addPoints(event.getUserId(), event.getAmount());
            System.out.println("Points added: " + event.getOrderId());
        } catch (Exception e) {
            System.err.println("Adding points failed: " + e.getMessage());
        }
    }
}

Peak shaving in practice

// Peak shaving for a flash-sale (seckill) scenario
@Service
public class SeckillService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private MessageSender messageSender;

    @Autowired
    private OrderService orderService;

    // Enqueue a flash-sale request
    public SeckillResult participateInSeckill(Long userId, Long productId) {
        String redisKey = "seckill:stock:" + productId;

        // 1. Pre-deduct stock in Redis (atomic operation)
        Long remainingStock = redisTemplate.opsForValue().decrement(redisKey);

        if (remainingStock < 0) {
            // Out of stock; restore the counter
            redisTemplate.opsForValue().increment(redisKey);
            return SeckillResult.failure("Sold out");
        }

        // 2. Generate a flash-sale order number
        String seckillOrderNo = generateSeckillOrderNo(userId, productId);

        // 3. Publish to the queue; the real order is created asynchronously
        SeckillOrder seckillOrder = SeckillOrder.builder()
            .userId(userId)
            .productId(productId)
            .seckillOrderNo(seckillOrderNo)
            .timestamp(System.currentTimeMillis())
            .build();

        messageSender.sendMessage("seckill.order.create", seckillOrder);

        return SeckillResult.success(seckillOrderNo, "Purchase accepted, creating your order");
    }

    @RabbitListener(queues = "seckill.order.create")
    public void processSeckillOrder(SeckillOrder seckillOrder) {
        try {
            // Create the real order
            CreateOrderRequest request = new CreateOrderRequest();
            request.setUserId(seckillOrder.getUserId());
            request.setProductId(seckillOrder.getProductId());
            request.setQuantity(1);
            request.setSeckillOrderNo(seckillOrder.getSeckillOrderNo());

            OrderResult result = orderService.createSeckillOrder(request);

            if (result.isSuccess()) {
                // Notify the user of success
                notifyUser(seckillOrder.getUserId(), "Flash-sale order created", result.getOrderId());
            } else {
                // Order creation failed; restore the Redis stock counter
                String redisKey = "seckill:stock:" + seckillOrder.getProductId();
                redisTemplate.opsForValue().increment(redisKey);

                notifyUser(seckillOrder.getUserId(), "Flash sale failed", result.getMessage());
            }
        } catch (Exception e) {
            System.err.println("Failed to process flash-sale order: " + e.getMessage());
            // Restore stock and notify the user
        }
    }

    private String generateSeckillOrderNo(Long userId, Long productId) {
        // Note: plain concatenation can collide under load; a real system
        // would use a dedicated ID generator (e.g. snowflake)
        return "SK" + System.currentTimeMillis() + userId + productId;
    }

    private void notifyUser(Long userId, String message, Object data) {
        // Notify the user via WebSocket, SMS, or push
    }
}

2. RabbitMQ in Depth

Interview question: How does RabbitMQ's message acknowledgment mechanism work, and how do you guarantee that messages are not lost?

RabbitMQ configuration and usage

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    // Direct exchange
    @Bean
    public DirectExchange orderDirectExchange() {
        return ExchangeBuilder.directExchange("order.direct.exchange")
                .durable(true)
                .build();
    }

    // Topic exchange
    @Bean
    public TopicExchange orderTopicExchange() {
        return ExchangeBuilder.topicExchange("order.topic.exchange")
                .durable(true)
                .build();
    }

    // Dead-letter exchange
    @Bean
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("order.dead.letter.exchange")
                .durable(true)
                .build();
    }

    // Delay queue (TTL + dead-lettering)
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("order.delay.queue")
                .withArgument("x-message-ttl", 30000)  // 30-second TTL
                .withArgument("x-dead-letter-exchange", "order.dead.letter.exchange")
                .withArgument("x-dead-letter-routing-key", "order.timeout")
                .build();
    }

    // Dead-letter queue
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("order.dead.letter.queue").build();
    }

    // Regular work queue
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.process.queue")
                .withArgument("x-dead-letter-exchange", "order.dead.letter.exchange")
                .withArgument("x-dead-letter-routing-key", "order.failed")
                .build();
    }

    // Bindings
    @Bean
    public Binding orderQueueBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderDirectExchange())
                .with("order.create");
    }

    @Bean
    public Binding deadLetterQueueBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("order.failed");
    }

    // RabbitTemplate configuration
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // Publisher confirms (also requires enabling publisher confirms on
        // the connection factory)
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed by broker: " + correlationData);
            } else {
                System.err.println("Message send failed: " + cause);
            }
        });

        // Return callback for unroutable messages
        // (replaced by setReturnsCallback in Spring AMQP 2.3+)
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.err.println("Message could not be routed: " + replyText + ", exchange: " + exchange +
                             ", routing key: " + routingKey);
        });

        template.setMandatory(true);  // required for the return callback to fire

        return template;
    }
}

// Message sending service
@Service
public class RabbitMQMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Reliable send: persist first, then publish
    public void sendReliableMessage(String exchange, String routingKey, Object message) {
        String messageId = UUID.randomUUID().toString();

        try {
            // 1. Store the message in Redis first (a message table)
            MessageRecord record = new MessageRecord(messageId, exchange, routingKey, message);
            redisTemplate.opsForValue().set("message:" + messageId, record, Duration.ofHours(24));

            // 2. Publish the message
            CorrelationData correlationData = new CorrelationData(messageId);
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);

            System.out.println("Message sent: " + messageId);

        } catch (Exception e) {
            System.err.println("Message send error: " + e.getMessage());
            throw e;
        }
    }

    // Send within a local transaction
    @Transactional
    public void sendTransactionalMessage(String exchange, String routingKey, Object message) {
        // 1. Execute the local transaction
        performLocalTransaction();

        // 2. Publish the message after the local work succeeds
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

    private void performLocalTransaction() {
        // Local transaction logic
    }

    // Delayed message via per-message TTL
    public void sendDelayMessage(String exchange, String routingKey, Object message, int delaySeconds) {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration(String.valueOf(delaySeconds * 1000L));

        Message msg = new Message(JSON.toJSONBytes(message), properties);

        rabbitTemplate.send(exchange, routingKey, msg);
    }
}

// Message consumer
@Component
public class OrderMessageConsumer {

    @Autowired
    private OrderService orderService;

    // Manual acknowledgment
    @RabbitListener(queues = "order.process.queue", ackMode = "MANUAL")
    public void processOrder(Message message, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            String messageBody = new String(message.getBody());
            OrderCreatedEvent event = JSON.parseObject(messageBody, OrderCreatedEvent.class);

            // Business logic
            orderService.processOrderEvent(event);

            // Manually acknowledge the message
            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());

            try {
                // Check the retry count. Note: RabbitMQ does not maintain this
                // header itself; the publisher/consumer must set it, or you can
                // inspect the broker-managed x-death header instead
                Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("retry-count");
                if (retryCount == null) {
                    retryCount = 0;
                }

                if (retryCount < 3) {
                    // Retry limit not reached; requeue the message
                    channel.basicNack(deliveryTag, false, true);
                } else {
                    // Retry limit reached; reject (routes to the dead-letter queue)
                    channel.basicNack(deliveryTag, false, false);
                }
            } catch (IOException ioException) {
                System.err.println("Acknowledgment failed: " + ioException.getMessage());
            }
        }
    }

    // Dead-letter queue handling
    @RabbitListener(queues = "order.dead.letter.queue")
    public void handleDeadLetter(OrderCreatedEvent event) {
        System.err.println("Handling dead-lettered message: " + event.getOrderId());

        // Record it in the database for manual follow-up
        recordFailedMessage(event);

        // Or send an alert
        sendAlert("Order processing failed", event);
    }

    private void recordFailedMessage(OrderCreatedEvent event) {
        // Persist the failed message to the database
    }

    private void sendAlert(String title, Object data) {
        // Send an alert notification
    }
}

3. Kafka for High Throughput

Interview question: How does Kafka's partitioning mechanism work, and how do you guarantee message ordering?

Kafka configuration and usage

@Configuration
@EnableKafka
public class KafkaConfig {

    // Producer configuration
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Performance tuning
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // batching delay
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // compression codec
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);   // buffer size

        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);               // retry count
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());

        // Send callbacks
        template.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord,
                                RecordMetadata recordMetadata) {
                System.out.println("Message sent: " + recordMetadata);
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord,
                              Exception exception) {
                System.err.println("Message send failed: " + exception.getMessage());
            }
        });

        return template;
    }

    // Consumer configuration
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // allow deserializing the event classes

        // Consumption strategy
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // manual commit
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);      // batch poll size
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);      // minimum fetch bytes
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);     // maximum fetch wait

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Concurrency (should not exceed the partition count)
        factory.setConcurrency(3);

        // Manual acknowledgment mode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Error handling: the recoverer comes first, then the back-off
        // (use DefaultErrorHandler in spring-kafka 2.8+)
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
            (record, exception) -> {
                System.err.println("Message processing failed, forwarding to dead-letter handling: " + record.value());
                // Send to a dead-letter topic or record in the database
            },
            new FixedBackOff(1000L, 3) // retry 3 times, 1-second interval
        ));

        return factory;
    }
}

// Kafka message sending service
@Service
public class KafkaMessageSender {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Ordered send
    public void sendOrderedMessage(String topic, String key, Object message) {
        // The same key always hashes to the same partition, which preserves
        // per-key ordering
        kafkaTemplate.send(topic, key, message).addCallback(
            result -> System.out.println("Message sent: " + result.getRecordMetadata()),
            failure -> System.err.println("Message send failed: " + failure.getMessage())
        );
    }

    // Transactional send (requires a transactionIdPrefix on the producer factory)
    @Transactional("kafkaTransactionManager")
    public void sendTransactionalMessage(String topic, Object message) {
        kafkaTemplate.send(topic, message);
        kafkaTemplate.flush(); // force the send out
    }

    // Batch send
    public void sendBatchMessages(String topic, List<Object> messages) {
        for (Object message : messages) {
            kafkaTemplate.send(topic, message);
        }
        kafkaTemplate.flush(); // force the sends out
    }
}

// Kafka message consumer
@Component
public class KafkaOrderConsumer {

    @Autowired
    private OrderService orderService;

    // Plain consumption
    @KafkaListener(topics = "order-events", groupId = "order-service-group")
    public void handleOrderEvent(OrderCreatedEvent event, Acknowledgment ack) {
        try {
            System.out.println("Handling order event: " + event.getOrderId());

            orderService.processOrderEvent(event);

            // Manual acknowledgment
            ack.acknowledge();

        } catch (Exception e) {
            System.err.println("Handling order event failed: " + e.getMessage());
            // Do not acknowledge; the error handler will trigger a retry
        }
    }

    // Batch consumption
    @KafkaListener(topics = "order-batch-events", groupId = "batch-order-group")
    public void handleOrderEventsBatch(List<OrderCreatedEvent> events, Acknowledgment ack) {
        try {
            System.out.println("Batch-processing order events: " + events.size() + " records");

            orderService.batchProcessOrderEvents(events);

            ack.acknowledge();

        } catch (Exception e) {
            System.err.println("Batch-processing order events failed: " + e.getMessage());
        }
    }

    // Partition-pinned consumption (preserves ordering)
    @KafkaListener(
        topicPartitions = @TopicPartition(topic = "order-sequence-events", partitions = {"0", "1", "2"}),
        groupId = "sequence-order-group"
    )
    public void handleSequentialOrderEvents(
        OrderCreatedEvent event,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        Acknowledgment ack
    ) {
        try {
            System.out.println("Sequential order event - partition: " + partition + ", order: " + event.getOrderId());

            // Process in per-partition order
            orderService.processOrderEventSequentially(event, partition);

            ack.acknowledge();

        } catch (Exception e) {
            System.err.println("Sequential processing failed: " + e.getMessage());
        }
    }
}

4. Message Idempotency and Duplicate Consumption

Interview question: How do you make message consumption idempotent? How do you handle duplicate messages?

Idempotency mechanisms

@Service
public class IdempotentMessageProcessor {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Redis-based idempotency control
    public boolean processMessageWithRedisIdempotent(String messageId, Runnable businessLogic) {
        String idempotentKey = "message:idempotent:" + messageId;

        // Try to set the idempotency key with a 24-hour expiry
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
            idempotentKey, "processed", Duration.ofHours(24));

        if (Boolean.TRUE.equals(success)) {
            try {
                // First delivery: run the business logic
                businessLogic.run();
                return true;
            } catch (Exception e) {
                // Processing failed: delete the key so a retry can run
                redisTemplate.delete(idempotentKey);
                throw e;
            }
        } else {
            // Duplicate message: skip it
            System.out.println("Duplicate message, skipping: " + messageId);
            return false;
        }
    }

    // Database-based idempotency control
    public boolean processMessageWithDatabaseIdempotent(String messageId, Runnable businessLogic) {
        String insertSql = """
            INSERT INTO message_idempotent (message_id, status, create_time)
            VALUES (?, 'PROCESSING', NOW())
            """;

        try {
            // Attempt the insert; a unique key on message_id guards duplicates
            int inserted = jdbcTemplate.update(insertSql, messageId);

            if (inserted > 0) {
                try {
                    // Run the business logic
                    businessLogic.run();

                    // Mark the record as completed
                    String updateSql = "UPDATE message_idempotent SET status = 'COMPLETED' WHERE message_id = ?";
                    jdbcTemplate.update(updateSql, messageId);

                    return true;
                } catch (Exception e) {
                    // Processing failed: delete the record (or mark it failed)
                    String deleteSql = "DELETE FROM message_idempotent WHERE message_id = ?";
                    jdbcTemplate.update(deleteSql, messageId);
                    throw e;
                }
            } else {
                // Record already exists: duplicate message
                System.out.println("Duplicate message, skipping: " + messageId);
                return false;
            }
        } catch (DuplicateKeyException e) {
            // Unique constraint violation: duplicate message
            System.out.println("Duplicate message detected: " + messageId);
            return false;
        }
    }

    // Business idempotency example: order payment
    // (a RabbitMQ listener acks via the Channel, not Kafka's Acknowledgment)
    @RabbitListener(queues = "payment.process", ackMode = "MANUAL")
    public void handlePaymentMessage(PaymentMessage message, Channel channel,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        String messageId = message.getMessageId();

        boolean processed = processMessageWithRedisIdempotent(messageId, () -> {
            processPayment(message);
        });

        if (processed) {
            System.out.println("Payment processed: " + message.getOrderId());
        }

        channel.basicAck(deliveryTag, false);
    }

    private void processPayment(PaymentMessage message) {
        // Check the order status
        String checkOrderSql = "SELECT status FROM user_order WHERE id = ?";
        Integer orderStatus = jdbcTemplate.queryForObject(checkOrderSql, Integer.class, message.getOrderId());

        if (orderStatus == null) {
            throw new RuntimeException("Order not found: " + message.getOrderId());
        }

        if (orderStatus == 2) { // already paid
            System.out.println("Order already paid, ignoring duplicate payment: " + message.getOrderId());
            return;
        }

        if (orderStatus != 1) { // not in the pending-payment state
            throw new RuntimeException("Unexpected order status: " + orderStatus);
        }

        // Execute the payment
        String updateSql = "UPDATE user_order SET status = 2, pay_time = NOW() WHERE id = ? AND status = 1";
        int updated = jdbcTemplate.update(updateSql, message.getOrderId());

        if (updated == 0) {
            throw new RuntimeException("Order status update failed; another process may have handled it");
        }

        // Send a payment-success notification
        sendPaymentSuccessNotification(message.getOrderId(), message.getUserId());
    }

    private void sendPaymentSuccessNotification(Long orderId, Long userId) {
        // Send the payment-success notification
    }
}

// Message deduplication filter
@Component
public class MessageDeduplicationFilter {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final BloomFilter<String> bloomFilter;

    public MessageDeduplicationFilter() {
        // Initialize the Bloom filter (note: it is in-memory and per-instance,
        // so it resets on restart; Redis remains the source of truth)
        this.bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()),
            1000000, // expected insertions
            0.01     // false-positive rate
        );
    }

    // Fast deduplication backed by the Bloom filter
    public boolean isMessageDuplicate(String messageId) {
        String key = "message:processed:" + messageId;

        // Check the Bloom filter first
        if (!bloomFilter.mightContain(messageId)) {
            // Definitely not seen before; record it in both places
            bloomFilter.put(messageId);
            redisTemplate.opsForValue().set(key, "1", Duration.ofDays(7));
            return false;
        }

        // Possibly seen; confirm against Redis
        Boolean exists = redisTemplate.hasKey(key);

        if (Boolean.TRUE.equals(exists)) {
            return true; // genuinely a duplicate
        } else {
            // Bloom-filter false positive; mark as processed
            redisTemplate.opsForValue().set(key, "1", Duration.ofDays(7));
            return false;
        }
    }

    // Pre-filter before message consumption
    @EventListener
    public void handleMessage(MessageEvent event) {
        if (isMessageDuplicate(event.getMessageId())) {
            System.out.println("Duplicate message detected, skipping: " + event.getMessageId());
            return;
        }

        // Process the message
        processMessage(event);
    }

    private void processMessage(MessageEvent event) {
        // Actual message-processing logic
    }
}

5. Message Queue Monitoring and Operations

Interview question: How do you monitor message queue performance? How do you handle message backlog?

Collecting monitoring metrics

@Component
public class MessageQueueMonitor {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private MeterRegistry meterRegistry;

    @Autowired
    private KafkaAdmin kafkaAdmin;

    // Gauges must observe a mutable holder: re-registering a gauge with a
    // freshly captured value on every run would keep reporting the first value
    private final AtomicInteger orderQueueMessages = new AtomicInteger();
    private final AtomicInteger orderQueueConsumers = new AtomicInteger();

    @PostConstruct
    public void registerGauges() {
        Gauge.builder("rabbitmq.queue.messages", orderQueueMessages, AtomicInteger::get)
            .tag("queue", "order.process.queue")
            .register(meterRegistry);

        Gauge.builder("rabbitmq.queue.consumers", orderQueueConsumers, AtomicInteger::get)
            .tag("queue", "order.process.queue")
            .register(meterRegistry);
    }

    // RabbitMQ monitoring
    @Scheduled(fixedRate = 30000) // collect every 30 seconds
    public void collectRabbitMQMetrics() {
        try {
            // Fetch queue information
            Properties queueInfo = rabbitAdmin.getQueueProperties("order.process.queue");
            if (queueInfo != null) {
                Integer messageCount = (Integer) queueInfo.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
                Integer consumerCount = (Integer) queueInfo.get(RabbitAdmin.QUEUE_CONSUMER_COUNT);

                // Update the gauges
                orderQueueMessages.set(messageCount != null ? messageCount : 0);
                orderQueueConsumers.set(consumerCount != null ? consumerCount : 0);

                // Check for backlog
                if (messageCount != null && messageCount > 1000) {
                    System.err.println("Queue backlog alert: " + messageCount + " messages pending");
                    sendAlert("RabbitMQ queue backlog", "queue: order.process.queue, backlog: " + messageCount);
                }
            }
        } catch (Exception e) {
            System.err.println("RabbitMQ monitoring error: " + e.getMessage());
        }
    }

    // Kafka monitoring
    @Scheduled(fixedRate = 60000) // collect every minute
    public void collectKafkaMetrics() {
        try (AdminClient adminClient = AdminClient.create(getKafkaAdminProps())) {

            // List topics
            ListTopicsResult topicsResult = adminClient.listTopics();
            Set<String> topics = topicsResult.names().get();

            for (String topic : topics) {
                if (topic.startsWith("order-")) {
                    collectTopicMetrics(adminClient, topic);
                }
            }

        } catch (Exception e) {
            System.err.println("Kafka monitoring error: " + e.getMessage());
        }
    }

    private void collectTopicMetrics(AdminClient adminClient, String topic) throws Exception {
        // Describe the topic
        DescribeTopicsResult topicResult = adminClient.describeTopics(Collections.singletonList(topic));
        TopicDescription description = topicResult.values().get(topic).get();

        int partitionCount = description.partitions().size();

        // List consumer groups
        ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();
        Collection<ConsumerGroupListing> groups = groupsResult.all().get();

        for (ConsumerGroupListing group : groups) {
            String groupId = group.groupId();

            // Fetch the group's committed offsets
            Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata().get();

            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                if (partition.topic().equals(topic)) {
                    long consumerOffset = entry.getValue().offset();

                    // Simplified: a real implementation would also fetch the
                    // log-end offset and compute lag = logEndOffset - consumerOffset
                    System.out.println("Topic: " + topic + ", Partition: " + partition.partition() +
                                     ", Consumer Offset: " + consumerOffset);
                }
            }
        }
    }

    private Properties getKafkaAdminProps() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    // Backlog handling
    @EventListener
    public void handleMessageBacklog(MessageBacklogEvent event) {
        if (event.getBacklogCount() > 5000) {
            // Severe backlog: take emergency measures

            // 1. Scale up consumers
            scaleUpConsumers(event.getQueueName());

            // 2. Temporarily pause non-critical producers
            pauseNonCriticalProducers();

            // 3. Send an alert
            sendCriticalAlert("Severe message queue backlog", event);
        } else if (event.getBacklogCount() > 1000) {
            // Mild backlog: alert only
            sendAlert("Message queue backlog", event.toString());
        }
    }

    private void scaleUpConsumers(String queueName) {
        // Dynamically add consumer instances
        System.out.println("Scaling up consumers for: " + queueName);
    }

    private void pauseNonCriticalProducers() {
        // Pause message production for non-critical business flows
        System.out.println("Pausing non-critical producers");
    }

    private void sendAlert(String title, String message) {
        // Send an alert (email, SMS, DingTalk, etc.)
        System.out.println("Alert: " + title + " - " + message);
    }

    private void sendCriticalAlert(String title, Object data) {
        // Send a critical alert
        System.err.println("Critical alert: " + title + " - " + data);
    }
}

// Message consumption performance statistics
@Component
public class MessageConsumerMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter successCounter;
    private final Counter failureCounter;

    public MessageConsumerMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.successCounter = Counter.builder("message.consumer.success")
            .register(meterRegistry);
        this.failureCounter = Counter.builder("message.consumer.failure")
            .register(meterRegistry);
    }

    @EventListener
    public void onMessageProcessed(MessageProcessedEvent event) {
        if (event.isSuccess()) {
            successCounter.increment();
        } else {
            failureCounter.increment();
        }

        // Record the processing time. This assumes the event carries its own
        // duration: a single shared Timer.Sample cannot be reused across messages
        Timer.builder("message.processing.time")
            .tag("queue", event.getQueueName())
            .register(meterRegistry)
            .record(event.getDuration());
    }
}

Frequently Asked Interview Questions

1. Theory questions

Q: How do you guarantee message ordering?

A: Ways to guarantee ordering:

  1. Single-partition publishing: send messages that must stay ordered to the same partition
  2. Message keys: use the same key so related messages route to the same partition
  3. Single-threaded consumption: process one partition's messages on a single consumer thread
  4. Queue mode: in RabbitMQ, use a single queue with a single consumer
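
Point 2 can be sketched in a few lines. The code below is a hypothetical illustration, not Kafka's actual partitioner (which hashes keys with murmur2): hashing the message key and taking it modulo the partition count sends every message with the same key to the same partition, which is what preserves per-key order.

```java
import java.nio.charset.StandardCharsets;

// Illustrative key-based partition routing: same key -> same partition.
public class KeyPartitioner {

    // Map a message key onto one of numPartitions partitions.
    public static int partitionFor(String key, int numPartitions) {
        int hash = 0;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash = 31 * hash + (b & 0xff);   // simple deterministic hash
        }
        return (hash & 0x7fffffff) % numPartitions; // mask sign bit, then mod
    }

    public static void main(String[] args) {
        // The same order id always maps to the same partition,
        // so its events keep their relative order.
        System.out.println(partitionFor("order-1001", 3));
        System.out.println(partitionFor("order-1001", 3));
    }
}
```

Because ordering only holds within a partition, the key should be the entity whose events must stay ordered (here, the order id), not something globally unique per message.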

Q: How do you handle message loss?

A: Measures to prevent message loss:

  • Producer confirms: wait for the broker to acknowledge receipt
  • Persistence: mark both messages and queues as durable
  • Replication: configure multiple replicas
  • Consumer acknowledgment: manually ack only after processing completes
  • Transactions: use transactional sends where supported
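
The consumer-acknowledgment point can be shown with a small in-memory sketch (purely hypothetical, no real broker involved): a message leaves the queue only when the handler acks it, and a failed handler causes redelivery instead of loss.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Toy at-least-once queue: unacked messages are requeued, not dropped.
public class AckQueue {
    private final Deque<String> queue = new ArrayDeque<>();

    public void publish(String msg) { queue.addLast(msg); }

    // Deliver messages to a handler; the handler returns true to ack.
    public List<String> drain(Predicate<String> handler) {
        List<String> acked = new ArrayList<>();
        int attempts = queue.size() * 4; // safety bound for this sketch
        while (!queue.isEmpty() && attempts-- > 0) {
            String msg = queue.pollFirst();
            if (handler.test(msg)) {
                acked.add(msg);          // ack: message leaves the queue
            } else {
                queue.addLast(msg);      // nack with requeue: redelivered later
            }
        }
        return acked;
    }
}
```

With auto-ack the `else` branch would not exist, and a handler crash after delivery would silently lose the message; manual ack trades that risk for possible duplicates, which is why idempotent consumption (section 4) pairs with it.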

2. Hands-on design questions

Q: Design a distributed transactional messaging scheme.

Key points:

  1. Local message table: commit the local transaction first, then send the message
  2. Transactional messages: use an MQ with transaction support (e.g. RocketMQ)
  3. Saga pattern: event-driven distributed transactions
  4. Compensation: compensating actions on failure
  5. Idempotency: make repeated consumption of the same message safe
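
The local message table (outbox) pattern from point 1 can be sketched as follows. This is a hypothetical in-memory stand-in for the real database table: the business change and the outgoing message row are committed in one local transaction, and a relay later publishes PENDING rows and marks them SENT, so a crash between "commit" and "publish" cannot lose the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy outbox: message rows live in the same store as the business data.
public class OutboxSketch {
    public enum Status { PENDING, SENT }

    // stands in for the message table inside the business database
    private final Map<String, Status> outbox = new LinkedHashMap<>();

    // Step 1: the business rows and the message row commit atomically.
    public void commitBusinessTx(String messageId) {
        // ... business rows would be written here in the same transaction ...
        outbox.put(messageId, Status.PENDING);
    }

    // Step 2: a relay scans the table and publishes pending messages.
    public List<String> relayPending() {
        List<String> published = new ArrayList<>();
        for (Map.Entry<String, Status> e : outbox.entrySet()) {
            if (e.getValue() == Status.PENDING) {
                // a real relay would send to the broker here and only
                // mark SENT after the broker acknowledges
                e.setValue(Status.SENT);
                published.add(e.getKey());
            }
        }
        return published;
    }

    public Status statusOf(String messageId) { return outbox.get(messageId); }
}
```

Note the relay gives at-least-once delivery (a crash after publish but before marking SENT republishes the row), which is exactly why point 5, consumer idempotency, is part of the same scheme.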

Q: How would you design a highly available message queue cluster?

Key points:

  1. Replication: master-slave architecture
  2. Load balancing: multiple broker nodes
  3. Failover: automatic failure detection and switchover
  4. Sharding: partitions/shards for higher concurrency
  5. Monitoring and alerting: comprehensive monitoring and alerting

Summary

Key interview topics for message queues:

  1. Fundamentals: what message queues are for and where to use them
  2. Product trade-offs: strengths, weaknesses, and fit of each MQ product
  3. Reliability: no loss, no duplicates, ordering guarantees
  4. Performance tuning: throughput, latency, and resource usage
  5. Operations: monitoring metrics, incident handling, capacity planning

Tie your answers to concrete problems you solved with message queues in real projects, and be ready to walk through the full technical design and implementation details.

powered by GitBook © 2025 编外计划 | Last modified: 2025-07-28 18:05:38
