分布式缓存架构:构建高性能数据访问层

在现代高并发系统中,数据库往往成为性能瓶颈。随着用户量的增长和业务复杂度的提升,单纯依靠数据库优化已经无法满足系统的性能要求。分布式缓存作为介于应用层和数据层之间的高速数据访问层,能够显著提升系统的响应速度和并发处理能力。本文将深入探讨分布式缓存的架构设计、实现策略以及在实际项目中的最佳实践。

缓存基础理论

缓存的核心价值

性能提升:缓存将热点数据存储在内存中,访问速度比磁盘快几个数量级。

// Cache performance comparison example.
// NOTE(review): `log` is not declared in this snippet; presumably supplied by
// something like Lombok's @Slf4j — confirm.
@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Loads a user directly from the repository, bypassing the cache, and logs
     * the elapsed time. (The article quotes 50-100 ms as a typical average.)
     *
     * @param userId id of the user to load
     * @return the user, or {@code null} when the repository has no match
     */
    public User getUserFromDatabase(Long userId) {
        final long begin = System.currentTimeMillis();
        final User found = userRepository.findById(userId).orElse(null);
        log.info("Database query time: {} ms", System.currentTimeMillis() - begin);
        return found;
    }

    /**
     * Loads a user through the cache: reads {@code user:<id>} from Redis first
     * and falls back to the repository on a miss, storing the loaded user for
     * 30 minutes. Logs the elapsed time of the whole lookup. (The article
     * quotes 1-5 ms as a typical average.)
     *
     * @param userId id of the user to load
     * @return the user, or {@code null} when neither cache nor repository has it
     */
    public User getUserFromCache(Long userId) {
        final long begin = System.currentTimeMillis();
        final String key = "user:" + userId;

        User result = (User) redisTemplate.opsForValue().get(key);
        if (result == null) {
            result = loadAndCache(userId, key);
        }

        log.info("Cache query time: {} ms", System.currentTimeMillis() - begin);
        return result;
    }

    /** Cache-miss path: reads from the repository and caches a non-null result. */
    private User loadAndCache(Long userId, String key) {
        final User loaded = userRepository.findById(userId).orElse(null);
        if (loaded == null) {
            // Missing users are not cached.
            return null;
        }
        redisTemplate.opsForValue().set(key, loaded, 30, TimeUnit.MINUTES);
        return loaded;
    }
}

减轻数据库压力:通过缓存热点数据,减少对数据库的直接访问。

// Cache hit-rate monitoring.
// NOTE(review): `log` is not declared in this snippet; presumably supplied by
// something like Lombok's @Slf4j — confirm.
@Component
public class CacheMetricsCollector {

    /** Counters are cleared after this many scheduled reports (5 reports at 1/min = 5 minutes). */
    private static final int REPORTS_PER_RESET = 5;

    private final AtomicLong cacheHits = new AtomicLong(0);
    private final AtomicLong cacheMisses = new AtomicLong(0);
    private final AtomicLong databaseQueries = new AtomicLong(0);
    /** Number of times {@link #logMetrics()} has run; drives the periodic reset. */
    private final AtomicLong reportsLogged = new AtomicLong(0);

    /** Records one lookup that was served from the cache. */
    public void recordCacheHit() {
        cacheHits.incrementAndGet();
    }

    /** Records one lookup that missed the cache; each miss also counts as one database query. */
    public void recordCacheMiss() {
        cacheMisses.incrementAndGet();
        databaseQueries.incrementAndGet();
    }

    /**
     * @return hits / (hits + misses) in the range [0, 1], or {@code 0.0} when
     *         nothing has been recorded since the last reset
     */
    public double getCacheHitRate() {
        long hits = cacheHits.get();
        long misses = cacheMisses.get();
        long total = hits + misses;

        return total > 0 ? (double) hits / total : 0.0;
    }

    /**
     * @return the number of database queries avoided thanks to the cache,
     *         which equals the number of cache hits
     */
    public long getDatabaseQueryReduction() {
        return cacheHits.get();
    }

    /**
     * Logs the current statistics once a minute and clears the counters on
     * every {@value #REPORTS_PER_RESET}th report.
     */
    @Scheduled(fixedRate = 60000)
    public void logMetrics() {
        double hitRate = getCacheHitRate();
        long reduction = getDatabaseQueryReduction();

        // SLF4J only understands "{}" placeholders; a Python-style "{:.2%}" is
        // printed literally, so the percentage is formatted up front.
        String hitRatePercent = String.format(java.util.Locale.ROOT, "%.2f%%", hitRate * 100.0);
        log.info("Cache Metrics - Hit Rate: {}, DB Query Reduction: {}", hitRatePercent, reduction);

        // Count reports instead of testing the wall clock: a check such as
        // "currentTimeMillis() % 300000 == 0" holds for a single millisecond
        // every five minutes and would practically never trigger.
        if (reportsLogged.incrementAndGet() % REPORTS_PER_RESET == 0) {
            // The three resets are not atomic as a group; a hit/miss recorded
            // in between may be attributed to either window.
            cacheHits.set(0);
            cacheMisses.set(0);
            databaseQueries.set(0);
        }
    }
}

