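Distributed System Design and Implementation

Overview

Distributed systems underpin large-scale internet applications and raise hard problems such as service decomposition, data consistency, and fault tolerance. This article summarizes the core concepts of distributed system design and the interview questions that come up most often.

Core Interview Questions

1. CAP and BASE Theory

Interview question: What is the CAP theorem? How do you weigh its trade-offs in a real project?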

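CAP Theorem in Detail

The CAP theorem states that a distributed data store cannot guarantee consistency (C), availability (A), and partition tolerance (P) all at once. Network partitions cannot be ruled out in practice, so the real choice during a partition is between consistency (CP) and availability (AP). The two lock implementations that follow illustrate each choice.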

// CAP example: distributed lock implementations
public interface DistributedLock {
    // expireTime is in milliseconds for every implementation
    boolean tryLock(String key, String value, long expireTime);
    boolean unlock(String key, String value);
}

// CP-leaning lock: favors consistency and may give up availability.
// Note: Redis replication is asynchronous, so a lock can still be lost on failover.
@Component
public class ConsistentDistributedLock implements DistributedLock {

    // Assumes String serializers (for example a StringRedisTemplate), so Lua sees plain strings
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public boolean tryLock(String key, String value, long expireTime) {
        // SET ... NX PX writes the value and the TTL (milliseconds) in one atomic command
        String script = 
            "if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then " +
            "  return 1 " +
            "else " +
            "  return 0 " +
            "end";

        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        redisScript.setResultType(Long.class);

        return Long.valueOf(1).equals(redisTemplate.execute(redisScript, 
            Collections.singletonList(key), value, String.valueOf(expireTime)));
    }

    @Override
    public boolean unlock(String key, String value) {
        // Delete the key only if it still holds this owner's value (atomic compare-and-delete)
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "  return redis.call('del', KEYS[1]) " +
            "else " +
            "  return 0 " +
            "end";

        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        redisScript.setResultType(Long.class);

        return Long.valueOf(1).equals(redisTemplate.execute(redisScript, 
            Collections.singletonList(key), value));
    }
}

// AP-leaning lock: favors availability and tolerates failed nodes.
// Note: this sketch has no validity-time check, so it is weaker than Redlock (section 3).
@Component
public class AvailableDistributedLock implements DistributedLock {

    private final List<RedisTemplate<String, String>> redisTemplates;

    public AvailableDistributedLock(List<RedisTemplate<String, String>> redisTemplates) {
        this.redisTemplates = redisTemplates;
    }

    @Override
    public boolean tryLock(String key, String value, long expireTime) {
        int successCount = 0;
        int majority = redisTemplates.size() / 2 + 1;

        for (RedisTemplate<String, String> template : redisTemplates) {
            try {
                Boolean result = template.opsForValue().setIfAbsent(key, value, 
                    Duration.ofMillis(expireTime));
                if (Boolean.TRUE.equals(result)) {
                    successCount++;
                }
            } catch (Exception e) {
                // Ignore a single node failure to stay available
                System.out.println("Redis node error: " + e.getMessage());
            }
        }

        if (successCount >= majority) {
            return true;
        }
        // No majority: release the keys that were set so they do not block other clients until the TTL expires
        unlock(key, value);
        return false;
    }

    @Override
    public boolean unlock(String key, String value) {
        int successCount = 0;

        for (RedisTemplate<String, String> template : redisTemplates) {
            try {
                // Simplified: GET followed by DEL is not atomic. Production code should use
                // the compare-and-delete Lua script from ConsistentDistributedLock.
                String currentValue = template.opsForValue().get(key);
                if (value.equals(currentValue)) {
                    template.delete(key);
                    successCount++;
                }
            } catch (Exception e) {
                // Eventual consistency: a failure on some nodes is acceptable, the TTL cleans up
                System.out.println("Failed to release lock: " + e.getMessage());
            }
        }

        return successCount > 0;
    }
}
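
Both implementations above depend on an owner token: the `value` passed to `tryLock` must be unique per client, and `unlock` only succeeds when the stored value still matches. The following is a minimal sketch of that contract using an in-memory map instead of Redis. `InMemoryLock` is a hypothetical class written for illustration only; it is not part of any library and is not a distributed lock.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the Redis-backed locks (illustration only)
class InMemoryLock {

    private static final class Entry {
        final String owner;
        final long expiresAtNanos;

        Entry(String owner, long expiresAtNanos) {
            this.owner = owner;
            this.expiresAtNanos = expiresAtNanos;
        }
    }

    private final ConcurrentHashMap<String, Entry> locks = new ConcurrentHashMap<>();

    // Acquire the lock if it is free or expired; mirrors SET key value NX PX ttl
    boolean tryLock(String key, String owner, long ttlMillis) {
        long now = System.nanoTime();
        Entry fresh = new Entry(owner, now + ttlMillis * 1_000_000L);
        // compute() runs atomically for the given key
        Entry result = locks.compute(key, (k, current) ->
            (current == null || current.expiresAtNanos - now <= 0) ? fresh : current);
        return result == fresh;
    }

    // Release the lock only if the caller still owns it; mirrors the compare-and-delete Lua script
    boolean unlock(String key, String owner) {
        boolean[] removed = {false};
        locks.computeIfPresent(key, (k, current) -> {
            if (current.owner.equals(owner)) {
                removed[0] = true;
                return null; // returning null removes the entry
            }
            return current;
        });
        return removed[0];
    }
}
```

A caller would typically generate the token with `UUID.randomUUID().toString()`, call `tryLock`, and release in a `finally` block. The owner check matters because a client whose lock has already expired must not delete a lock that another client acquired in the meantime.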

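BASE Theory in Practice

// An eventually consistent order system
@Service
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private EventPublisher eventPublisher;

    // Collaborators used by handleOrderCompleted (type names are assumed)
    @Autowired
    private UserService userService;

    @Autowired
    private ProductService productService;

    @Autowired
    private CompensationService compensationService;

    // Basically Available: the core operation stays usable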
    public OrderResult createOrder(OrderRequest request) {
        try {
            // 1. Create the order (local transaction)
            Order order = new Order(request);
            order.setStatus(OrderStatus.PENDING);
            orderRepository.save(order);

            // 2. Publish an event; the remaining steps run asynchronously
            eventPublisher.publishEvent(new OrderCreatedEvent(order.getId()));

            return OrderResult.success(order.getId(), "Order created and is being processed");

        } catch (Exception e) {
            return OrderResult.failure("Failed to create order");
        }
    }

    // Soft State: intermediate states are allowed
    @EventListener
    @Async
    public void handleOrderCreated(OrderCreatedEvent event) {
        Long orderId = event.getOrderId();
        Order order = orderRepository.findById(orderId);
        if (order == null) {
            // Nothing to process; avoids a NullPointerException in the error path
            return;
        }

        boolean completed = false;
        try {
            // 3. Deduct inventory
            order.setStatus(OrderStatus.PROCESSING);
            orderRepository.save(order);

            boolean inventorySuccess = inventoryService.reduceInventory(
                order.getProductId(), order.getQuantity());

            if (inventorySuccess) {
                // 4. Process the payment
                boolean paymentSuccess = paymentService.processPayment(
                    order.getUserId(), order.getAmount());

                if (paymentSuccess) {
                    order.setStatus(OrderStatus.COMPLETED);
                    completed = true;
                } else {
                    // Payment failed: restore the inventory
                    inventoryService.rollbackInventory(order.getProductId(), order.getQuantity());
                    order.setStatus(OrderStatus.FAILED);
                }
            } else {
                order.setStatus(OrderStatus.FAILED);
            }

        } catch (Exception e) {
            order.setStatus(OrderStatus.FAILED);
            // Publish a compensation event
            eventPublisher.publishEvent(new OrderCompensationEvent(orderId));
        } finally {
            orderRepository.save(order);
        }

        // Publish only after the final status is saved, so a failing listener
        // cannot flip a paid order to FAILED
        if (completed) {
            eventPublisher.publishEvent(new OrderCompletedEvent(orderId));
        }
    }

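    // Eventually Consistent: downstream systems converge over time.
    // The event is assumed to carry userId, points, productId, and quantity.
    // Because each call below catches its own exception, @Retryable only fires
    // for exceptions that escape this method.
    @EventListener
    @Retryable(value = Exception.class, maxAttempts = 3)
    public void handleOrderCompleted(OrderCompletedEvent event) {
        // Update the state of related systems
        Long orderId = event.getOrderId();

        // Add user points (may fail and be retried later)
        try {
            userService.addPoints(event.getUserId(), event.getPoints());
        } catch (Exception e) {
            // Record the failure for later compensation
            compensationService.recordFailedOperation("ADD_POINTS", orderId);
        }

        // Update product sales statistics (may fail and be retried later)
        try {
            productService.updateSalesCount(event.getProductId(), event.getQuantity());
        } catch (Exception e) {
            compensationService.recordFailedOperation("UPDATE_SALES", orderId);
        }
    }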
}

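2. Distributed Transaction Solutions

Interview question: What approaches exist for implementing distributed transactions, and what are the pros and cons of each?

2PC (Two-Phase Commit) Implementation

// Distributed transaction coordinator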
@Component
public class TwoPhaseCommitCoordinator {

    private final List<TransactionParticipant> participants;
    private final TransactionLogService logService;

    public TwoPhaseCommitCoordinator(List<TransactionParticipant> participants,
                                   TransactionLogService logService) {
        this.participants = participants;
        this.logService = logService;
    }

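    public boolean executeTransaction(String transactionId, TransactionContext context) {
        // Record the start of the transaction
        logService.logTransactionStart(transactionId);

        boolean prepared;
        try {
            // Phase 1: prepare (voting)
            prepared = preparePhase(transactionId, context);
        } catch (Exception e) {
            prepared = false;
        }

        if (!prepared) {
            // No participant has committed yet, so rolling back is always safe here
            rollbackAll(transactionId);
            return false;
        }

        // Phase 2: commit. Once every participant has voted yes the decision is final:
        // a failed commit must be retried until it succeeds, never rolled back.
        // A production coordinator would also persist the decision before this point.
        return commitPhase(transactionId);
    }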

    private boolean preparePhase(String transactionId, TransactionContext context) {
        for (TransactionParticipant participant : participants) {
            try {
                boolean prepared = participant.prepare(transactionId, context);
                logService.logParticipantPrepare(transactionId, participant.getName(), prepared);

                if (!prepared) {
                    return false;
                }
            } catch (Exception e) {
                logService.logParticipantError(transactionId, participant.getName(), e);
                return false;
            }
        }
        return true;
    }

    private boolean commitPhase(String transactionId) {
        boolean allCommitted = true;

        for (TransactionParticipant participant : participants) {
            try {
                boolean committed = participant.commit(transactionId);
                logService.logParticipantCommit(transactionId, participant.getName(), committed);

                if (!committed) {
                    allCommitted = false;
                }
            } catch (Exception e) {
                logService.logParticipantError(transactionId, participant.getName(), e);
                allCommitted = false;
            }
        }

        logService.logTransactionEnd(transactionId, allCommitted);
        return allCommitted;
    }

    private void rollbackAll(String transactionId) {
        for (TransactionParticipant participant : participants) {
            try {
                participant.rollback(transactionId);
                logService.logParticipantRollback(transactionId, participant.getName());
            } catch (Exception e) {
                logService.logParticipantError(transactionId, participant.getName(), e);
            }
        }
    }
}

// Transaction participant interface
public interface TransactionParticipant {
    String getName();
    boolean prepare(String transactionId, TransactionContext context);
    boolean commit(String transactionId);
    boolean rollback(String transactionId);
}

// A concrete participant
@Component
public class InventoryParticipant implements TransactionParticipant {

    @Autowired
    private InventoryRepository inventoryRepository;

    @Autowired
    private TransactionCache transactionCache;

    @Override
    public String getName() {
        return "InventoryService";
    }

    @Override
    public boolean prepare(String transactionId, TransactionContext context) {
        try {
            // Check that enough stock is available.
            // Simplified: the check and the reservation are not atomic, and stock already
            // reserved by other transactions is not subtracted.
            InventoryOperation operation = context.getInventoryOperation();
            int currentStock = inventoryRepository.getStock(operation.getProductId());

            if (currentStock >= operation.getQuantity()) {
                // Reserve the stock for this transaction
                transactionCache.lockInventory(transactionId, operation);
                return true;
            }
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public boolean commit(String transactionId) {
        try {
            InventoryOperation operation = transactionCache.getInventoryOperation(transactionId);
            inventoryRepository.reduceStock(operation.getProductId(), operation.getQuantity());
            transactionCache.clearInventoryLock(transactionId);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public boolean rollback(String transactionId) {
        try {
            transactionCache.clearInventoryLock(transactionId);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

Saga Pattern Implementation

// Saga transaction manager
@Service
public class SagaTransactionManager {

    private final Map<String, SagaDefinition> sagaDefinitions;
    private final SagaLogService sagaLogService;

    public SagaTransactionManager(Map<String, SagaDefinition> sagaDefinitions,
                                SagaLogService sagaLogService) {
        this.sagaDefinitions = sagaDefinitions;
        this.sagaLogService = sagaLogService;
    }

    public SagaResult executeSaga(String sagaType, SagaContext context) {
        String sagaId = UUID.randomUUID().toString();
        SagaDefinition definition = sagaDefinitions.get(sagaType);

        if (definition == null) {
            return SagaResult.failure("Saga definition not found: " + sagaType);
        }

        sagaLogService.logSagaStart(sagaId, sagaType);

        List<SagaStep> executedSteps = new ArrayList<>();

        try {
            // Run the saga steps in order
            for (SagaStep step : definition.getSteps()) {
                boolean success = executeStep(sagaId, step, context);

                if (!success) {
                    // Compensate only the steps that completed. Steps are assumed to be
                    // atomic, so the failed step itself has nothing to undo.
                    compensate(sagaId, executedSteps, context);
                    return SagaResult.failure("Saga failed; completed steps were compensated");
                }
                executedSteps.add(step);
            }

            sagaLogService.logSagaSuccess(sagaId);
            return SagaResult.success(sagaId);

        } catch (Exception e) {
            compensate(sagaId, executedSteps, context);
            sagaLogService.logSagaFailure(sagaId, e);
            return SagaResult.failure("Saga raised an exception: " + e.getMessage());
        }
    }

    private boolean executeStep(String sagaId, SagaStep step, SagaContext context) {
        try {
            sagaLogService.logStepStart(sagaId, step.getName());
            boolean result = step.execute(context);
            sagaLogService.logStepResult(sagaId, step.getName(), result);
            return result;
        } catch (Exception e) {
            sagaLogService.logStepError(sagaId, step.getName(), e);
            return false;
        }
    }

    private void compensate(String sagaId, List<SagaStep> executedSteps, SagaContext context) {
        // Run compensations in reverse order without mutating the caller's list
        for (int i = executedSteps.size() - 1; i >= 0; i--) {
            SagaStep step = executedSteps.get(i);
            try {
                sagaLogService.logCompensationStart(sagaId, step.getName());
                step.compensate(context);
                sagaLogService.logCompensationSuccess(sagaId, step.getName());
            } catch (Exception e) {
                sagaLogService.logCompensationError(sagaId, step.getName(), e);
                // Compensation failed: it is logged and may need manual intervention
            }
        }
    }
}

// Saga step definition
public interface SagaStep {
    String getName();
    boolean execute(SagaContext context);
    void compensate(SagaContext context);
}

// Order saga steps.
// Note: this example assumes an OrderService whose createOrder returns an Order.
@Component
public class CreateOrderStep implements SagaStep {

    @Autowired
    private OrderService orderService;

    @Override
    public String getName() {
        return "CreateOrder";
    }

    @Override
    public boolean execute(SagaContext context) {
        OrderInfo orderInfo = context.getOrderInfo();
        Order order = orderService.createOrder(orderInfo);
        if (order == null) {
            // Check before dereferencing to avoid a NullPointerException
            return false;
        }
        context.setOrderId(order.getId());
        return true;
    }

    @Override
    public void compensate(SagaContext context) {
        Long orderId = context.getOrderId();
        if (orderId != null) {
            orderService.cancelOrder(orderId);
        }
    }
}

@Component
public class ReduceInventoryStep implements SagaStep {

    @Autowired
    private InventoryService inventoryService;

    @Override
    public String getName() {
        return "ReduceInventory";
    }

    @Override
    public boolean execute(SagaContext context) {
        OrderInfo orderInfo = context.getOrderInfo();
        return inventoryService.reduceInventory(
            orderInfo.getProductId(), orderInfo.getQuantity());
    }

    @Override
    public void compensate(SagaContext context) {
        OrderInfo orderInfo = context.getOrderInfo();
        inventoryService.addInventory(
            orderInfo.getProductId(), orderInfo.getQuantity());
    }
}
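
The ordering rule that the manager above relies on (run steps forward, and on failure undo the completed steps in reverse) can be shown in isolation. The sketch below is a self-contained illustration; `MiniSaga` and `Step` are hypothetical names, steps are plain lambdas rather than Spring beans, and the `trace` list exists only to make the order of events visible.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical minimal saga runner (illustration only, no persistence or retries)
class MiniSaga {

    static final class Step {
        final String name;
        final BooleanSupplier action;   // returns true when the step succeeds
        final Runnable compensation;    // undoes a completed step

        Step(String name, BooleanSupplier action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    // Runs the steps in order; on the first failure, compensates completed steps in reverse order
    static boolean run(List<Step> steps, List<String> trace) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            boolean ok;
            try {
                ok = step.action.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false; // an exception counts as a failed step
            }
            if (!ok) {
                trace.add("failed:" + step.name);
                while (!completed.isEmpty()) {
                    Step done = completed.pop(); // most recently completed step first
                    done.compensation.run();
                    trace.add("compensated:" + done.name);
                }
                return false;
            }
            trace.add("done:" + step.name);
            completed.push(step);
        }
        return true;
    }
}
```

With the steps createOrder, reduceInventory, and pay, where pay fails, the inventory is restored before the order is cancelled. The failed step is not compensated, on the assumption that each step either fully succeeds or leaves no change behind.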

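3. Distributed Locks and Leader Election

Interview question: What are the ways to implement a distributed lock? What is the basic principle of the Raft algorithm?

Redlock Algorithm Implementation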

@Component
public class RedlockDistributedLock {

    private final List<RedisTemplate<String, String>> redisInstances;
    private final int quorum;

    public RedlockDistributedLock(List<RedisTemplate<String, String>> redisInstances) {
        this.redisInstances = redisInstances;
        this.quorum = redisInstances.size() / 2 + 1;
    }

    // ttl is in milliseconds
    public RedlockResult acquireLock(String resource, String value, long ttl) {
        List<RedisLockResult> results = new ArrayList<>();
        // nanoTime is monotonic; wall-clock time can jump and distort the elapsed time
        long startNanos = System.nanoTime();

        // Try to acquire the lock on every Redis instance
        for (RedisTemplate<String, String> redis : redisInstances) {
            RedisLockResult result = acquireLockOnInstance(redis, resource, value, ttl);
            results.add(result);
        }

        long elapsedTime = (System.nanoTime() - startNanos) / 1_000_000;

        // Count the instances on which the lock was acquired
        long successCount = results.stream()
            .mapToLong(result -> result.isSuccess() ? 1 : 0)
            .sum();

        // Clock drift allowance: 1% of the TTL plus 2 ms
        long clockDrift = ttl / 100 + 2;
        long validityTime = ttl - elapsedTime - clockDrift;

        // Redlock condition: a majority of instances and a positive remaining validity time
        if (successCount >= quorum && validityTime > 0) {
            return RedlockResult.success(resource, value, validityTime, results);
        } else {
            // Release whatever was acquired
            releaseLock(resource, value, results);
            return RedlockResult.failure("Failed to acquire lock");
        }
    }

    private RedisLockResult acquireLockOnInstance(RedisTemplate<String, String> redis,
                                                String resource, String value, long ttl) {
        try {
            String script = 
                "if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then " +
                "  return 1 " +
                "else " +
                "  return 0 " +
                "end";

            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptText(script);
            redisScript.setResultType(Long.class);

            long startTime = System.currentTimeMillis();
            Long result = redis.execute(redisScript, 
                Collections.singletonList(resource), value, String.valueOf(ttl));
            long endTime = System.currentTimeMillis();

            return new RedisLockResult(redis, Long.valueOf(1).equals(result), endTime - startTime);

        } catch (Exception e) {
            return new RedisLockResult(redis, false, 0);
        }
    }

    public void releaseLock(String resource, String value, List<RedisLockResult> results) {
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "  return redis.call('del', KEYS[1]) " +
            "else " +
            "  return 0 " +
            "end";

        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        redisScript.setResultType(Long.class);

        // Release on every instance, including those that reported failure: the key may
        // have been set even though the reply was lost. The script only deletes our own value.
        for (RedisLockResult result : results) {
            try {
                result.getRedisTemplate().execute(redisScript,
                    Collections.singletonList(resource), value);
            } catch (Exception e) {
                // Record the failed release; the TTL will eventually expire the key
                System.err.println("Failed to release lock: " + e.getMessage());
            }
        }
    }
}
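
The acceptance rule in Redlock comes down to a little arithmetic: the lock counts as acquired only if a majority of instances granted it and some validity time remains after subtracting the elapsed time and a clock drift allowance. The helper below is a sketch of that arithmetic alone. `RedlockMath` is a hypothetical name, and the drift formula (1% of the TTL plus 2 ms) follows the convention used by common Redlock client libraries rather than anything stated in this article.

```java
// Hypothetical helper that isolates the Redlock arithmetic (all times in milliseconds)
class RedlockMath {

    // Majority of n instances
    static int quorum(int instances) {
        return instances / 2 + 1;
    }

    // Drift allowance: 1% of the TTL plus 2 ms (assumed convention)
    static long driftMillis(long ttlMillis) {
        return ttlMillis / 100 + 2;
    }

    // Time the lock can still be trusted after acquisition finished
    static long validityMillis(long ttlMillis, long elapsedMillis) {
        return ttlMillis - elapsedMillis - driftMillis(ttlMillis);
    }

    // The lock is held only with a majority and a positive validity time
    static boolean acquired(int successes, int instances, long ttlMillis, long elapsedMillis) {
        return successes >= quorum(instances) && validityMillis(ttlMillis, elapsedMillis) > 0;
    }
}
```

For example, with 5 instances, a 10,000 ms TTL, and 150 ms spent acquiring, the drift allowance is 102 ms and the remaining validity is 9,748 ms. The protected work must finish within that window, not within the full TTL.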

ZooKeeper-Based Distributed Lock

@Component
public class ZookeeperDistributedLock {

    private final CuratorFramework client;
    private final String lockPath = "/distributed-locks";

    public ZookeeperDistributedLock(CuratorFramework client) {
        this.client = client;
        try {
            // Make sure the root path exists
            if (client.checkExists().forPath(lockPath) == null) {
                client.create().creatingParentsIfNeeded().forPath(lockPath);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to initialize the ZooKeeper lock path", e);
        }
    }

    public ZkLockResult acquireLock(String lockName, long timeout, TimeUnit unit) {
        String fullPath = lockPath + "/" + lockName;
        InterProcessMutex mutex = new InterProcessMutex(client, fullPath);

        try {
            boolean acquired = mutex.acquire(timeout, unit);
            if (acquired) {
                return ZkLockResult.success(mutex);
            } else {
                return ZkLockResult.failure("Timed out while acquiring the lock");
            }
        } catch (Exception e) {
            return ZkLockResult.failure("Error while acquiring the lock: " + e.getMessage());
        }
    }

    public void releaseLock(InterProcessMutex mutex) {
        try {
            // release() must be called by the thread that acquired the mutex
            if (mutex.isOwnedByCurrentThread()) {
                mutex.release();
            }
        } catch (Exception e) {
            System.err.println("Failed to release the ZooKeeper lock: " + e.getMessage());
        }
    }

    // Distributed read-write lock
    public class ZkReadWriteLock {
        private final InterProcessReadWriteLock rwLock;

        public ZkReadWriteLock(String lockName) {
            String fullPath = lockPath + "/" + lockName;
            this.rwLock = new InterProcessReadWriteLock(client, fullPath);
        }

        public InterProcessMutex readLock() {
            return rwLock.readLock();
        }

        public InterProcessMutex writeLock() {
            return rwLock.writeLock();
        }
    }

    // Leader election
    public class LeaderElection {
        private final LeaderSelector leaderSelector;
        private final String name;

        public LeaderElection(String path, String name, LeaderSelectorListener listener) {
            this.name = name;
            this.leaderSelector = new LeaderSelector(client, path, listener);
            this.leaderSelector.autoRequeue();
        }

        public void start() {
            leaderSelector.start();
        }

        public void close() {
            leaderSelector.close();
        }

        public boolean hasLeadership() {
            return leaderSelector.hasLeadership();
        }
    }
}

// Leader election listener
@Component
public class ApplicationLeaderListener implements LeaderSelectorListener {

    private final AtomicBoolean isLeader = new AtomicBoolean(false);

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // Leadership lasts until this method returns
        isLeader.set(true);
        System.out.println("Became leader; starting leader tasks");

        try {
            // Run the leader-only work
            while (isLeader.get()) {
                performLeaderTask();
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            isLeader.set(false);
            System.out.println("Lost leadership");
        }
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
            isLeader.set(false);
            // As in Curator's LeaderSelectorListenerAdapter: interrupts takeLeadership()
            // so the loop stops immediately rather than after the current sleep
            throw new CancelLeadershipException();
        }
    }

    private void performLeaderTask() {
        // Work that only the leader may perform
        System.out.println("Running leader task: " + new Date());

        // For example:
        // 1. Clean up expired data
        // 2. Generate reports
        // 3. Synchronize data
        // 4. Send heartbeats
    }

    public boolean isLeader() {
        return isLeader.get();
    }
}

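4. Service Discovery and Load Balancing

Interview question: How is service discovery implemented, and which load-balancing algorithms are there?

Service Registration and Discovery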

// Service registry
@Component
public class ServiceRegistry {

    private final Map<String, Set<ServiceInstance>> serviceInstances = new ConcurrentHashMap<>();
    private final Map<ServiceInstance, ScheduledFuture<?>> healthChecks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService healthCheckExecutor = Executors.newScheduledThreadPool(5);
    // RestTemplate is thread-safe once constructed, so one shared instance is enough
    private final RestTemplate restTemplate = new RestTemplate();

    // Register a service instance
    public void registerService(ServiceInstance instance) {
        String serviceName = instance.getServiceName();
        boolean added = serviceInstances.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet())
                       .add(instance);

        // Start the health check only once per instance
        if (added) {
            startHealthCheck(instance);
        }

        System.out.println("Service registered: " + serviceName + " - " + instance.getAddress());
    }

    // Deregister a service instance
    public void deregisterService(ServiceInstance instance) {
        String serviceName = instance.getServiceName();
        Set<ServiceInstance> instances = serviceInstances.get(serviceName);
        if (instances != null) {
            instances.remove(instance);
            if (instances.isEmpty()) {
                serviceInstances.remove(serviceName);
            }
        }

        // Cancel the periodic health check, otherwise it keeps running after deregistration
        ScheduledFuture<?> check = healthChecks.remove(instance);
        if (check != null) {
            check.cancel(false);
        }

        System.out.println("Service deregistered: " + serviceName + " - " + instance.getAddress());
    }

    // Return the healthy instances of a service
    public List<ServiceInstance> getServiceInstances(String serviceName) {
        Set<ServiceInstance> instances = serviceInstances.get(serviceName);
        if (instances == null) {
            return Collections.emptyList();
        }
        return instances.stream()
            .filter(ServiceInstance::isHealthy)
            .collect(Collectors.toList());
    }

    // Health check: first run after 10 seconds, then 30 seconds after each run finishes
    private void startHealthCheck(ServiceInstance instance) {
        ScheduledFuture<?> check = healthCheckExecutor.scheduleWithFixedDelay(() -> {
            try {
                boolean healthy = performHealthCheck(instance);
                instance.setHealthy(healthy);

                if (!healthy) {
                    System.out.println("Service unhealthy: " + instance.getServiceName() + 
                                     " - " + instance.getAddress());
                }
            } catch (Exception e) {
                instance.setHealthy(false);
                System.err.println("Health check error: " + e.getMessage());
            }
        }, 10, 30, TimeUnit.SECONDS);
        healthChecks.put(instance, check);
    }

    private boolean performHealthCheck(ServiceInstance instance) {
        try {
            // Send an HTTP health check request
            String healthUrl = "http://" + instance.getAddress() + ":" + 
                              instance.getPort() + "/health";

            ResponseEntity<String> response = restTemplate.getForEntity(healthUrl, String.class);

            return response.getStatusCode().is2xxSuccessful();
        } catch (Exception e) {
            return false;
        }
    }
}

// Service instance
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceInstance {
    private String serviceName;
    private String address;
    private int port;
    private Map<String, String> metadata;
    // Written by the health check thread and read by request threads
    private volatile boolean healthy = true;
    private long registerTime = System.currentTimeMillis();

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        ServiceInstance that = (ServiceInstance) obj;
        return port == that.port && 
               Objects.equals(serviceName, that.serviceName) && 
               Objects.equals(address, that.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(serviceName, address, port);
    }
}

Load-Balancing Algorithm Implementations

// Load balancer interface
public interface LoadBalancer {
    ServiceInstance select(List<ServiceInstance> instances);
}

// Round-robin
@Component
public class RoundRobinLoadBalancer implements LoadBalancer {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return null;
        }

        // floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }
}

// Smooth weighted round-robin
@Component
public class WeightedRoundRobinLoadBalancer implements LoadBalancer {
    private final Map<String, AtomicInteger> currentWeights = new ConcurrentHashMap<>();

    // synchronized: one selection reads and updates every current weight as a single step
    @Override
    public synchronized ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return null;
        }

        // Compute the total weight
        int totalWeight = instances.stream()
            .mapToInt(this::getWeight)
            .sum();

        ServiceInstance selected = null;
        // Start below any possible value: current weights can be negative
        int maxCurrentWeight = Integer.MIN_VALUE;

        for (ServiceInstance instance : instances) {
            String key = getInstanceKey(instance);
            int weight = getWeight(instance);

            AtomicInteger currentWeight = currentWeights.computeIfAbsent(key, 
                k -> new AtomicInteger(0));

            // Each instance gains its own weight on every round
            int updated = currentWeight.addAndGet(weight);

            if (updated > maxCurrentWeight) {
                maxCurrentWeight = updated;
                selected = instance;
            }
        }

        // The chosen instance pays back the total weight
        if (selected != null) {
            String selectedKey = getInstanceKey(selected);
            currentWeights.get(selectedKey).addAndGet(-totalWeight);
        }

        return selected;
    }

    private int getWeight(ServiceInstance instance) {
        // Instances without metadata or without a "weight" entry default to weight 1
        Map<String, String> metadata = instance.getMetadata();
        String weight = metadata != null ? metadata.get("weight") : null;
        return weight != null ? Integer.parseInt(weight) : 1;
    }

    private String getInstanceKey(ServiceInstance instance) {
        return instance.getServiceName() + ":" + instance.getAddress() + ":" + instance.getPort();
    }
}
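
The weighted algorithm above is easier to follow with a concrete trace. The sketch below re-implements the same selection rule over plain names and integer weights so it can run without Spring; `SmoothWeightedRoundRobin` is a hypothetical class written for this illustration.

```java
// Hypothetical standalone version of smooth weighted round-robin (illustration only)
class SmoothWeightedRoundRobin {

    private final String[] names;
    private final int[] weights;
    private final int[] current; // running "current weight" per node
    private final int total;

    SmoothWeightedRoundRobin(String[] names, int[] weights) {
        if (names.length == 0 || names.length != weights.length) {
            throw new IllegalArgumentException("names and weights must be non-empty and equal in length");
        }
        this.names = names.clone();
        this.weights = weights.clone();
        this.current = new int[names.length];
        int sum = 0;
        for (int w : weights) {
            sum += w;
        }
        this.total = sum;
    }

    // Each node gains its weight, the largest current weight wins, the winner pays back the total
    synchronized String next() {
        int best = 0;
        for (int i = 0; i < names.length; i++) {
            current[i] += weights[i];
            if (current[i] > current[best]) {
                best = i; // ties go to the earlier node
            }
        }
        current[best] -= total;
        return names[best];
    }
}
```

With weights a=5, b=1, c=1, seven consecutive calls return a, a, b, a, c, a, a. The heavier node is chosen five times out of seven, but its picks are interleaved with the others instead of arriving as one burst, which is the point of the "smooth" variant.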

// Least connections
@Component
public class LeastConnectionsLoadBalancer implements LoadBalancer {
    private final Map<String, AtomicInteger> connectionCounts = new ConcurrentHashMap<>();

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return null;
        }

        ServiceInstance selected = null;
        int minConnections = Integer.MAX_VALUE;

        for (ServiceInstance instance : instances) {
            String key = getInstanceKey(instance);
            int connections = connectionCounts.computeIfAbsent(key, 
                k -> new AtomicInteger(0)).get();

            if (connections < minConnections) {
                minConnections = connections;
                selected = instance;
            }
        }

        // Increment the connection count of the selected instance
        if (selected != null) {
            String selectedKey = getInstanceKey(selected);
            connectionCounts.get(selectedKey).incrementAndGet();
        }

        return selected;
    }

    public void releaseConnection(ServiceInstance instance) {
        String key = getInstanceKey(instance);
        AtomicInteger count = connectionCounts.get(key);
        if (count != null) {
            // Atomic decrement that never drops below zero
            count.updateAndGet(c -> c > 0 ? c - 1 : 0);
        }
    }

    private String getInstanceKey(ServiceInstance instance) {
        return instance.getServiceName() + ":" + instance.getAddress() + ":" + instance.getPort();
    }
}

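// Consistent hashing
@Component
public class ConsistentHashLoadBalancer implements LoadBalancer {
    private static final int VIRTUAL_NODES = 160;

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        // The LoadBalancer interface carries no request information, so this overload falls
        // back to a random key. That spreads load but gives no stickiness.
        return select(instances, String.valueOf(ThreadLocalRandom.current().nextInt()));
    }

    // Route by a stable request key (user ID, session ID, and so on):
    // the same key maps to the same instance while the instance list is unchanged
    public ServiceInstance select(List<ServiceInstance> instances, String requestKey) {
        if (instances.isEmpty()) {
            return null;
        }

        // Simplified: the ring is rebuilt on every call. Real code would cache it
        // and rebuild only when the instance list changes.
        TreeMap<Integer, ServiceInstance> hashRing = buildHashRing(instances);

        // First virtual node clockwise from the key's hash, wrapping around at the end
        Map.Entry<Integer, ServiceInstance> entry = hashRing.ceilingEntry(hash(requestKey));
        if (entry == null) {
            entry = hashRing.firstEntry();
        }

        return entry.getValue();
    }

    private TreeMap<Integer, ServiceInstance> buildHashRing(List<ServiceInstance> instances) {
        TreeMap<Integer, ServiceInstance> hashRing = new TreeMap<>();

        for (ServiceInstance instance : instances) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                String virtualNode = getInstanceKey(instance) + "#" + i;
                int hash = hash(virtualNode);
                hashRing.put(hash, instance);
            }
        }

        return hashRing;
    }

    private int hash(String key) {
        // Simplified: String.hashCode clusters similar keys; a stronger hash (MD5, MurmurHash) spreads better
        return key.hashCode();
    }

    private String getInstanceKey(ServiceInstance instance) {
        return instance.getServiceName() + ":" + instance.getAddress() + ":" + instance.getPort();
    }
}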
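
The property that makes consistent hashing worth its complexity is that removing a node only remaps the keys that lived on that node. The sketch below demonstrates this with plain string node names. `HashRing` is a hypothetical class for illustration; it uses MD5 from the JDK as the hash function and keeps the 160 virtual nodes from the listing above, both of which are choices made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone hash ring keyed by request key (illustration only)
class HashRing {

    private static final int VIRTUAL_NODES = 160;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRing(List<String> nodes) {
        for (String node : nodes) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // First virtual node at or after the key's hash, wrapping around to the start of the ring
    String nodeFor(String requestKey) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(requestKey));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest, packed into a long
    static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }
}
```

Building one ring from nodes n1, n2, n3 and a second from n1 and n2 only, every key that mapped to n1 or n2 in the first ring maps to the same node in the second. Only the keys that belonged to n3 move. With modulo-based hashing, by contrast, most keys would change nodes when the node count changes.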

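Frequently Asked Interview Questions

1. Theory Questions

Q: What is the Byzantine Generals Problem? What is the basic principle of the PBFT algorithm?

A: The Byzantine Generals Problem describes a distributed system in which nodes may fail in arbitrary ways, including by acting maliciously. PBFT (Practical Byzantine Fault Tolerance) solves it with a three-phase protocol (pre-prepare, prepare, commit). To tolerate f faulty nodes the system needs at least 3f + 1 nodes, and the prepare and commit phases each require agreement from a quorum of 2f + 1 nodes, which is more than two thirds of the minimum cluster.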
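
The sizing rule behind PBFT can be written down as a few one-line functions. This is only the arithmetic for clusters of exactly 3f + 1 nodes, not an implementation of the protocol; `PbftQuorum` is a hypothetical helper name.

```java
// Hypothetical helper for PBFT cluster sizing (arithmetic only, not the protocol)
class PbftQuorum {

    // Smallest cluster that tolerates f Byzantine nodes
    static int minNodes(int faulty) {
        if (faulty < 0) {
            throw new IllegalArgumentException("faulty must be >= 0");
        }
        return 3 * faulty + 1;
    }

    // Matching messages needed in the prepare and commit phases when n = 3f + 1
    static int quorum(int faulty) {
        if (faulty < 0) {
            throw new IllegalArgumentException("faulty must be >= 0");
        }
        return 2 * faulty + 1;
    }

    // Largest number of Byzantine nodes a cluster of n nodes can tolerate
    static int maxFaulty(int nodes) {
        if (nodes < 1) {
            throw new IllegalArgumentException("nodes must be >= 1");
        }
        return (nodes - 1) / 3;
    }
}
```

A 4-node cluster tolerates 1 faulty node with a quorum of 3, and a 7-node cluster tolerates 2 with a quorum of 5. Any two quorums of 2f + 1 out of 3f + 1 nodes overlap in at least f + 1 nodes, so they always share at least one honest node.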

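Q: How is clock synchronization handled in a distributed system?

A: The main approaches are:

  • Physical clock synchronization: synchronize physical clocks with the NTP protocol
  • Logical clocks: Lamport timestamps and vector clocks
  • Hybrid and bounded-uncertainty clocks: hybrid logical clocks (HLC), and TrueTime (used by Google Spanner), which exposes physical time with an explicit error bound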
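
The two logical clocks in the list above are small enough to sketch in full. The code below is a minimal illustration, assuming a fixed number of processes identified by index; `LamportClock` and `VectorClock` are hypothetical class names.

```java
import java.util.Arrays;

// Lamport clock: a single counter that orders events consistently with causality
class LamportClock {
    private long time = 0;

    // Local event or message send
    synchronized long tick() {
        return ++time;
    }

    // Message receive: jump past the sender's timestamp
    synchronized long onReceive(long remoteTime) {
        time = Math.max(time, remoteTime) + 1;
        return time;
    }
}

// Vector clock: one counter per process, which also detects concurrent events
class VectorClock {
    private final int[] counters;
    private final int self;

    VectorClock(int processCount, int self) {
        this.counters = new int[processCount];
        this.self = self;
    }

    // Local event or message send; returns a snapshot to attach to the message
    synchronized int[] tick() {
        counters[self]++;
        return counters.clone();
    }

    // Message receive: element-wise maximum, then count the receive event itself
    synchronized int[] onReceive(int[] remote) {
        for (int i = 0; i < counters.length; i++) {
            counters[i] = Math.max(counters[i], remote[i]);
        }
        counters[self]++;
        return counters.clone();
    }

    // a happened before b if a <= b in every position and a < b in at least one
    static boolean happensBefore(int[] a, int[] b) {
        boolean strictlyLess = false;
        for (int i = 0; i < a.length; i++) {
            if (a[i] > b[i]) {
                return false;
            }
            if (a[i] < b[i]) {
                strictlyLess = true;
            }
        }
        return strictlyLess;
    }

    // Neither event happened before the other
    static boolean concurrent(int[] a, int[] b) {
        return !Arrays.equals(a, b) && !happensBefore(a, b) && !happensBefore(b, a);
    }
}
```

A Lamport timestamp guarantees that a cause has a smaller timestamp than its effect, but two timestamps alone cannot tell whether events are causally related. Vector clocks add that ability at the cost of carrying one counter per process in every message.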

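2. Practical Application Questions

Q: How would you design a distributed ID generator?

Answer outline:

  1. Snowflake algorithm: timestamp + machine ID + sequence number
  2. Database auto-increment: several database instances, each with its own start value and step
  3. UUID: globally unique but unordered
  4. Redis counter: generate increasing IDs with the INCR command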
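
The first option in the list, Snowflake, packs the three fields into a single 64-bit number. The sketch below uses the commonly cited layout of 41 bits of millisecond timestamp, 10 bits of machine ID, and 12 bits of sequence. The class name, the custom epoch, and the policy of throwing when the clock moves backwards are assumptions made for this example.

```java
// Hypothetical Snowflake-style generator: 41-bit timestamp | 10-bit worker ID | 12-bit sequence
class SnowflakeIdGenerator {

    private static final long EPOCH = 1577836800000L; // 2020-01-01T00:00:00Z, an arbitrary custom epoch
    private static final int WORKER_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;  // 1023
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1; // 4095

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeIdGenerator(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId must be between 0 and " + MAX_WORKER_ID);
        }
        this.workerId = workerId;
    }

    synchronized long nextId() {
        long timestamp = System.currentTimeMillis();
        if (timestamp < lastTimestamp) {
            // Simplest policy: refuse to generate IDs until the clock catches up
            throw new IllegalStateException("Clock moved backwards");
        }
        if (timestamp == lastTimestamp) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {
                // Sequence exhausted for this millisecond: wait for the next one
                while ((timestamp = System.currentTimeMillis()) <= lastTimestamp) {
                    // busy-wait, at most about one millisecond
                }
            }
        } else {
            sequence = 0;
        }
        lastTimestamp = timestamp;
        return ((timestamp - EPOCH) << (WORKER_BITS + SEQUENCE_BITS))
            | (workerId << SEQUENCE_BITS)
            | sequence;
    }

    // Extract the worker ID from a generated ID
    static long workerIdOf(long id) {
        return (id >> SEQUENCE_BITS) & MAX_WORKER_ID;
    }
}
```

IDs from one generator are strictly increasing, and IDs from different workers never collide as long as each worker has a distinct `workerId`. Assigning those worker IDs, and deciding how to react to clock rollback, are the parts a real deployment has to design carefully.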

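Q: How do you keep a distributed cache consistent with the database?

Answer outline:

  1. Cache update strategy: update the database first, then delete the cache entry
  2. Double delete: delete the cache entry, update the database, then delete the cache entry again
  3. Delayed double delete: delay the second delete long enough to cover primary-replica replication lag
  4. Distributed lock: serialize concurrent updates to the same key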
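
The delayed double delete from the list above can be sketched with two in-memory maps standing in for the database and the cache. `DelayedDoubleDelete` is a hypothetical class for illustration; a real system would call a database and a cache client, and would choose the delay based on observed replication lag.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical cache-aside store with delayed double delete (maps stand in for DB and cache)
class DelayedDoubleDelete {

    final Map<String, String> db = new ConcurrentHashMap<>();
    final Map<String, String> cache = new ConcurrentHashMap<>();

    // Daemon thread so a forgotten shutdown() does not keep the JVM alive
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "cache-second-delete");
            thread.setDaemon(true);
            return thread;
        });

    // Cache-aside read: serve from the cache, otherwise load from the database and populate
    String read(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        String value = db.get(key);
        if (value != null) {
            cache.put(key, value);
        }
        return value;
    }

    // Delete the cache entry, update the database, then delete again after a delay
    ScheduledFuture<?> update(String key, String value, long delayMillis) {
        cache.remove(key);
        db.put(key, value);
        // The second delete evicts any stale value a concurrent reader cached in between
        return scheduler.schedule(() -> { cache.remove(key); }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Demo helper: block until the delayed delete has run
    static void await(ScheduledFuture<?> future) {
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        scheduler.shutdown();
    }
}
```

The second delete exists for one race: a reader that loaded the old value before the database update may write it into the cache after the first delete. Without the delayed delete that stale entry would stay until it expires. The delay is a heuristic, so entries should still carry a TTL as a safety net.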

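Summary

Key areas for distributed systems interviews:

  1. Theoretical foundations: CAP theorem, BASE theory, consensus algorithms
  2. Distributed transactions: 2PC, 3PC, Saga, and TCC patterns
  3. Distributed locks: Redis, ZooKeeper, and database-based implementations
  4. Service governance: service discovery, load balancing, fault tolerance
  5. System design: database and table sharding, caching strategies, message queues

Tie your answers to concrete distributed systems projects you have worked on, and be ready to describe the architecture you designed and the technical problems you solved.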

© 2025 编外计划 | Last modified: 2025-07-28 18:05:38
