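Distributed System Design and Implementation
Overview
Distributed systems are the foundational architecture of large-scale internet applications. They involve hard problems such as service decomposition, data consistency, and fault tolerance. This article summarizes the core concepts of distributed system design and the interview questions that come up most often.
Core Interview Questions
1. CAP Theorem and BASE
Interview question: What is the CAP theorem? How do you trade off consistency, availability, and partition tolerance in a real project?
CAP Theorem in Detail
// CAP example: distributed lock implementations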
public interface DistributedLock {
boolean tryLock(String key, String value, long expireTime);
boolean unlock(String key, String value);
}
// CP-style: favors consistency and may sacrifice availability (if the Redis node is unreachable, no lock is granted)
@Component
public class ConsistentDistributedLock implements DistributedLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public boolean tryLock(String key, String value, long expireTime) {
// SET ... NX PX is atomic: it sets the key only if it is absent, with a TTL in milliseconds
// (the same unit AvailableDistributedLock uses). Returning 1/0 avoids depending on how a
// status reply is converted to Boolean.
String script =
"if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then " +
" return 1 " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
return Long.valueOf(1).equals(redisTemplate.execute(redisScript,
Collections.singletonList(key), value, String.valueOf(expireTime)));
}
@Override
public boolean unlock(String key, String value) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
return Long.valueOf(1).equals(redisTemplate.execute(redisScript,
Collections.singletonList(key), value));
}
}
// AP-style: favors availability and accepts eventual consistency
@Component
public class AvailableDistributedLock implements DistributedLock {
private final List<RedisTemplate<String, String>> redisTemplates;
public AvailableDistributedLock(List<RedisTemplate<String, String>> redisTemplates) {
this.redisTemplates = redisTemplates;
}
@Override
public boolean tryLock(String key, String value, long expireTime) {
int successCount = 0;
int majority = redisTemplates.size() / 2 + 1;
for (RedisTemplate<String, String> template : redisTemplates) {
try {
Boolean result = template.opsForValue().setIfAbsent(key, value,
Duration.ofMillis(expireTime));
if (Boolean.TRUE.equals(result)) {
successCount++;
}
} catch (Exception e) {
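// Ignore a single node failure to preserve availability
System.out.println("Redis node error: " + e.getMessage());
}
}
if (successCount >= majority) {
return true;
}
// No majority: release any keys that were set so they do not block other clients until the TTL expires
unlock(key, value);
return false;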
}
@Override
public boolean unlock(String key, String value) {
int successCount = 0;
for (RedisTemplate<String, String> template : redisTemplates) {
try {
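// Note: get-then-delete is not atomic; the lock could expire and be re-acquired by another
// client between the two calls. The Lua script in ConsistentDistributedLock.unlock avoids this.
String currentValue = template.opsForValue().get(key);
if (value.equals(currentValue)) {
template.delete(key);
successCount++;
}
} catch (Exception e) {
// Eventual consistency: failures on some nodes are acceptable (the key expires with its TTL)
System.out.println("Lock release error: " + e.getMessage());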
}
}
return successCount > 0;
}
}
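Implementing BASE
// An eventually consistent order system
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private EventPublisher eventPublisher;
// Collaborators used by handleOrderCompleted (type names assumed from how they are called)
@Autowired
private UserService userService;
@Autowired
private ProductService productService;
@Autowired
private CompensationService compensationService;
// Basically Available: the core operation stays available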
public OrderResult createOrder(OrderRequest request) {
try {
// 1. Create the order (local transaction)
Order order = new Order(request);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// 2. Publish an event; the remaining steps run asynchronously
eventPublisher.publishEvent(new OrderCreatedEvent(order.getId()));
return OrderResult.success(order.getId(), "Order created; processing in progress");
} catch (Exception e) {
return OrderResult.failure("Order creation failed");
}
}
// Soft State: intermediate states are allowed
@EventListener
@Async
public void handleOrderCreated(OrderCreatedEvent event) {
Long orderId = event.getOrderId();
Order order = orderRepository.findById(orderId);
try {
// 3. Deduct inventory
order.setStatus(OrderStatus.PROCESSING);
orderRepository.save(order);
boolean inventorySuccess = inventoryService.reduceInventory(
order.getProductId(), order.getQuantity());
if (inventorySuccess) {
// 4. Process the payment
boolean paymentSuccess = paymentService.processPayment(
order.getUserId(), order.getAmount());
if (paymentSuccess) {
order.setStatus(OrderStatus.COMPLETED);
eventPublisher.publishEvent(new OrderCompletedEvent(orderId));
} else {
// Payment failed: roll back the inventory deduction
inventoryService.rollbackInventory(order.getProductId(), order.getQuantity());
order.setStatus(OrderStatus.FAILED);
}
} else {
order.setStatus(OrderStatus.FAILED);
}
} catch (Exception e) {
order.setStatus(OrderStatus.FAILED);
// Publish a compensation event
eventPublisher.publishEvent(new OrderCompensationEvent(orderId));
} finally {
orderRepository.save(order);
}
}
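// Eventually Consistent: downstream systems converge after the order completes
@EventListener
@Retryable(value = Exception.class, maxAttempts = 3)
public void handleOrderCompleted(OrderCompletedEvent event) {
// Update the state of related systems
Long orderId = event.getOrderId();
// Award user points. The exception is caught below, so @Retryable does not re-run this step;
// the failure is recorded and handled by later compensation instead.
try {
userService.addPoints(event.getUserId(), event.getPoints());
} catch (Exception e) {
// Record the failure for later compensation
compensationService.recordFailedOperation("ADD_POINTS", orderId);
}
// Update product sales statistics (same failure handling as above)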
try {
productService.updateSalesCount(event.getProductId(), event.getQuantity());
} catch (Exception e) {
compensationService.recordFailedOperation("UPDATE_SALES", orderId);
}
}
}
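2. Distributed Transaction Solutions
Interview question: What approaches are there for implementing distributed transactions, and what are the pros and cons of each?
Two-Phase Commit (2PC) Implementation
// Distributed transaction coordinator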
@Component
public class TwoPhaseCommitCoordinator {
private final List<TransactionParticipant> participants;
private final TransactionLogService logService;
public TwoPhaseCommitCoordinator(List<TransactionParticipant> participants,
TransactionLogService logService) {
this.participants = participants;
this.logService = logService;
}
public boolean executeTransaction(String transactionId, TransactionContext context) {
// Log the start of the transaction
logService.logTransactionStart(transactionId);
try {
// Phase 1: prepare
if (!preparePhase(transactionId, context)) {
rollbackAll(transactionId);
return false;
}
// Phase 2: commit
return commitPhase(transactionId);
} catch (Exception e) {
rollbackAll(transactionId);
return false;
}
}
private boolean preparePhase(String transactionId, TransactionContext context) {
for (TransactionParticipant participant : participants) {
try {
boolean prepared = participant.prepare(transactionId, context);
logService.logParticipantPrepare(transactionId, participant.getName(), prepared);
if (!prepared) {
return false;
}
} catch (Exception e) {
logService.logParticipantError(transactionId, participant.getName(), e);
return false;
}
}
return true;
}
private boolean commitPhase(String transactionId) {
boolean allCommitted = true;
for (TransactionParticipant participant : participants) {
try {
boolean committed = participant.commit(transactionId);
logService.logParticipantCommit(transactionId, participant.getName(), committed);
if (!committed) {
allCommitted = false;
}
} catch (Exception e) {
logService.logParticipantError(transactionId, participant.getName(), e);
allCommitted = false;
}
}
logService.logTransactionEnd(transactionId, allCommitted);
return allCommitted;
}
private void rollbackAll(String transactionId) {
for (TransactionParticipant participant : participants) {
try {
participant.rollback(transactionId);
logService.logParticipantRollback(transactionId, participant.getName());
} catch (Exception e) {
logService.logParticipantError(transactionId, participant.getName(), e);
}
}
}
}
// Transaction participant interface
public interface TransactionParticipant {
String getName();
boolean prepare(String transactionId, TransactionContext context);
boolean commit(String transactionId);
boolean rollback(String transactionId);
}
// A concrete participant implementation
@Component
public class InventoryParticipant implements TransactionParticipant {
@Autowired
private InventoryRepository inventoryRepository;
@Autowired
private TransactionCache transactionCache;
@Override
public String getName() {
return "InventoryService";
}
@Override
public boolean prepare(String transactionId, TransactionContext context) {
try {
// Check whether there is enough stock
InventoryOperation operation = context.getInventoryOperation();
int currentStock = inventoryRepository.getStock(operation.getProductId());
if (currentStock >= operation.getQuantity()) {
// Reserve (pre-lock) the stock
transactionCache.lockInventory(transactionId, operation);
return true;
}
return false;
} catch (Exception e) {
return false;
}
}
@Override
public boolean commit(String transactionId) {
try {
InventoryOperation operation = transactionCache.getInventoryOperation(transactionId);
inventoryRepository.reduceStock(operation.getProductId(), operation.getQuantity());
transactionCache.clearInventoryLock(transactionId);
return true;
} catch (Exception e) {
return false;
}
}
@Override
public boolean rollback(String transactionId) {
try {
transactionCache.clearInventoryLock(transactionId);
return true;
} catch (Exception e) {
return false;
}
}
}
Saga Pattern Implementation
// Saga transaction manager
@Service
public class SagaTransactionManager {
private final Map<String, SagaDefinition> sagaDefinitions;
private final SagaLogService sagaLogService;
public SagaTransactionManager(Map<String, SagaDefinition> sagaDefinitions,
SagaLogService sagaLogService) {
this.sagaDefinitions = sagaDefinitions;
this.sagaLogService = sagaLogService;
}
public SagaResult executeSaga(String sagaType, SagaContext context) {
String sagaId = UUID.randomUUID().toString();
SagaDefinition definition = sagaDefinitions.get(sagaType);
if (definition == null) {
return SagaResult.failure("Saga definition not found: " + sagaType);
}
sagaLogService.logSagaStart(sagaId, sagaType);
List<SagaStep> executedSteps = new ArrayList<>();
try {
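// Run the saga steps in order
for (SagaStep step : definition.getSteps()) {
boolean success = executeStep(sagaId, step, context);
if (!success) {
// Compensate only the steps that completed; the failed step is never added to the list,
// so a step such as ReduceInventoryStep is not "undone" when it did nothing
compensate(sagaId, executedSteps, context);
return SagaResult.failure("Saga failed; completed steps were compensated");
}
executedSteps.add(step);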
}
sagaLogService.logSagaSuccess(sagaId);
return SagaResult.success(sagaId);
} catch (Exception e) {
compensate(sagaId, executedSteps, context);
sagaLogService.logSagaFailure(sagaId, e);
return SagaResult.failure("Saga execution error: " + e.getMessage());
}
}
private boolean executeStep(String sagaId, SagaStep step, SagaContext context) {
try {
sagaLogService.logStepStart(sagaId, step.getName());
boolean result = step.execute(context);
sagaLogService.logStepResult(sagaId, step.getName(), result);
return result;
} catch (Exception e) {
sagaLogService.logStepError(sagaId, step.getName(), e);
return false;
}
}
private void compensate(String sagaId, List<SagaStep> executedSteps, SagaContext context) {
// Run the compensations in reverse order
Collections.reverse(executedSteps);
for (SagaStep step : executedSteps) {
try {
sagaLogService.logCompensationStart(sagaId, step.getName());
step.compensate(context);
sagaLogService.logCompensationSuccess(sagaId, step.getName());
} catch (Exception e) {
sagaLogService.logCompensationError(sagaId, step.getName(), e);
// Compensation failed: it is logged above, and manual intervention may be needed
}
}
}
}
// Saga step definition
public interface SagaStep {
String getName();
boolean execute(SagaContext context);
void compensate(SagaContext context);
}
// Saga step implementations for the order flow
@Component
public class CreateOrderStep implements SagaStep {
@Autowired
private OrderService orderService;
@Override
public String getName() {
return "CreateOrder";
}
@Override
public boolean execute(SagaContext context) {
OrderInfo orderInfo = context.getOrderInfo();
Order order = orderService.createOrder(orderInfo);
// Check for null before dereferencing the order
if (order == null) {
return false;
}
context.setOrderId(order.getId());
return true;
}
@Override
public void compensate(SagaContext context) {
Long orderId = context.getOrderId();
if (orderId != null) {
orderService.cancelOrder(orderId);
}
}
}
@Component
public class ReduceInventoryStep implements SagaStep {
@Autowired
private InventoryService inventoryService;
@Override
public String getName() {
return "ReduceInventory";
}
@Override
public boolean execute(SagaContext context) {
OrderInfo orderInfo = context.getOrderInfo();
return inventoryService.reduceInventory(
orderInfo.getProductId(), orderInfo.getQuantity());
}
@Override
public void compensate(SagaContext context) {
OrderInfo orderInfo = context.getOrderInfo();
inventoryService.addInventory(
orderInfo.getProductId(), orderInfo.getQuantity());
}
}
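3. Distributed Locks and Leader Election
Interview question: What are the ways to implement a distributed lock? What is the basic principle of the Raft algorithm?
Redlock Algorithm Implementation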
@Component
public class RedlockDistributedLock {
private final List<RedisTemplate<String, String>> redisInstances;
private final int quorum;
private final long clockDriftFactor = 10; // clock drift allowance, in milliseconds
public RedlockDistributedLock(List<RedisTemplate<String, String>> redisInstances) {
this.redisInstances = redisInstances;
this.quorum = redisInstances.size() / 2 + 1;
}
public RedlockResult acquireLock(String resource, String value, long ttl) {
List<RedisLockResult> results = new ArrayList<>();
long startTime = System.currentTimeMillis();
// Try to acquire the lock on every Redis instance
for (RedisTemplate<String, String> redis : redisInstances) {
RedisLockResult result = acquireLockOnInstance(redis, resource, value, ttl);
results.add(result);
}
long endTime = System.currentTimeMillis();
long elapsedTime = endTime - startTime;
// Count the instances on which the lock was acquired
long successCount = results.stream()
.mapToLong(result -> result.isSuccess() ? 1 : 0)
.sum();
// Redlock condition: a majority acquired, with validity time left after elapsed time and drift
if (successCount >= quorum &&
elapsedTime < (ttl - clockDriftFactor)) {
long validityTime = ttl - elapsedTime - clockDriftFactor;
return RedlockResult.success(resource, value, validityTime, results);
} else {
// Release any locks that were acquired
releaseLock(resource, value, results);
return RedlockResult.failure("Failed to acquire lock");
}
}
private RedisLockResult acquireLockOnInstance(RedisTemplate<String, String> redis,
String resource, String value, long ttl) {
try {
String script =
"if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then " +
" return 1 " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
long startTime = System.currentTimeMillis();
Long result = redis.execute(redisScript,
Collections.singletonList(resource), value, String.valueOf(ttl));
long endTime = System.currentTimeMillis();
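// Null-safe comparison: a null script result counts as "not acquired" instead of throwing on unboxing
return new RedisLockResult(redis, Long.valueOf(1).equals(result), endTime - startTime);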
} catch (Exception e) {
return new RedisLockResult(redis, false, 0);
}
}
public void releaseLock(String resource, String value, List<RedisLockResult> results) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
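// Release on every instance that was tried, not only those that reported success:
// an instance may have set the key even though its reply was lost. The script deletes
// the key only when it still holds our value, so the extra calls are safe.
for (RedisLockResult result : results) {
try {
result.getRedisTemplate().execute(redisScript,
Collections.singletonList(resource), value);
} catch (Exception e) {
// Log the failed release; the key still expires with its TTL
System.err.println("Failed to release lock: " + e.getMessage());
}
}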
}
}
ZooKeeper-Based Distributed Lock
@Component
public class ZookeeperDistributedLock {
private final CuratorFramework client;
private final String lockPath = "/distributed-locks";
public ZookeeperDistributedLock(CuratorFramework client) {
this.client = client;
try {
// Make sure the root path exists
if (client.checkExists().forPath(lockPath) == null) {
client.create().creatingParentsIfNeeded().forPath(lockPath);
}
} catch (Exception e) {
throw new RuntimeException("Failed to initialize the ZooKeeper lock path", e);
}
}
public ZkLockResult acquireLock(String lockName, long timeout, TimeUnit unit) {
String fullPath = lockPath + "/" + lockName;
InterProcessMutex mutex = new InterProcessMutex(client, fullPath);
try {
boolean acquired = mutex.acquire(timeout, unit);
if (acquired) {
return ZkLockResult.success(mutex);
} else {
return ZkLockResult.failure("Timed out acquiring lock");
}
} catch (Exception e) {
return ZkLockResult.failure("Error acquiring lock: " + e.getMessage());
}
}
public void releaseLock(InterProcessMutex mutex) {
try {
if (mutex.isAcquiredInThisProcess()) {
mutex.release();
}
} catch (Exception e) {
System.err.println("Failed to release ZooKeeper lock: " + e.getMessage());
}
}
// Distributed read-write lock
public class ZkReadWriteLock {
private final InterProcessReadWriteLock rwLock;
public ZkReadWriteLock(String lockName) {
String fullPath = lockPath + "/" + lockName;
this.rwLock = new InterProcessReadWriteLock(client, fullPath);
}
public InterProcessMutex readLock() {
return rwLock.readLock();
}
public InterProcessMutex writeLock() {
return rwLock.writeLock();
}
}
// Leader election
public class LeaderElection {
private final LeaderSelector leaderSelector;
private final String name;
public LeaderElection(String path, String name, LeaderSelectorListener listener) {
this.name = name;
this.leaderSelector = new LeaderSelector(client, path, listener);
this.leaderSelector.autoRequeue();
}
public void start() {
leaderSelector.start();
}
public void close() {
leaderSelector.close();
}
public boolean hasLeadership() {
return leaderSelector.hasLeadership();
}
}
}
// Leader election listener implementation
@Component
public class ApplicationLeaderListener implements LeaderSelectorListener {
private final AtomicBoolean isLeader = new AtomicBoolean(false);
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
isLeader.set(true);
System.out.println("Became leader; starting leader tasks");
try {
// Run leader-only tasks for as long as leadership is held
while (isLeader.get()) {
performLeaderTask();
Thread.sleep(5000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
isLeader.set(false);
System.out.println("Lost leadership");
}
}
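@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
isLeader.set(false);
// Ask Curator to interrupt takeLeadership() right away instead of waiting for the
// loop to wake from its sleep (org.apache.curator.framework.recipes.leader.CancelLeadershipException)
throw new CancelLeadershipException();
}
}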
private void performLeaderTask() {
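// Run tasks that only the leader may perform
System.out.println("Running leader task: " + new Date());
// For example:
// 1. Clean up expired data
// 2. Generate reports
// 3. Synchronize data
// 4. Send heartbeats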
}
public boolean isLeader() {
return isLeader.get();
}
}
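The interview question for this section also asks about Raft. One concrete piece of it is the rule a node follows when a candidate asks for its vote: reject stale terms, grant at most one vote per term, and vote only for a candidate whose log is at least as up to date as its own. The sketch below illustrates that rule only. The class name RaftVoter, its constructor, and the method names are hypothetical, and persistence, election timers, and RPC plumbing are left out.

```java
/** Hypothetical sketch of the vote-granting rule for a single Raft node. */
final class RaftVoter {
    private long currentTerm;
    private String votedFor;          // null when no vote has been cast in currentTerm
    private final long lastLogIndex;  // index of this node's last log entry
    private final long lastLogTerm;   // term of this node's last log entry

    RaftVoter(long currentTerm, long lastLogIndex, long lastLogTerm) {
        this.currentTerm = currentTerm;
        this.lastLogIndex = lastLogIndex;
        this.lastLogTerm = lastLogTerm;
    }

    /** Returns true if this node grants its vote to the candidate. */
    synchronized boolean handleRequestVote(long term, String candidateId,
                                           long candidateLastLogIndex,
                                           long candidateLastLogTerm) {
        // A request from an older term is always rejected.
        if (term < currentTerm) {
            return false;
        }
        // A newer term resets the vote: each node votes at most once per term.
        if (term > currentTerm) {
            currentTerm = term;
            votedFor = null;
        }
        // "At least as up to date": a later last term wins; on a tie, the longer log wins.
        boolean logUpToDate = candidateLastLogTerm > lastLogTerm
                || (candidateLastLogTerm == lastLogTerm
                    && candidateLastLogIndex >= lastLogIndex);
        if (logUpToDate && (votedFor == null || votedFor.equals(candidateId))) {
            votedFor = candidateId;
            return true;
        }
        return false;
    }

    synchronized long currentTerm() {
        return currentTerm;
    }
}
```

A candidate that collects votes from a majority of the cluster becomes leader for that term. The log check is what keeps a node with missing entries from being elected.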
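4. Service Discovery and Load Balancing
Interview question: How is service discovery implemented, and what load-balancing algorithms are there?
Service Registration and Discovery
// Service registry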
@Component
public class ServiceRegistry {
private final Map<String, Set<ServiceInstance>> serviceInstances = new ConcurrentHashMap<>();
private final ScheduledExecutorService healthCheckExecutor = Executors.newScheduledThreadPool(5);
// Health-check task per instance, kept so it can be cancelled on deregistration
private final Map<ServiceInstance, ScheduledFuture<?>> healthChecks = new ConcurrentHashMap<>();
// Register a service instance
public void registerService(ServiceInstance instance) {
String serviceName = instance.getServiceName();
boolean added = serviceInstances.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet())
.add(instance);
// Start a health check only for a newly added instance, so re-registration does not stack tasks
if (added) {
startHealthCheck(instance);
}
System.out.println("Service registered: " + serviceName + " - " + instance.getAddress());
}
// Deregister a service instance
public void deregisterService(ServiceInstance instance) {
String serviceName = instance.getServiceName();
Set<ServiceInstance> instances = serviceInstances.get(serviceName);
if (instances != null) {
instances.remove(instance);
if (instances.isEmpty()) {
serviceInstances.remove(serviceName);
}
}
// Cancel the periodic health check so it stops running for the removed instance
ScheduledFuture<?> check = healthChecks.remove(instance);
if (check != null) {
check.cancel(false);
}
System.out.println("Service deregistered: " + serviceName + " - " + instance.getAddress());
}
// Get the list of instances for a service
public List<ServiceInstance> getServiceInstances(String serviceName) {
Set<ServiceInstance> instances = serviceInstances.get(serviceName);
return instances != null ? new ArrayList<>(instances) : Collections.emptyList();
}
// Health check
private void startHealthCheck(ServiceInstance instance) {
ScheduledFuture<?> check = healthCheckExecutor.scheduleWithFixedDelay(() -> {
try {
boolean healthy = performHealthCheck(instance);
instance.setHealthy(healthy);
if (!healthy) {
System.out.println("Service unhealthy: " + instance.getServiceName() +
" - " + instance.getAddress());
}
} catch (Exception e) {
instance.setHealthy(false);
System.err.println("Health check error: " + e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
healthChecks.put(instance, check);
}
private boolean performHealthCheck(ServiceInstance instance) {
try {
// Send an HTTP health check request
String healthUrl = "http://" + instance.getAddress() + ":" +
instance.getPort() + "/health";
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<String> response = restTemplate.getForEntity(healthUrl, String.class);
return response.getStatusCode().is2xxSuccessful();
} catch (Exception e) {
return false;
}
}
}
// Service instance
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceInstance {
private String serviceName;
private String address;
private int port;
private Map<String, String> metadata;
private boolean healthy = true;
private long registerTime = System.currentTimeMillis();
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
ServiceInstance that = (ServiceInstance) obj;
return port == that.port &&
Objects.equals(serviceName, that.serviceName) &&
Objects.equals(address, that.address);
}
@Override
public int hashCode() {
return Objects.hash(serviceName, address, port);
}
}
Load-Balancing Algorithm Implementations
// Load balancer interface
public interface LoadBalancer {
ServiceInstance select(List<ServiceInstance> instances);
}
// Round-robin
@Component
public class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return null;
}
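// floorMod keeps the index non-negative after the counter overflows past Integer.MAX_VALUE
int index = Math.floorMod(counter.getAndIncrement(), instances.size());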
return instances.get(index);
}
}
// Smooth weighted round-robin
@Component
public class WeightedRoundRobinLoadBalancer implements LoadBalancer {
private final Map<String, AtomicInteger> currentWeights = new ConcurrentHashMap<>();
// synchronized: one selection reads and updates several counters and must not interleave with another
@Override
public synchronized ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return null;
}
// Compute the total weight
int totalWeight = instances.stream()
.mapToInt(this::getWeight)
.sum();
ServiceInstance selected = null;
int maxCurrentWeight = Integer.MIN_VALUE;
for (ServiceInstance instance : instances) {
String key = getInstanceKey(instance);
int weight = getWeight(instance);
AtomicInteger currentWeight = currentWeights.computeIfAbsent(key,
k -> new AtomicInteger(0));
// Raise each instance's current weight by its configured weight, then pick the largest
int updated = currentWeight.addAndGet(weight);
if (updated > maxCurrentWeight) {
maxCurrentWeight = updated;
selected = instance;
}
}
if (selected != null) {
// Lower the winner by the total weight so the other instances get their turn
String selectedKey = getInstanceKey(selected);
currentWeights.get(selectedKey).addAndGet(-totalWeight);
}
return selected;
}
private int getWeight(ServiceInstance instance) {
// Metadata may be absent; fall back to a weight of 1
Map<String, String> metadata = instance.getMetadata();
String weight = metadata != null ? metadata.get("weight") : null;
return weight != null ? Integer.parseInt(weight) : 1;
}
private String getInstanceKey(ServiceInstance instance) {
return instance.getServiceName() + ":" + instance.getAddress() + ":" + instance.getPort();
}
}
// Least connections
@Component
public class LeastConnectionsLoadBalancer implements LoadBalancer {
private final Map<String, AtomicInteger> connectionCounts = new ConcurrentHashMap<>();
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return null;
}
ServiceInstance selected = null;
int minConnections = Integer.MAX_VALUE;
for (ServiceInstance instance : instances) {
String key = getInstanceKey(instance);
int connections = connectionCounts.computeIfAbsent(key,
k -> new AtomicInteger(0)).get();
if (connections < minConnections) {
minConnections = connections;
selected = instance;
}
}
// Increment the connection count of the selected instance
if (selected != null) {
String selectedKey = getInstanceKey(selected);
connectionCounts.get(selectedKey).incrementAndGet();
}
return selected;
}
public void releaseConnection(ServiceInstance instance) {
String key = getInstanceKey(instance);
AtomicInteger count = connectionCounts.get(key);
if (count != null && count.get() > 0) {
count.decrementAndGet();
}
}
private String getInstanceKey(ServiceInstance instance) {
return instance.getServiceName() + ":" + instance.getAddress() + ":" + instance.getPort();
}
}
// Consistent hashing
@Component
public class ConsistentHashLoadBalancer implements LoadBalancer {
private static final int VIRTUAL_NODES = 160;
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return null;
}
TreeMap<Integer, ServiceInstance> hashRing = buildHashRing(instances);
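// Simplified here: a real implementation should hash a request attribute (for example a user ID)
// so that the same key always maps to the same instance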
int requestHash = ThreadLocalRandom.current().nextInt();
Map.Entry<Integer, ServiceInstance> entry = hashRing.ceilingEntry(requestHash);
if (entry == null) {
entry = hashRing.firstEntry();
}
return entry.getValue();
}
private TreeMap<Integer, ServiceInstance> buildHashRing(List<ServiceInstance> instances) {
TreeMap<Integer, ServiceInstance> hashRing = new TreeMap<>();
for (ServiceInstance instance : instances) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String virtualNode = getInstanceKey(instance) + "#" + i;
int hash = hash(virtualNode);
hashRing.put(hash, instance);
}
}
return hashRing;
}
private int hash(String key) {
return key.hashCode();
}
private String getInstanceKey(ServiceInstance instance) {
return instance.getServiceName() + ":" + instance.getAddress() + ":" + instance.getPort();
}
}
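The class above picks a random position on the ring, as its own comment notes. The sketch below shows the lookup driven by a request key instead, with the ring built once rather than on every call. It is a minimal illustration under stated assumptions: the class name KeyedHashRing, plain string node names in place of ServiceInstance, and the choice of MD5 as the hash are all hypothetical, not part of the code above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: a hash ring built once and queried by request key. */
final class KeyedHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    KeyedHashRing(List<String> nodes, int virtualNodes) {
        for (String node : nodes) {
            // Each node is placed on the ring many times to even out the distribution.
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    /** Returns the first node clockwise from the key's position, or null for an empty ring. */
    String nodeFor(String requestKey) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(requestKey));
        // Past the last position: wrap around to the start of the ring.
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of an MD5 digest as the ring position. */
    static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }
}
```

With this shape, the same key always maps to the same node, and removing a node only remaps the keys that node owned. A digest-based hash is used here because String.hashCode tends to cluster similar strings on the ring.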
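Frequently Asked Interview Questions
1. Theory Questions
Q: What is the Byzantine Generals Problem? What is the basic principle of the PBFT algorithm?
A: The Byzantine Generals Problem describes a distributed system in which nodes may fail in arbitrary ways, including behaving maliciously. PBFT (Practical Byzantine Fault Tolerance) addresses it with a three-phase protocol (pre-prepare, prepare, commit). With n ≥ 3f + 1 replicas it tolerates up to f Byzantine nodes, and each decision needs matching messages from 2f + 1 replicas, that is, more than two thirds of the nodes.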
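The sizing rule in this answer is easy to check with a little arithmetic. The sketch below is illustrative only: the class name PbftQuorum is hypothetical, and the quorum formula is the general one for any n, which reduces to 2f + 1 when n is exactly 3f + 1.

```java
/** Hypothetical helper for the PBFT sizing arithmetic. */
final class PbftQuorum {
    /** Largest f such that n >= 3f + 1. */
    static int maxFaulty(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be positive");
        }
        return (n - 1) / 3;
    }

    /**
     * Smallest quorum size such that any two quorums overlap in at least
     * f + 1 replicas, so the overlap always contains an honest one.
     * Equals 2f + 1 when n = 3f + 1.
     */
    static int quorum(int n) {
        int f = maxFaulty(n);
        return (n + f) / 2 + 1;
    }
}
```

For example, a 4-replica cluster tolerates 1 faulty replica with a quorum of 3, and a 7-replica cluster tolerates 2 with a quorum of 5.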
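Q: How is clock synchronization handled in distributed systems?
A: The main approaches are:
- Physical clock synchronization: the NTP protocol keeps physical clocks in sync
- Logical clocks: Lamport timestamps, vector clocks
- Hybrid clocks: hybrid logical clocks (HLC), or TrueTime (used by Google Spanner), which reports time as an interval with bounded uncertainty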
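Of the approaches listed, the Lamport timestamp is the simplest to show in code. The sketch below is a minimal illustration, and the class and method names are hypothetical: every local event or send advances the counter, and a receive jumps past the sender's timestamp.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a Lamport logical clock for one process. */
final class LamportClock {
    private final AtomicLong time = new AtomicLong(0);

    /** Local event or message send: advance by one and return the new timestamp. */
    long tick() {
        return time.incrementAndGet();
    }

    /** Message receive: move past both the local time and the sender's timestamp. */
    long onReceive(long remoteTime) {
        return time.updateAndGet(local -> Math.max(local, remoteTime) + 1);
    }

    long current() {
        return time.get();
    }
}
```

If event a happened before event b, the timestamp of a is smaller than that of b. The converse does not hold, which is the gap vector clocks close by keeping one counter per process.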
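2. Practical Application Questions
Q: How would you design a distributed ID generator?
Key points:
- Snowflake algorithm: timestamp + machine ID + sequence number
- Database auto-increment: several database instances, each with a different starting offset and a step equal to the number of instances
- UUID: globally unique but unordered
- Redis counter: the INCR command generates increasing IDs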
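The Snowflake layout from the first bullet can be sketched as follows. This is an illustration under assumptions, not a reference implementation: it uses the commonly cited split of 41 timestamp bits, 10 worker bits, and 12 sequence bits, an arbitrary custom epoch, and a hypothetical class name. The clock is injected so the bit layout can be checked deterministically.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of a Snowflake-style ID generator. */
final class SnowflakeIdGenerator {
    private static final long EPOCH_MS = 1577836800000L; // 2020-01-01T00:00:00Z, an arbitrary choice
    private static final int WORKER_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    private final long workerId;
    private final LongSupplier clock; // returns the current time in milliseconds
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeIdGenerator(long workerId, LongSupplier clock) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId out of range: " + workerId);
        }
        this.workerId = workerId;
        this.clock = clock;
    }

    synchronized long nextId() {
        long now = clock.getAsLong();
        if (now < lastTimestamp) {
            // Reusing an earlier timestamp could produce duplicate IDs.
            throw new IllegalStateException("Clock moved backwards");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // All 4096 sequence values for this millisecond are used: wait for the next one.
                while ((now = clock.getAsLong()) <= lastTimestamp) {
                    // busy-wait
                }
            }
        } else {
            sequence = 0;
        }
        lastTimestamp = now;
        return ((now - EPOCH_MS) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }
}
```

In production the clock would be System::currentTimeMillis, and each process needs a distinct worker ID. IDs from one generator are strictly increasing, which makes them friendlier to B-tree indexes than random UUIDs.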
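Q: How do you keep a distributed cache consistent with the database?
Key points:
- Cache update strategy: update the database first, then delete the cache entry
- Double delete: delete the cache entry, update the database, then delete the cache entry again
- Delayed double delete: delay the second delete to allow for primary-replica replication lag
- Distributed lock: serialize concurrent updates to the same key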
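The delayed double delete from the list above can be sketched with in-memory stand-ins. Everything here is an assumption made for illustration: the class name, the two maps that play the roles of the database and the cache (the caller is expected to pass thread-safe maps), and the single scheduler thread. A real system would call its database and cache clients instead.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of cache-aside reads with delayed double delete on writes. */
final class DelayedDoubleDeleteStore {
    private final Map<String, String> database; // stand-in for the source of truth
    private final Map<String, String> cache;    // stand-in for the distributed cache
    private final long delayMillis;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delayed-cache-delete");
                t.setDaemon(true); // do not keep the JVM alive for pending deletes
                return t;
            });

    DelayedDoubleDeleteStore(Map<String, String> database, Map<String, String> cache,
                             long delayMillis) {
        this.database = database;
        this.cache = cache;
        this.delayMillis = delayMillis;
    }

    /** Cache-aside read: serve from the cache, otherwise load from the database and fill the cache. */
    String read(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        String value = database.get(key);
        if (value != null) {
            cache.put(key, value);
        }
        return value;
    }

    /** Write path: delete, update, then delete again after a delay. */
    ScheduledFuture<?> write(String key, String value) {
        cache.remove(key);        // first delete
        database.put(key, value); // update the source of truth
        // Second delete: evicts a stale value that a concurrent reader may have put back.
        return scheduler.schedule(() -> { cache.remove(key); }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

The delay should exceed the time a slow reader, or a lagging replica, could take to repopulate the cache with the old value. It narrows the inconsistency window but does not remove it, which is why cache entries usually also carry a TTL.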
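Summary
Key areas for distributed systems interviews:
- Theory: CAP theorem, BASE, consensus algorithms
- Distributed transactions: 2PC, 3PC, Saga, and TCC patterns
- Distributed locks: Redis, ZooKeeper, and database-based implementations
- Service governance: service discovery, load balancing, fault tolerance
- System design: database and table sharding, caching strategies, message queues
Tie your answers to concrete distributed-systems project experience, and be ready to describe the architecture you designed and the technical problems you solved.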