缓存模式详解

Cache-Aside模式:应用程序直接管理缓存。

/**
 * Cache-aside product access: the application reads through Redis and
 * invalidates the cached entry itself whenever the database row changes.
 */
@Service
public class ProductService {

    private static final String CACHE_PREFIX = "product:";
    private static final int CACHE_TTL = 3600; // seconds (1 hour)

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private CacheMetricsCollector metricsCollector;

    /**
     * Returns the product for {@code productId}, preferring the cached copy.
     * A miss loads the product from the repository and caches a non-null
     * result for {@link #CACHE_TTL} seconds. Hits and misses are reported to
     * the metrics collector.
     *
     * @param productId id of the product to load
     * @return the product, or {@code null} when it does not exist
     */
    public Product getProduct(Long productId) {
        final String key = keyFor(productId);

        // Step 1: try the cache.
        final Product cached = (Product) redisTemplate.opsForValue().get(key);
        if (cached != null) {
            metricsCollector.recordCacheHit();
            log.debug("Cache hit for product: {}", productId);
            return cached;
        }

        // Step 2: miss — go to the database.
        metricsCollector.recordCacheMiss();
        log.debug("Cache miss for product: {}", productId);
        final Product loaded = productRepository.findById(productId).orElse(null);
        if (loaded == null) {
            return null;
        }

        // Step 3: populate the cache for subsequent readers.
        redisTemplate.opsForValue().set(key, loaded, CACHE_TTL, TimeUnit.SECONDS);
        log.debug("Product cached: {}", productId);
        return loaded;
    }

    /**
     * Persists the product, then drops its cache entry so the next read
     * reloads the fresh row.
     *
     * @param product the product to save
     * @return the instance returned by the repository
     */
    public Product updateProduct(Product product) {
        final Product saved = productRepository.save(product);

        redisTemplate.delete(keyFor(product.getId()));
        log.info("Product updated and cache invalidated: {}", product.getId());

        return saved;
    }

    /**
     * Deletes the product row, then removes its cache entry.
     *
     * @param productId id of the product to delete
     */
    public void deleteProduct(Long productId) {
        productRepository.deleteById(productId);

        redisTemplate.delete(keyFor(productId));
        log.info("Product deleted and cache cleared: {}", productId);
    }

    /** Builds the Redis key under which a product is cached. */
    private static String keyFor(Long productId) {
        return CACHE_PREFIX + productId;
    }
}

Write-Through模式:写操作同时更新缓存和数据库。

/**
 * Write-through caching for users: every save goes to the database and is
 * mirrored into Redis in the same call; reads go through the cache.
 */
@Service
public class WriteThroughCacheService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String USER_CACHE_PREFIX = "user:";
    private static final int CACHE_TTL = 1800; // seconds (30 minutes)

    /**
     * Saves the user to the database and writes the saved instance to the
     * cache. If either step fails, the possibly stale cache entry is removed
     * on a best-effort basis and the original exception is rethrown.
     *
     * <p>NOTE(review): the cache write happens inside the transaction, before
     * it commits; a rollback triggered after this method returns would leave
     * the cached value ahead of the database — confirm whether that matters.
     *
     * @param user the user to persist
     * @return the instance returned by the repository
     */
    @Transactional
    public User saveUser(User user) {
        try {
            // 1. Persist to the database.
            User savedUser = userRepository.save(user);

            // 2. Mirror the saved state into the cache.
            String cacheKey = USER_CACHE_PREFIX + savedUser.getId();
            redisTemplate.opsForValue().set(cacheKey, savedUser, CACHE_TTL, TimeUnit.SECONDS);

            log.info("User saved with write-through cache: {}", savedUser.getId());

            return savedUser;

        } catch (Exception e) {
            log.error("Failed to save user with write-through cache", e);

            evictAfterFailure(user, e);

            throw e;
        }
    }

    /**
     * Returns the user for {@code userId}, preferring the cached copy and
     * caching a non-null database result on a miss.
     *
     * @param userId id of the user to load
     * @return the user, or {@code null} when it does not exist
     */
    public User getUser(Long userId) {
        String cacheKey = USER_CACHE_PREFIX + userId;

        // Cache first.
        User user = (User) redisTemplate.opsForValue().get(cacheKey);

        if (user == null) {
            // Miss: load from the database and populate the cache.
            user = userRepository.findById(userId).orElse(null);
            if (user != null) {
                redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
            }
        }

        return user;
    }

    /**
     * Best-effort removal of a possibly dirty cache entry after a failed save.
     * It must never replace the exception that caused the failure, so a null
     * user or id is skipped (there is no key to clean) and a failing delete is
     * attached to {@code cause} as a suppressed exception.
     */
    private void evictAfterFailure(User user, Exception cause) {
        if (user == null || user.getId() == null) {
            return;
        }
        try {
            redisTemplate.delete(USER_CACHE_PREFIX + user.getId());
        } catch (RuntimeException cleanupFailure) {
            // Typically Redis itself is what failed; keep the root cause intact.
            cause.addSuppressed(cleanupFailure);
            log.warn("Failed to evict cache entry after failed save: {}", user.getId(), cleanupFailure);
        }
    }
}

Write-Behind模式:先写缓存,异步写数据库。

/**
 * Write-behind caching for users: a save updates Redis immediately and queues
 * the database write, which background workers apply asynchronously.
 */
@Service
public class WriteBehindCacheService {

    /** Number of background workers draining the write queue. */
    private static final int WORKER_COUNT = 3;
    /** Upper bound on queued writes; beyond it saves fall back to synchronous writes. */
    private static final int QUEUE_CAPACITY = 10_000;
    /** How long a worker waits for work before re-checking the shutdown flag. */
    private static final long POLL_TIMEOUT_MS = 500;
    /** How long shutdown waits for workers to flush the queue. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private UserRepository userRepository;

    // Bounded on purpose: an unbounded LinkedBlockingQueue never rejects an
    // offer(), so the "queue full" fallback in saveUser would be unreachable.
    private final BlockingQueue<WriteOperation> writeQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
    private final ExecutorService writeExecutor = Executors.newFixedThreadPool(WORKER_COUNT);

    /** Cleared by {@link #shutdown()}; once false, workers drain the queue and exit. */
    private volatile boolean accepting = true;

    /** Starts the background workers that apply queued writes to the database. */
    @PostConstruct
    public void startWriteBehindProcessor() {
        for (int i = 0; i < WORKER_COUNT; i++) {
            writeExecutor.submit(this::processWriteOperations);
        }
    }

    /**
     * Caches the user for 30 minutes and queues the database write. Falls back
     * to a synchronous database write when the cache update fails, the queue
     * is full, or the service is shutting down.
     *
     * @param user the user to save
     * @return {@code user} on the write-behind path, or the repository's
     *         result when the cache update failed
     */
    public User saveUser(User user) {
        try {
            // 1. Update the cache right away.
            String cacheKey = "user:" + user.getId();
            redisTemplate.opsForValue().set(cacheKey, user, 30, TimeUnit.MINUTES);
        } catch (Exception e) {
            log.error("Failed to cache user with write-behind", e);
            // Cache unavailable: write straight to the database.
            return userRepository.save(user);
        }

        // 2. Queue the database write for asynchronous processing.
        WriteOperation operation = new WriteOperation(
            WriteOperationType.SAVE_USER,
            user.getId(),
            user
        );

        if (!accepting) {
            log.warn("Write-behind is shutting down, performing synchronous write for user: {}", user.getId());
            userRepository.save(user);
        } else if (!writeQueue.offer(operation)) {
            log.warn("Write queue is full, performing synchronous write for user: {}", user.getId());
            userRepository.save(user);
        }

        log.info("User cached with write-behind: {}", user.getId());
        return user;
    }

    /**
     * Worker loop: applies queued operations until interrupted, or until
     * shutdown has been requested and the queue is empty.
     */
    private void processWriteOperations() {
        while (!Thread.currentThread().isInterrupted() && (accepting || !writeQueue.isEmpty())) {
            try {
                // poll() with a timeout instead of take(): a worker parked in
                // take() would never notice the shutdown flag.
                WriteOperation operation = writeQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (operation != null) {
                    applyOperation(operation);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Error processing write operation", e);
                // A retry policy or dead-letter queue could be plugged in here.
            }
        }
    }

    /** Applies a single queued operation to the database. */
    private void applyOperation(WriteOperation operation) {
        switch (operation.getType()) {
            case SAVE_USER:
                User user = (User) operation.getData();
                userRepository.save(user);
                log.debug("Async write completed for user: {}", user.getId());
                break;
            case DELETE_USER:
                Long userId = (Long) operation.getData();
                userRepository.deleteById(userId);
                log.debug("Async delete completed for user: {}", userId);
                break;
            default:
                log.warn("Unsupported write operation type: {}", operation.getType());
                break;
        }
    }

    /**
     * Stops accepting asynchronous writes, lets the workers flush the queue
     * (up to {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds), then applies
     * anything still queued on the calling thread so pending writes are not
     * silently dropped.
     */
    @PreDestroy
    public void shutdown() {
        accepting = false;
        writeExecutor.shutdown();
        try {
            if (!writeExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                writeExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            writeExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        // Covers operations enqueued in the small window between a saveUser
        // call reading the flag and the workers exiting.
        WriteOperation leftover;
        while ((leftover = writeQueue.poll()) != null) {
            try {
                applyOperation(leftover);
            } catch (Exception e) {
                log.error("Error processing write operation", e);
            }
        }
    }

    /** Immutable description of one pending database write. */
    private static class WriteOperation {
        private final WriteOperationType type;
        private final Object key;
        private final Object data;

        public WriteOperation(WriteOperationType type, Object key, Object data) {
            this.type = type;
            this.key = key;
            this.data = data;
        }

        public WriteOperationType getType() { return type; }
        public Object getKey() { return key; }
        public Object getData() { return data; }
    }

    /** Kinds of database writes the worker loop knows how to apply. */
    private enum WriteOperationType {
        SAVE_USER, DELETE_USER
    }
}

Redis分布式缓存实现

Redis集群架构

// Redis集群配置
@Configuration
public class RedisClusterConfiguration {

@Value("${redis.cluster.nodes}")
private String clusterNodes;

@Value("${redis.cluster.max-redirects:3}")
private int maxRedirects;

@Bean
public LettuceConnectionFactory redisConnectionFactory() {
// 解析集群节点
String[] nodes = clusterNodes.split(",");
Set<RedisNode> redisNodes = Arrays.stream(nodes)
.map(node -> {
String[] parts = node.split(":");
return new RedisNode(parts[0], Integer.parseInt(parts[1]));
})
.collect(Collectors.toSet());

RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
clusterConfig.setClusterNodes(redisNodes);
clusterConfig.setMaxRedirects(maxRedirects);

// 连接池配置
GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(100);
poolConfig.setMaxIdle(20);
poolConfig.setMinIdle(5);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);

LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.commandTimeout(Duration.ofSeconds(5))
.shutdownTimeout(Duration.ofSeconds(10))
.build();

return new LettuceConnectionFactory(clusterConfig, clientConfig);
}

@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);

// 序列化配置
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(objectMapper);

template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);

template.afterPropertiesSet();
return template;
}

@Bean
public RedisScript<Long> rateLimitScript() {
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(
"local key = KEYS[1]\n" +
"local limit = tonumber(ARGV[1])\n" +
"local window = tonumber(ARGV[2])\n" +
"local current = redis.call('GET', key)\n" +
"if current == false then\n" +
" redis.call('SET', key, 1)\n" +
" redis.call('EXPIRE', key, window)\n" +
" return 1\n" +
"else\n" +
" local count = tonumber(current)\n" +
" if count < limit then\n" +
" redis.call('INCR', key)\n" +
" return count + 1\n" +
" else\n" +
" return -1\n" +
" end\n" +
"end"
);
script.setResultType(Long.class);
return script;
}
}

分布式锁实现

// Redis-backed distributed lock.
@Component
public class RedisDistributedLock {

    private static final String LOCK_PREFIX = "lock:";

    // Deletes the key only when it still holds the caller's token, so one
    // owner cannot release a lock that meanwhile belongs to someone else.
    private static final String UNLOCK_SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "    return redis.call('del', KEYS[1]) " +
        "else " +
        "    return 0 " +
        "end";

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final RedisScript<Long> unlockScript;

    public RedisDistributedLock() {
        final DefaultRedisScript<Long> compareAndDelete = new DefaultRedisScript<>();
        compareAndDelete.setScriptText(UNLOCK_SCRIPT);
        compareAndDelete.setResultType(Long.class);
        this.unlockScript = compareAndDelete;
    }

    /**
     * Attempts to take the lock once, without waiting.
     *
     * @param lockKey    logical lock name (stored under {@code lock:<lockKey>})
     * @param lockValue  owner token, required again to unlock
     * @param expireTime time after which Redis drops the lock on its own
     * @param timeUnit   unit of {@code expireTime}
     * @return {@code true} if the lock was acquired
     */
    public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit timeUnit) {
        final Boolean stored = redisTemplate.opsForValue()
            .setIfAbsent(LOCK_PREFIX + lockKey, lockValue, expireTime, timeUnit);
        final boolean acquired = Boolean.TRUE.equals(stored);

        if (acquired) {
            log.debug("Lock acquired: {}", lockKey);
        } else {
            log.debug("Lock acquisition failed: {}", lockKey);
        }
        return acquired;
    }

    /**
     * Releases the lock if it is still held with {@code lockValue}.
     *
     * @param lockKey   logical lock name
     * @param lockValue owner token that was passed to {@link #tryLock}
     * @return {@code true} if the script reported that the key was deleted
     */
    public boolean unlock(String lockKey, String lockValue) {
        final Long deleted = redisTemplate.execute(
            unlockScript, Collections.singletonList(LOCK_PREFIX + lockKey), lockValue);
        final boolean released = deleted != null && deleted == 1L;

        if (released) {
            log.debug("Lock released: {}", lockKey);
        } else {
            log.warn("Lock release failed: {}", lockKey);
        }
        return released;
    }

    /**
     * Runs {@code action} while holding the lock, using a random owner token,
     * and always attempts to release the lock afterwards.
     *
     * @param lockKey    logical lock name
     * @param expireTime lock expiry
     * @param timeUnit   unit of {@code expireTime}
     * @param action     work to perform under the lock
     * @return whatever {@code action} returns
     * @throws RuntimeException if the lock cannot be acquired
     */
    public <T> T executeWithLock(String lockKey, long expireTime, TimeUnit timeUnit,
                                 Supplier<T> action) {
        final String ownerToken = UUID.randomUUID().toString();

        if (!tryLock(lockKey, ownerToken, expireTime, timeUnit)) {
            throw new RuntimeException("Failed to acquire lock: " + lockKey);
        }

        try {
            return action.get();
        } finally {
            unlock(lockKey, ownerToken);
        }
    }

    /**
     * Variant of {@link #executeWithLock(String, long, TimeUnit, Supplier)}
     * for actions that produce no result.
     */
    public void executeWithLock(String lockKey, long expireTime, TimeUnit timeUnit,
                                Runnable action) {
        final Supplier<Object> adapted = () -> {
            action.run();
            return null;
        };
        executeWithLock(lockKey, expireTime, timeUnit, adapted);
    }
}

// Distributed lock usage example.
@Service
public class InventoryService {

    @Autowired
    private RedisDistributedLock distributedLock;

    @Autowired
    private InventoryRepository inventoryRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Deducts {@code quantity} from a product's stock while holding a
     * per-product distributed lock with a 10-second expiry.
     *
     * @param productId product whose stock is reduced
     * @param quantity  number of units to deduct
     * @return {@code true} when the deduction was applied
     * @throws RuntimeException if the lock cannot be acquired, the product has
     *                          no inventory record, or stock is insufficient
     */
    public boolean reduceStock(Long productId, int quantity) {
        final String lockKey = "inventory:" + productId;
        return distributedLock.executeWithLock(
            lockKey, 10, TimeUnit.SECONDS, () -> deductUnderLock(productId, quantity));
    }

    /** Performs the deduction itself; must only be called while the lock is held. */
    private boolean deductUnderLock(Long productId, int quantity) {
        // Prefer the cached inventory and fall back to the database.
        Inventory inventory = getInventoryFromCache(productId);
        if (inventory == null) {
            inventory = inventoryRepository.findByProductId(productId);
        }
        if (inventory == null) {
            throw new RuntimeException("Product not found: " + productId);
        }

        if (inventory.getStock() < quantity) {
            throw new RuntimeException("Insufficient stock for product: " + productId);
        }

        // Apply the deduction, then persist to the database before the cache.
        inventory.setStock(inventory.getStock() - quantity);
        inventoryRepository.save(inventory);
        updateInventoryCache(inventory);

        log.info("Stock reduced for product {}: {} -> {}",
            productId, inventory.getStock() + quantity, inventory.getStock());

        return true;
    }

    /** Reads the cached inventory for a product, or {@code null} if absent. */
    private Inventory getInventoryFromCache(Long productId) {
        final Object cached = redisTemplate.opsForValue().get("inventory:" + productId);
        return (Inventory) cached;
    }

    /** Stores the inventory in the cache for 30 minutes. */
    private void updateInventoryCache(Inventory inventory) {
        final String key = "inventory:" + inventory.getProductId();
        redisTemplate.opsForValue().set(key, inventory, 30, TimeUnit.MINUTES);
    }
}

缓存一致性保证

// Cache consistency manager.
@Component
public class CacheConsistencyManager {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private MessageProducer messageProducer;

    private static final String CACHE_VERSION_PREFIX = "version:";
    private static final String CACHE_INVALIDATION_TOPIC = "cache.invalidation";

    /** Delay before the second delete in {@link #updateDataWithCache}. */
    private static final long DELAYED_DELETE_MS = 500;

    // Writes value + version only when the incoming version is newer.
    // KEYS[1] = data key, KEYS[2] = version key,
    // ARGV[1] = payload, ARGV[2] = version, ARGV[3] = TTL in seconds.
    // Versions are compared numerically when both parse as numbers; a plain
    // string comparison would rank "10" below "9". Non-numeric versions keep
    // the lexicographic comparison.
    private static final String VERSIONED_SET_SCRIPT =
        "local currentVersion = redis.call('GET', KEYS[2])\n" +
        "local newer = false\n" +
        "if currentVersion == false then\n" +
        "    newer = true\n" +
        "else\n" +
        "    local currentNum = tonumber(currentVersion)\n" +
        "    local incomingNum = tonumber(ARGV[2])\n" +
        "    if currentNum and incomingNum then\n" +
        "        newer = currentNum < incomingNum\n" +
        "    else\n" +
        "        newer = currentVersion < ARGV[2]\n" +
        "    end\n" +
        "end\n" +
        "if newer then\n" +
        "    redis.call('SET', KEYS[1], ARGV[1])\n" +
        "    redis.call('SET', KEYS[2], ARGV[2])\n" +
        "    redis.call('EXPIRE', KEYS[1], ARGV[3])\n" +
        "    redis.call('EXPIRE', KEYS[2], ARGV[3])\n" +
        "    return 1\n" +
        "else\n" +
        "    return 0\n" +
        "end";

    /** Built once and reused instead of re-creating the script on every call. */
    private final DefaultRedisScript<Long> versionedSetScript = buildVersionedSetScript();

    private static DefaultRedisScript<Long> buildVersionedSetScript() {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(VERSIONED_SET_SCRIPT);
        redisScript.setResultType(Long.class);
        return redisScript;
    }

    /**
     * Version-guarded cache write: stores {@code data} (as JSON) and its
     * version atomically via a Lua script, but only if no version is cached
     * yet or {@code dataVersion} is newer than the cached one. Both keys
     * expire after one hour.
     *
     * <p>NOTE(review): the script arguments pass through the template's value
     * serializer before reaching Redis; confirm that the configured serializer
     * leaves the version and TTL strings in a form the script can use.
     *
     * @param cacheKey    key of the cached value
     * @param data        value to cache
     * @param dataVersion version of {@code data}
     */
    public void updateWithVersion(String cacheKey, Object data, String dataVersion) {
        String versionKey = CACHE_VERSION_PREFIX + cacheKey;

        Long result = redisTemplate.execute(versionedSetScript,
            Arrays.asList(cacheKey, versionKey),
            JSON.toJSONString(data), dataVersion, "3600");

        if (result != null && result == 1L) {
            log.debug("Cache updated with version: {} -> {}", cacheKey, dataVersion);
        } else {
            log.debug("Cache update skipped due to version conflict: {}", cacheKey);
        }
    }

    /**
     * Deletes the cached value and its version, then publishes an invalidation
     * message for other nodes. Best effort: failures are logged, not thrown.
     *
     * @param cacheKey key to invalidate
     * @param reason   free-form reason carried in the message
     */
    public void invalidateCache(String cacheKey, String reason) {
        try {
            // Delete locally first.
            redisTemplate.delete(cacheKey);
            redisTemplate.delete(CACHE_VERSION_PREFIX + cacheKey);

            // Tell the other nodes.
            CacheInvalidationMessage message = new CacheInvalidationMessage(
                cacheKey, reason, System.currentTimeMillis(), getNodeId()
            );

            messageProducer.publish(CACHE_INVALIDATION_TOPIC, message);

            log.info("Cache invalidated and message published: {}", cacheKey);

        } catch (Exception e) {
            log.error("Failed to invalidate cache: {}", cacheKey, e);
        }
    }

    /**
     * Applies an invalidation message received from another node. Messages
     * that originated from this node are ignored.
     *
     * @param message the invalidation message
     */
    @EventListener
    public void handleCacheInvalidation(CacheInvalidationMessage message) {
        // Skip messages this node published itself.
        if (getNodeId().equals(message.getSourceNodeId())) {
            return;
        }

        try {
            redisTemplate.delete(message.getCacheKey());
            redisTemplate.delete(CACHE_VERSION_PREFIX + message.getCacheKey());

            log.info("Cache invalidated by remote message: {}", message.getCacheKey());

        } catch (Exception e) {
            log.error("Failed to handle cache invalidation message", e);
        }
    }

    /**
     * @return an identifier for this node: host name plus the runtime MXBean
     *         name. If the local host name cannot be resolved, a placeholder
     *         host is used so the id is still unique per JVM on the machine.
     */
    private String getNodeId() {
        String hostName;
        try {
            hostName = InetAddress.getLocalHost().getHostName();
        } catch (java.net.UnknownHostException e) {
            // getLocalHost() declares a checked exception; without handling it
            // this method does not compile.
            hostName = "unknown-host";
        }
        return hostName + ":" + ManagementFactory.getRuntimeMXBean().getName();
    }

    /**
     * Delayed double delete: removes the cache entry, runs the database
     * update, then schedules a second delete {@value #DELAYED_DELETE_MS} ms
     * later to clear a stale value that a concurrent reader may have cached
     * in between.
     *
     * <p>NOTE(review): the delayed delete is scheduled relative to this call,
     * not to the transaction commit; a commit taking longer than the delay
     * could still leave stale data — confirm the delay is adequate.
     *
     * @param cacheKey       key to invalidate
     * @param data           the new value (not used by this method)
     * @param databaseUpdate performs the database write
     */
    @Transactional
    public void updateDataWithCache(String cacheKey, Object data,
                                    Supplier<Object> databaseUpdate) {
        try {
            // 1. Delete the cache entry first.
            redisTemplate.delete(cacheKey);

            // 2. Update the database.
            databaseUpdate.get();

            // 3. Second delete after a delay. A delayed executor is used so no
            //    pool thread is parked in Thread.sleep while waiting.
            CompletableFuture.runAsync(() -> {
                try {
                    redisTemplate.delete(cacheKey);
                    log.debug("Delayed cache deletion completed: {}", cacheKey);
                } catch (Exception e) {
                    // Otherwise the failure would vanish inside the future.
                    log.warn("Delayed cache deletion failed: {}", cacheKey, e);
                }
            }, CompletableFuture.delayedExecutor(DELAYED_DELETE_MS, java.util.concurrent.TimeUnit.MILLISECONDS));

            log.info("Data updated with cache consistency: {}", cacheKey);

        } catch (Exception e) {
            log.error("Failed to update data with cache consistency", e);
            throw e;
        }
    }
}

/**
 * Cache invalidation message: names the key that was invalidated, why and
 * when, and which node announced it.
 */
public class CacheInvalidationMessage {

    private String cacheKey;
    private String reason;
    private long timestamp;
    private String sourceNodeId;

    /**
     * @param cacheKey     key that was invalidated
     * @param reason       free-form reason for the invalidation
     * @param timestamp    timestamp supplied by the sender
     * @param sourceNodeId identifier of the node that published the message
     */
    public CacheInvalidationMessage(String cacheKey, String reason, long timestamp, String sourceNodeId) {
        this.sourceNodeId = sourceNodeId;
        this.timestamp = timestamp;
        this.reason = reason;
        this.cacheKey = cacheKey;
    }

    /** @return the invalidated cache key */
    public String getCacheKey() {
        return this.cacheKey;
    }

    /** @return the reason given for the invalidation */
    public String getReason() {
        return this.reason;
    }

    /** @return the timestamp supplied by the sender */
    public long getTimestamp() {
        return this.timestamp;
    }

    /** @return the identifier of the publishing node */
    public String getSourceNodeId() {
        return this.sourceNodeId;
    }
}

缓存策略与优化

缓存预热策略

// Cache warm-up service.
// NOTE(review): this class calls productService.getHotProductIds /
// getProductFromDatabase and userService.getActiveUserIds / getUser, none of
// which appear in the ProductService / UserService snippets of this article —
// confirm they exist in the real services.
@Service
public class CacheWarmupService {

    private static final int HOT_PRODUCTS_AT_STARTUP = 100;
    private static final int HOT_PRODUCTS_PER_REFRESH = 50;
    private static final int ACTIVE_USERS_AT_STARTUP = 1000;
    private static final int USER_BATCH_SIZE = 50;
    /** Pause between product loads, to avoid a request burst. */
    private static final long PRODUCT_PAUSE_MS = 10;
    /** Pause between user batches. */
    private static final long USER_BATCH_PAUSE_MS = 100;

    @Autowired
    private ProductService productService;

    @Autowired
    private UserService userService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private TaskExecutor taskExecutor;

    /**
     * Starts the product, user and config warm-ups in parallel on the task
     * executor once the application is ready. The method itself returns
     * without waiting for them. Each warm-up task handles its own failures,
     * so the completion message is logged once all three have finished.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void warmupCache() {
        log.info("Starting cache warmup...");

        CompletableFuture<Void> productWarmup = CompletableFuture.runAsync(
            this::warmupProductCache, taskExecutor
        );

        CompletableFuture<Void> userWarmup = CompletableFuture.runAsync(
            this::warmupUserCache, taskExecutor
        );

        CompletableFuture<Void> configWarmup = CompletableFuture.runAsync(
            this::warmupConfigCache, taskExecutor
        );

        CompletableFuture.allOf(productWarmup, userWarmup, configWarmup)
            .thenRun(() -> log.info("Cache warmup completed"))
            .exceptionally(throwable -> {
                log.error("Cache warmup failed", throwable);
                return null;
            });
    }

    /**
     * Loads the hottest products through {@code productService.getProduct}.
     * A failure on one product is logged and skipped; an interrupt stops the
     * warm-up.
     */
    private void warmupProductCache() {
        try {
            log.info("Warming up product cache...");

            List<Long> hotProductIds = productService.getHotProductIds(HOT_PRODUCTS_AT_STARTUP);

            for (Long productId : hotProductIds) {
                try {
                    productService.getProduct(productId);
                    Thread.sleep(PRODUCT_PAUSE_MS);
                } catch (InterruptedException e) {
                    // Must not fall into the generic handler below: that would
                    // clear the interrupt and keep looping over every product.
                    Thread.currentThread().interrupt();
                    log.warn("Product cache warmup interrupted");
                    return;
                } catch (Exception e) {
                    log.warn("Failed to warmup product: {}", productId, e);
                }
            }

            log.info("Product cache warmup completed: {} products", hotProductIds.size());

        } catch (Exception e) {
            log.error("Product cache warmup failed", e);
        }
    }

    /**
     * Loads active users through {@code userService.getUser} in batches,
     * pausing between batches. A failure on one user is logged and skipped;
     * an interrupt stops the warm-up.
     */
    private void warmupUserCache() {
        try {
            log.info("Warming up user cache...");

            List<Long> activeUserIds = userService.getActiveUserIds(ACTIVE_USERS_AT_STARTUP);

            // Process in batches to limit the pressure on downstream systems.
            for (int i = 0; i < activeUserIds.size(); i += USER_BATCH_SIZE) {
                int endIndex = Math.min(i + USER_BATCH_SIZE, activeUserIds.size());
                List<Long> batch = activeUserIds.subList(i, endIndex);

                for (Long userId : batch) {
                    try {
                        userService.getUser(userId);
                    } catch (Exception e) {
                        log.warn("Failed to warmup user: {}", userId, e);
                    }
                }

                // Pause between batches.
                Thread.sleep(USER_BATCH_PAUSE_MS);
            }

            log.info("User cache warmup completed: {} users", activeUserIds.size());

        } catch (InterruptedException e) {
            // Restore the flag so the executor can see the interrupt.
            Thread.currentThread().interrupt();
            log.warn("User cache warmup interrupted");
        } catch (Exception e) {
            log.error("User cache warmup failed", e);
        }
    }

    /** Writes the configuration maps to Redis under {@code config:<name>} with a 24-hour expiry. */
    private void warmupConfigCache() {
        try {
            log.info("Warming up config cache...");

            Map<String, Object> configs = Map.of(
                "system.settings", getSystemSettings(),
                "feature.flags", getFeatureFlags(),
                "rate.limits", getRateLimits()
            );

            for (Map.Entry<String, Object> entry : configs.entrySet()) {
                redisTemplate.opsForValue().set(
                    "config:" + entry.getKey(),
                    entry.getValue(),
                    24, TimeUnit.HOURS
                );
            }

            log.info("Config cache warmup completed: {} configs", configs.size());

        } catch (Exception e) {
            log.error("Config cache warmup failed", e);
        }
    }

    /**
     * Every five minutes, reloads the current hot products from the database
     * and overwrites their cache entries with a 30-minute expiry. Each product
     * is refreshed asynchronously so the scheduler thread is not blocked.
     */
    @Scheduled(fixedRate = 300000)
    public void refreshHotData() {
        try {
            List<Long> currentHotProducts = productService.getHotProductIds(HOT_PRODUCTS_PER_REFRESH);

            for (Long productId : currentHotProducts) {
                CompletableFuture.runAsync(() -> {
                    try {
                        Product product = productService.getProductFromDatabase(productId);
                        if (product != null) {
                            String cacheKey = "product:" + productId;
                            redisTemplate.opsForValue().set(cacheKey, product, 30, TimeUnit.MINUTES);
                        }
                    } catch (Exception e) {
                        log.warn("Failed to refresh hot product: {}", productId, e);
                    }
                }, taskExecutor);
            }

            log.debug("Hot data refresh initiated for {} products", currentHotProducts.size());

        } catch (Exception e) {
            log.error("Hot data refresh failed", e);
        }
    }

    /** @return the system settings to cache (hard-coded in this example) */
    private Object getSystemSettings() {
        return Map.of(
            "maintenance.mode", false,
            "max.upload.size", "10MB",
            "session.timeout", 3600
        );
    }

    /** @return the feature flags to cache (hard-coded in this example) */
    private Object getFeatureFlags() {
        return Map.of(
            "new.ui.enabled", true,
            "payment.v2.enabled", false,
            "recommendation.enabled", true
        );
    }

    /** @return the rate-limit settings to cache (hard-coded in this example) */
    private Object getRateLimits() {
        return Map.of(
            "api.rate.limit", 1000,
            "login.rate.limit", 10,
            "search.rate.limit", 100
        );
    }
}

版权所有,如有侵权请联系我。