Redis in Practice: From Caching Strategies to High-Availability Architecture

Redis (Remote Dictionary Server) is a high-performance in-memory data structure store that plays a central role in modern internet architectures. It can serve as a database, a cache, and a message broker, and its rich set of data structures gives developers flexible, powerful tools for working with data. This article walks through Redis in practice, from basic caching strategies to high-availability architecture design, and aims to be a comprehensive guide to applying Redis.

Redis Fundamentals and Data Structures

Core Features of Redis

In-memory storage: Redis keeps data in memory, delivering extremely high read and write performance.

# Basic Redis operations
redis-cli

# String operations
SET user:1001:name "Zhang San"
GET user:1001:name
SETEX session:abc123 3600 "user_data" # set with an expiration time
INCR page_views # atomic increment
DECR inventory:item123 # atomic decrement

# Batch operations
MSET user:1001:name "Zhang San" user:1001:age 25 user:1001:city "Beijing"
MGET user:1001:name user:1001:age user:1001:city

Persistence: Redis offers two persistence mechanisms, RDB and AOF.

# Persistence settings in redis.conf

# RDB settings
save 900 1 # snapshot if at least 1 key changed within 900 seconds
save 300 10 # snapshot if at least 10 keys changed within 300 seconds
save 60 10000 # snapshot if at least 10000 keys changed within 60 seconds

stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir /var/lib/redis

# AOF settings
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec # fsync once per second
# appendfsync always # fsync on every write (safest, lowest throughput)
# appendfsync no # let the OS decide when to fsync (fastest, may lose data)

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

Redis Data Structures in Detail

String: the most basic type; it can hold text, integers, or floating-point numbers.

# Advanced string operations
SET counter 100
INCRBY counter 50 # add 50, result 150
DECRBY counter 30 # subtract 30, result 120

# Bit operations
SETBIT online_users 123 1 # mark user 123 online
GETBIT online_users 123 # check whether user 123 is online
BITCOUNT online_users # count online users

# String range operations
SET message "Hello Redis World"
GETRANGE message 0 4 # returns "Hello"
SETRANGE message 6 "Python" # overwrites 6 bytes at offset 6: "Hello PythonWorld"

Hash: well suited to storing objects.

# Hash operations
HSET user:1001 name "Zhang San" age 25 city "Beijing" email "zhangsan@example.com"
HGET user:1001 name
HGETALL user:1001
HMGET user:1001 name age city

# Numeric hash operations
HINCRBY user:1001 login_count 1 # increment login count by 1
HINCRBYFLOAT user:1001 balance 99.99 # increase balance by 99.99

# Field inspection
HEXISTS user:1001 phone
HLEN user:1001 # number of fields
HKEYS user:1001 # all field names
HVALS user:1001 # all field values

List: an ordered list of strings that supports operations at both ends.

# List operations
LPUSH message_queue "task1" "task2" "task3" # push on the left
RPUSH message_queue "task4" "task5" # push on the right

LPOP message_queue # pop from the left
RPOP message_queue # pop from the right

# Blocking operations (useful for message queues)
BLPOP message_queue 10 # block up to 10 seconds for a left pop
BRPOP message_queue 10 # block up to 10 seconds for a right pop

# Range operations
LRANGE message_queue 0 -1 # all elements
LRANGE message_queue 0 2 # first 3 elements
LTRIM message_queue 0 99 # keep only the first 100 elements

# Length and index operations
LLEN message_queue # list length
LINDEX message_queue 0 # element at index 0
LSET message_queue 0 "new_task" # set element at index 0

Set: an unordered collection of unique strings.

# Set operations
SADD tags:article:1001 "Redis" "cache" "NoSQL" "database"
SADD tags:article:1002 "MySQL" "database" "SQL" "relational"

SMEMBERS tags:article:1001 # all members
SISMEMBER tags:article:1001 "Redis" # membership test
SCARD tags:article:1001 # set cardinality

# Set algebra
SINTER tags:article:1001 tags:article:1002 # intersection
SUNION tags:article:1001 tags:article:1002 # union
SDIFF tags:article:1001 tags:article:1002 # difference

# Random operations
SRANDMEMBER tags:article:1001 2 # 2 random members
SPOP tags:article:1001 # pop one random member

Sorted Set: a collection of unique strings ordered by score.

# Sorted-set operations
ZADD leaderboard 1000 "player1" 1500 "player2" 800 "player3" 2000 "player4"

# Range queries
ZRANGE leaderboard 0 -1 WITHSCORES # all members, ascending by score
ZREVRANGE leaderboard 0 2 WITHSCORES # top 3, descending by score

# Score-range queries
ZRANGEBYSCORE leaderboard 1000 1500 WITHSCORES # members with scores between 1000 and 1500
ZCOUNT leaderboard 1000 1500 # count of members with scores between 1000 and 1500

# Rank queries
ZRANK leaderboard "player2" # rank of player2 (ascending)
ZREVRANK leaderboard "player2" # rank of player2 (descending)
ZSCORE leaderboard "player2" # score of player2

# Score updates
ZINCRBY leaderboard 100 "player1" # add 100 points to player1
ZREM leaderboard "player3" # remove player3

Caching Strategies and Patterns

Cache Update Strategies

Cache-Aside: the application itself is responsible for keeping the cache in sync.

@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String USER_CACHE_PREFIX = "user:";
    private static final int CACHE_EXPIRE_SECONDS = 3600;

    public User getUserById(Long userId) {
        String cacheKey = USER_CACHE_PREFIX + userId;

        // 1. Check the cache first
        User user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            return user;
        }

        // 2. Cache miss: load from the database
        user = userRepository.findById(userId).orElse(null);
        if (user != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(cacheKey, user, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
        }

        return user;
    }

    public void updateUser(User user) {
        // 1. Update the database
        userRepository.save(user);

        // 2. Delete the cache entry (it will be reloaded on the next read)
        String cacheKey = USER_CACHE_PREFIX + user.getId();
        redisTemplate.delete(cacheKey);
    }
}

Write-Through: the cache and the database are updated together.

@Service
public class WriteThroughCacheService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Transactional
    public void updateUser(User user) {
        try {
            // 1. Update the database and the cache together
            // (note: the Redis write itself is not covered by the JDBC transaction)
            userRepository.save(user);

            String cacheKey = "user:" + user.getId();
            redisTemplate.opsForValue().set(cacheKey, user, 3600, TimeUnit.SECONDS);

        } catch (Exception e) {
            // Roll back the transaction if either operation fails
            throw new RuntimeException("Failed to update user", e);
        }
    }
}

Write-Behind: the database is updated asynchronously.

@Service
public class WriteBehindCacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private AsyncTaskExecutor taskExecutor;

    @Autowired
    private UserRepository userRepository;

    public void updateUser(User user) {
        // 1. Update the cache immediately
        String cacheKey = "user:" + user.getId();
        redisTemplate.opsForValue().set(cacheKey, user, 3600, TimeUnit.SECONDS);

        // 2. Update the database asynchronously
        taskExecutor.execute(() -> {
            try {
                Thread.sleep(100); // small delay to allow batching
                userRepository.save(user);
            } catch (Exception e) {
                log.error("Failed to update user in database", e);
                // Retry here, or push the failed update onto a dead-letter queue
            }
        });
    }

    // Periodic batch synchronization
    @Scheduled(fixedRate = 30000) // every 30 seconds
    public void batchSyncToDatabase() {
        // Collect the keys that still need to be flushed
        Set<Object> dirtyKeys = redisTemplate.opsForSet().members("dirty_users");

        if (dirtyKeys != null && !dirtyKeys.isEmpty()) {
            List<User> usersToSync = new ArrayList<>();

            for (Object key : dirtyKeys) {
                User user = (User) redisTemplate.opsForValue().get((String) key);
                if (user != null) {
                    usersToSync.add(user);
                }
            }

            // Flush to the database in one batch
            if (!usersToSync.isEmpty()) {
                userRepository.saveAll(usersToSync);
                // Clear the dirty markers
                redisTemplate.opsForSet().remove("dirty_users", dirtyKeys.toArray());
            }
        }
    }
}

Solving Cache Penetration, Breakdown, and Avalanche

Cache Penetration

@Service
public class CachePenetrationSolution {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private UserRepository userRepository;

    // Approach 1: cache null results
    public User getUserWithNullCache(Long userId) {
        String cacheKey = "user:" + userId;

        // Check the cache
        if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            Object cached = redisTemplate.opsForValue().get(cacheKey);
            if ("NULL".equals(cached)) {
                return null; // cached null marker
            }
            return (User) cached;
        }

        // Query the database
        User user = userRepository.findById(userId).orElse(null);

        if (user != null) {
            // Cache the real value
            redisTemplate.opsForValue().set(cacheKey, user, 3600, TimeUnit.SECONDS);
        } else {
            // Cache a null marker with a short TTL
            redisTemplate.opsForValue().set(cacheKey, "NULL", 300, TimeUnit.SECONDS);
        }

        return user;
    }

    // Approach 2: Bloom filter
    @Autowired
    private BloomFilter<Long> userBloomFilter;

    public User getUserWithBloomFilter(Long userId) {
        // Consult the Bloom filter first
        if (!userBloomFilter.mightContain(userId)) {
            return null; // definitely absent
        }

        // Possibly present: fall through to cache and database
        return getUserWithNullCache(userId);
    }

    // Populate the Bloom filter at startup
    @PostConstruct
    public void initBloomFilter() {
        List<Long> allUserIds = userRepository.findAllUserIds();
        for (Long userId : allUserIds) {
            userBloomFilter.put(userId);
        }
    }
}
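The `userBloomFilter` above is assumed to come from a library such as Guava. To illustrate the underlying idea, here is a minimal, self-contained Bloom filter sketch; the sizing and double-hashing scheme are illustrative, not any library's actual implementation:

```java
import java.util.BitSet;

public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    public SimpleBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Derive k bit positions from two base hashes (Kirsch-Mitzenmacher trick)
    private int position(long id, int i) {
        long h1 = Long.hashCode(id);
        long h2 = Long.hashCode(id * 0x9E3779B97F4A7C15L);
        return (int) Math.floorMod(h1 + (long) i * h2, (long) size);
    }

    public void put(long id) {
        for (int i = 0; i < hashes; i++) bits.set(position(id, i));
    }

    public boolean mightContain(long id) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(position(id, i))) return false; // definitely absent
        }
        return true; // possibly present (false positives are possible)
    }
}
```

The key property is that `mightContain` never returns `false` for an inserted element; it may return `true` for an element that was never inserted, which is why the lookup still falls through to the cache and database.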

Cache Breakdown

@Service
public class CacheBreakdownSolution {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private UserRepository userRepository;

    // Approach 1: mutex lock
    public User getUserWithMutex(Long userId) {
        String cacheKey = "user:" + userId;
        String lockKey = "lock:user:" + userId;

        // Check the cache
        User user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            return user;
        }

        // Try to acquire a distributed lock
        Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

        if (Boolean.TRUE.equals(lockAcquired)) {
            try {
                // Double-check the cache after acquiring the lock
                user = (User) redisTemplate.opsForValue().get(cacheKey);
                if (user != null) {
                    return user;
                }

                // Load from the database and repopulate the cache
                user = userRepository.findById(userId).orElse(null);
                if (user != null) {
                    redisTemplate.opsForValue().set(cacheKey, user, 3600, TimeUnit.SECONDS);
                }

                return user;

            } finally {
                // Release the lock (production code should verify ownership,
                // e.g. with a unique token and a Lua script, before deleting)
                redisTemplate.delete(lockKey);
            }
        } else {
            // Lock not acquired: wait briefly and retry
            try {
                Thread.sleep(50);
                return getUserWithMutex(userId); // recursive retry
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Approach 2: hot keys that never physically expire
    public User getHotUserData(Long userId) {
        String cacheKey = "hot_user:" + userId;

        // Read the cached payload (which carries a logical expiry time)
        String cachedData = (String) redisTemplate.opsForValue().get(cacheKey);

        if (cachedData != null) {
            UserCacheData userCacheData = JSON.parseObject(cachedData, UserCacheData.class);

            // Check the logical expiry
            if (userCacheData.getExpireTime().isAfter(LocalDateTime.now())) {
                return userCacheData.getUser(); // still fresh, return directly
            } else {
                // Logically expired: refresh asynchronously
                CompletableFuture.runAsync(() -> {
                    User freshUser = userRepository.findById(userId).orElse(null);
                    if (freshUser != null) {
                        UserCacheData newCacheData = new UserCacheData();
                        newCacheData.setUser(freshUser);
                        newCacheData.setExpireTime(LocalDateTime.now().plusHours(1));

                        redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(newCacheData));
                    }
                });

                return userCacheData.getUser(); // serve the stale value meanwhile
            }
        }

        // Nothing cached yet: load synchronously
        User user = userRepository.findById(userId).orElse(null);
        if (user != null) {
            UserCacheData cacheData = new UserCacheData();
            cacheData.setUser(user);
            cacheData.setExpireTime(LocalDateTime.now().plusHours(1));

            redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(cacheData));
        }

        return user;
    }

    @Data
    private static class UserCacheData {
        private User user;
        private LocalDateTime expireTime;
    }
}

Cache Avalanche

@Service
public class CacheAvalancheSolution {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private UserRepository userRepository;

    private final Random random = new Random();

    // Approach 1: randomized expiration times
    public void cacheUserWithRandomExpire(User user) {
        String cacheKey = "user:" + user.getId();

        // Base TTL plus random jitter (0-300 seconds)
        int expireSeconds = 3600 + random.nextInt(300);

        redisTemplate.opsForValue().set(cacheKey, user, expireSeconds, TimeUnit.SECONDS);
    }

    // Approach 2: multi-level caching
    public User getUserWithMultiLevelCache(Long userId) {
        // L1: local cache (Caffeine)
        User user = localCache.getIfPresent("user:" + userId);
        if (user != null) {
            return user;
        }

        // L2: Redis
        String cacheKey = "user:" + userId;
        user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            // Write back to the local cache
            localCache.put("user:" + userId, user);
            return user;
        }

        // L3: database
        user = userRepository.findById(userId).orElse(null);
        if (user != null) {
            // Populate every cache level
            localCache.put("user:" + userId, user);
            cacheUserWithRandomExpire(user);
        }

        return user;
    }

    @Autowired
    private Cache<String, User> localCache;

    // Approach 3: circuit breaking with graceful degradation
    @Autowired
    private CircuitBreaker circuitBreaker;

    public User getUserWithCircuitBreaker(Long userId) {
        // Resilience4j + Vavr style: decorate the supplier, recover on failure
        Supplier<User> decorated = CircuitBreaker.decorateSupplier(circuitBreaker,
                () -> getUserWithMultiLevelCache(userId));
        return Try.ofSupplier(decorated).recover(throwable -> {
            // Fallback: return a default user or read a backup data source
            log.warn("Cache circuit breaker opened, using fallback", throwable);
            return getDefaultUser(userId);
        }).get();
    }

    private User getDefaultUser(Long userId) {
        // Default user data, or a lookup against a backup source
        User defaultUser = new User();
        defaultUser.setId(userId);
        defaultUser.setName("default user");
        return defaultUser;
    }
}

Redis Clustering and High Availability

Redis Master-Replica Replication

Replication configuration

# Master node configuration (redis-master.conf)
port 6379
bind 0.0.0.0
protected-mode no

# Persistence
save 900 1
save 300 10
save 60 10000
appendonly yes
appendfsync everysec

# Replication settings
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-ping-slave-period 10
repl-timeout 60
repl-disable-tcp-nodelay no
repl-backlog-size 1mb
repl-backlog-ttl 3600

# Replica node configuration (redis-slave.conf)
port 6380
bind 0.0.0.0
protected-mode no

# Point at the master
replicaof 192.168.1.100 6379
# masterauth <master-password> # if the master requires a password

# Replicas are read-only
replica-read-only yes

# Replica priority (used during failover)
replica-priority 100

Monitoring replication

# Check replication status on the master
redis-cli -p 6379
INFO replication

# Sample output:
# role:master
# connected_slaves:2
# slave0:ip=192.168.1.101,port=6380,state=online,offset=1234567,lag=0
# slave1:ip=192.168.1.102,port=6380,state=online,offset=1234567,lag=1

# Check replication status on a replica
redis-cli -p 6380
INFO replication

# Sample output:
# role:slave
# master_host:192.168.1.100
# master_port:6379
# master_link_status:up
# master_last_io_seconds_ago:0
# master_sync_in_progress:0
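Output like the above is line-oriented `key:value` text, which makes it straightforward to consume from monitoring code. A small, self-contained parser (a sketch, independent of any Redis client library) might look like:

```java
import java.util.HashMap;
import java.util.Map;

public class ReplicationInfoParser {
    // Parse the "key:value" lines returned by INFO replication,
    // skipping blank lines and section headers that start with '#'.
    public static Map<String, String> parse(String raw) {
        Map<String, String> out = new HashMap<>();
        for (String line : raw.split("\r?\n")) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("#")) continue;
            int idx = line.indexOf(':');
            if (idx > 0) {
                out.put(line.substring(0, idx), line.substring(idx + 1));
            }
        }
        return out;
    }
}
```

A health check can then alert, for example, when `master_link_status` on a replica is not `up`, or when `connected_slaves` on the master drops below the expected count.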

High Availability with Redis Sentinel

Sentinel configuration

# sentinel.conf
port 26379
dir /var/lib/redis

# Monitor the master
sentinel monitor mymaster 192.168.1.100 6379 2

# Master password (if one is set)
# sentinel auth-pass mymaster <password>

# Failover settings
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000

# Notification scripts
sentinel notification-script mymaster /var/redis/notify.sh
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

# Logging
logfile /var/log/redis/sentinel.log
loglevel notice

Deploying a Sentinel cluster

# Start the Sentinel cluster (at least 3 nodes)
# Node 1
redis-sentinel /etc/redis/sentinel-1.conf

# Node 2
redis-sentinel /etc/redis/sentinel-2.conf

# Node 3
redis-sentinel /etc/redis/sentinel-3.conf

# Inspect Sentinel state
redis-cli -p 26379
SENTINEL masters
SENTINEL slaves mymaster
SENTINEL sentinels mymaster

# Manual failover
SENTINEL failover mymaster

Sentinel integration in a Java client

@Configuration
public class RedisSentinelConfig {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Sentinel node configuration
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.1.100", 26379)
                .sentinel("192.168.1.101", 26379)
                .sentinel("192.168.1.102", 26379);

        // Connection pool settings
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
                new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(20);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);

        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .commandTimeout(Duration.ofSeconds(5))
                .shutdownTimeout(Duration.ofSeconds(10))
                .build();

        return new LettuceConnectionFactory(sentinelConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());

        // Serialization settings
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);

        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(serializer);
        template.setHashValueSerializer(serializer);

        template.afterPropertiesSet();
        return template;
    }
}

// Listening for Sentinel events
@Component
public class SentinelEventListener {

    @EventListener
    public void handleSentinelEvent(RedisSentinelFailoverEvent event) {
        log.info("Redis Sentinel failover event: old master={}, new master={}",
                event.getOldMaster(), event.getNewMaster());

        // React to the failover here, e.g. clear local caches
        // or notify downstream services
    }
}

Redis Cluster

Building a cluster

# Create config files for 6 nodes (3 masters, 3 replicas)
# redis-7000.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 15000
appendonly yes
dir /var/lib/redis/7000

# redis-7001.conf
port 7001
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 15000
appendonly yes
dir /var/lib/redis/7001

# ... the remaining nodes follow the same pattern

# Start all nodes
redis-server redis-7000.conf
redis-server redis-7001.conf
redis-server redis-7002.conf
redis-server redis-7003.conf
redis-server redis-7004.conf
redis-server redis-7005.conf

# Create the cluster
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1

# Check cluster status
redis-cli -c -p 7000
CLUSTER NODES
CLUSTER INFO

Managing the cluster

# Add a new node
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000

# Reshard slots
redis-cli --cluster reshard 127.0.0.1:7000

# Remove a node
redis-cli --cluster del-node 127.0.0.1:7000 <node-id>

# Check cluster health
redis-cli --cluster check 127.0.0.1:7000

# Repair the cluster
redis-cli --cluster fix 127.0.0.1:7000

# Inspect cluster details
redis-cli -c -p 7000
CLUSTER SLOTS # slot assignments
CLUSTER KEYSLOT mykey # slot for a given key

Cluster integration in a Java client

@Configuration
public class RedisClusterConfig {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Cluster node configuration
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration()
                .clusterNode("192.168.1.100", 7000)
                .clusterNode("192.168.1.100", 7001)
                .clusterNode("192.168.1.100", 7002)
                .clusterNode("192.168.1.101", 7000)
                .clusterNode("192.168.1.101", 7001)
                .clusterNode("192.168.1.101", 7002);

        clusterConfig.setMaxRedirects(3);

        // Connection pool settings
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
                new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(50);
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(10);
        poolConfig.setTestOnBorrow(true);

        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .commandTimeout(Duration.ofSeconds(5))
                .shutdownTimeout(Duration.ofSeconds(10))
                .build();

        return new LettuceConnectionFactory(clusterConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());

        // Serialization settings
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        template.afterPropertiesSet();
        return template;
    }
}

// Cluster usage examples
@Service
public class RedisClusterService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Batch writes (beware of cross-slot issues)
    public void batchSet(Map<String, Object> keyValues) {
        // Pipeline the writes for throughput
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (Map.Entry<String, Object> entry : keyValues.entrySet()) {
                    connection.set(
                            entry.getKey().getBytes(),
                            serialize(entry.getValue())
                    );
                }
                return null;
            }
        });
    }

    // Use hash tags to force related keys into the same slot
    public void setRelatedData(String userId, String sessionData, String profileData) {
        String userKey = "user:{" + userId + "}:session";
        String profileKey = "user:{" + userId + "}:profile";

        // Both keys are assigned to the same slot
        redisTemplate.opsForValue().set(userKey, sessionData);
        redisTemplate.opsForValue().set(profileKey, profileData);
    }

    private byte[] serialize(Object obj) {
        // Serialization placeholder
        return obj.toString().getBytes();
    }
}
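Hash tags work because Redis Cluster assigns each key to one of 16384 slots by computing CRC16 (the XModem variant) over the key, or over only the substring inside the first `{...}` when that tag is non-empty. A self-contained sketch of that mapping (slot numbers computed locally, not fetched from a cluster):

```java
public class ClusterSlot {
    // CRC16-CCITT (XModem): polynomial 0x1021, initial value 0, no reflection
    public static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Apply the hash-tag rule: if the key contains "{...}" with a
    // non-empty body, hash only that body.
    public static int slotFor(String key) {
        int open = key.indexOf('{');
        if (open != -1) {
            int close = key.indexOf('}', open + 1);
            if (close != -1 && close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        byte[] raw = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return crc16(raw) % 16384;
    }
}
```

With this rule, `user:{1001}:session` and `user:{1001}:profile` both hash the tag body `1001` and therefore land in the same slot, which is what makes multi-key operations on them possible in a cluster.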

Redis Performance Tuning

Memory Optimization

Analyzing memory usage

# Inspect memory usage
redis-cli
INFO memory

# Key metrics:
# used_memory: memory in use
# used_memory_human: memory in use, human readable
# used_memory_rss: physical memory allocated by the OS
# used_memory_peak: peak memory usage
# mem_fragmentation_ratio: memory fragmentation ratio

# Find big keys
redis-cli --bigkeys

# Detailed memory analysis (requires redis-rdb-tools)
rdb -c memory dump.rdb

# Memory used by a single key
MEMORY USAGE mykey

# Number of keys in the current database
DBSIZE

Memory tuning configuration

# Memory tuning in redis.conf

# Memory ceiling
maxmemory 2gb

# Eviction policy
maxmemory-policy allkeys-lru
# Available policies:
# noeviction: evict nothing; return errors when memory is full
# allkeys-lru: LRU eviction across all keys
# allkeys-lfu: LFU eviction across all keys
# volatile-lru: LRU eviction among keys with a TTL
# volatile-lfu: LFU eviction among keys with a TTL
# volatile-random: random eviction among keys with a TTL
# volatile-ttl: evict the keys closest to expiring

# Sample size (affects LRU/LFU accuracy)
maxmemory-samples 5

# Lazy freeing (asynchronous deletion of large keys)
lazyfree-lazy-eviction yes
lazyfree-lazy-expire yes
lazyfree-lazy-server-del yes

# Compact-encoding thresholds
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
list-compress-depth 0
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
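Note that `allkeys-lru` is an approximation: Redis samples `maxmemory-samples` keys and evicts the best candidate, rather than maintaining an exact recency list. For intuition about the policy itself, exact LRU semantics can be sketched locally with `LinkedHashMap`'s access-order mode (this illustrates the eviction behavior only, not Redis internals):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    public LruCache(int capacity) {
        // accessOrder = true: iteration order follows recency of access,
        // so the eldest entry is always the least-recently-used one
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least-recently-used entry once capacity is exceeded
        return size() > capacity;
    }
}
```

For example, in a capacity-2 cache holding `a` and `b`, reading `a` and then inserting `c` evicts `b`, because `b` is now the least recently used entry.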

Data structure optimization

@Service
public class RedisMemoryOptimization {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Optimization 1: store objects as Hashes rather than JSON strings
    public void storeUserOptimized(User user) {
        String key = "user:" + user.getId();

        // Not recommended: store as a JSON string
        // redisTemplate.opsForValue().set(key, JSON.toJSONString(user));

        // Recommended: store as a Hash
        Map<String, Object> userMap = new HashMap<>();
        userMap.put("name", user.getName());
        userMap.put("age", user.getAge());
        userMap.put("email", user.getEmail());
        userMap.put("city", user.getCity());

        redisTemplate.opsForHash().putAll(key, userMap);
    }

    // Optimization 2: use bitmaps for boolean flags
    public void setUserOnlineStatus(Long userId, boolean online) {
        String key = "online_users";
        redisTemplate.opsForValue().setBit(key, userId, online);
    }

    public boolean getUserOnlineStatus(Long userId) {
        String key = "online_users";
        return Boolean.TRUE.equals(redisTemplate.opsForValue().getBit(key, userId));
    }

    // Optimization 3: use HyperLogLog for unique-visitor counts
    public void addUniqueVisitor(String page, String userId) {
        String key = "unique_visitors:" + page;
        redisTemplate.opsForHyperLogLog().add(key, userId);
    }

    public long getUniqueVisitorCount(String page) {
        String key = "unique_visitors:" + page;
        return redisTemplate.opsForHyperLogLog().size(key);
    }

    // Optimization 4: batch operations to cut network round-trips
    public void batchSetUsers(List<User> users) {
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (User user : users) {
                    String key = "user:" + user.getId();
                    Map<byte[], byte[]> userMap = new HashMap<>();
                    userMap.put("name".getBytes(), user.getName().getBytes());
                    userMap.put("age".getBytes(), String.valueOf(user.getAge()).getBytes());

                    connection.hMSet(key.getBytes(), userMap);
                    connection.expire(key.getBytes(), 3600);
                }
                return null;
            }
        });
    }

    // Optimization 5: compress large payloads before storing them
    public void setCompressedData(String key, String data) {
        try {
            // GZIP compression
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            GZIPOutputStream gzipOut = new GZIPOutputStream(baos);
            gzipOut.write(data.getBytes(StandardCharsets.UTF_8));
            gzipOut.close();

            byte[] compressedData = baos.toByteArray();
            redisTemplate.opsForValue().set(key, compressedData);

        } catch (IOException e) {
            throw new RuntimeException("Failed to compress data", e);
        }
    }

    public String getCompressedData(String key) {
        try {
            byte[] compressedData = (byte[]) redisTemplate.opsForValue().get(key);
            if (compressedData == null) {
                return null;
            }

            // Decompress
            ByteArrayInputStream bais = new ByteArrayInputStream(compressedData);
            GZIPInputStream gzipIn = new GZIPInputStream(bais);

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int len;
            while ((len = gzipIn.read(buffer)) != -1) {
                baos.write(buffer, 0, len);
            }

            return baos.toString(StandardCharsets.UTF_8.name());

        } catch (IOException e) {
            throw new RuntimeException("Failed to decompress data", e);
        }
    }
}

Network Optimization

Connection pool tuning

@Configuration
public class RedisConnectionOptimization {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Connection pool settings
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
                new GenericObjectPoolConfig<>();

        // Pool sizing
        poolConfig.setMaxTotal(50); // maximum connections
        poolConfig.setMaxIdle(20); // maximum idle connections
        poolConfig.setMinIdle(10); // minimum idle connections

        // Connection validation
        poolConfig.setTestOnBorrow(true); // validate on borrow
        poolConfig.setTestOnReturn(false); // validate on return
        poolConfig.setTestWhileIdle(true); // validate while idle

        // Waiting behavior
        poolConfig.setMaxWaitMillis(3000); // maximum wait time
        poolConfig.setBlockWhenExhausted(true); // block when the pool is exhausted

        // Idle connection eviction
        poolConfig.setTimeBetweenEvictionRunsMillis(30000); // eviction run interval
        poolConfig.setMinEvictableIdleTimeMillis(60000); // minimum idle time before eviction
        poolConfig.setNumTestsPerEvictionRun(3); // connections tested per run

        // Client settings
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .commandTimeout(Duration.ofSeconds(5)) // command timeout
                .shutdownTimeout(Duration.ofSeconds(10)) // shutdown timeout
                .build();

        // Redis connection settings
        RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();
        redisConfig.setHostName("localhost");
        redisConfig.setPort(6379);
        redisConfig.setDatabase(0);

        return new LettuceConnectionFactory(redisConfig, clientConfig);
    }
}

Batching with pipelines

@Service
public class RedisPipelineOptimization {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Batch set key-value pairs
    public void batchSet(Map<String, Object> keyValues) {
        List<Object> results = redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (Map.Entry<String, Object> entry : keyValues.entrySet()) {
                    connection.set(
                            entry.getKey().getBytes(),
                            serialize(entry.getValue())
                    );
                }
                return null;
            }
        });

        log.info("Batch set {} keys, results: {}", keyValues.size(), results.size());
    }

    // Batch get key-value pairs
    public Map<String, Object> batchGet(List<String> keys) {
        List<Object> results = redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (String key : keys) {
                    connection.get(key.getBytes());
                }
                return null;
            }
        });

        Map<String, Object> resultMap = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            if (results.get(i) != null) {
                resultMap.put(keys.get(i), deserialize((byte[]) results.get(i)));
            }
        }

        return resultMap;
    }

    // Batch delete
    public void batchDelete(List<String> keys) {
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (String key : keys) {
                    connection.del(key.getBytes());
                }
                return null;
            }
        });
    }

    // Batch Hash writes
    public void batchHashSet(String hashKey, Map<String, Object> fieldValues) {
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (Map.Entry<String, Object> entry : fieldValues.entrySet()) {
                    connection.hSet(
                            hashKey.getBytes(),
                            entry.getKey().getBytes(),
                            serialize(entry.getValue())
                    );
                }
                return null;
            }
        });
    }

    private byte[] serialize(Object obj) {
        // Serialization placeholder
        return obj.toString().getBytes();
    }

    private Object deserialize(byte[] data) {
        // Deserialization placeholder
        return new String(data);
    }
}

Monitoring and Tuning

Performance monitoring metrics

@Component
public class RedisPerformanceMonitor {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Scheduled(fixedRate = 60000) // 每分钟监控一次
public void monitorRedisPerformance() {
try {
// 获取Redis信息
Properties info = redisTemplate.getConnectionFactory().getConnection().info();

// 内存使用情况
String usedMemory = info.getProperty("used_memory_human");
String usedMemoryPeak = info.getProperty("used_memory_peak_human");
String memFragmentationRatio = info.getProperty("mem_fragmentation_ratio");

// 连接情况
String connectedClients = info.getProperty("connected_clients");
String blockedClients = info.getProperty("blocked_clients");

// 操作统计
String totalCommandsProcessed = info.getProperty("total_commands_processed");
String instantaneousOpsPerSec = info.getProperty("instantaneous_ops_per_sec");

// 键空间统计
String db0Keys = info.getProperty("db0");

// 持久化情况
String lastSaveTime = info.getProperty("last_save_time");
String bgsaveInProgress = info.getProperty("bgsave_in_progress");

// 记录监控数据
log.info("Redis Performance Metrics: " +
"Memory[used={}, peak={}, fragmentation={}], " +
"Connections[clients={}, blocked={}], " +
"Operations[total={}, ops/sec={}], " +
"Keys[db0={}], " +
"Persistence[last_save={}, bgsave_in_progress={}]",
usedMemory, usedMemoryPeak, memFragmentationRatio,
connectedClients, blockedClients,
totalCommandsProcessed, instantaneousOpsPerSec,
db0Keys,
lastSaveTime, bgsaveInProgress);

// 告警检查
checkAlerts(info);

} catch (Exception e) {
log.error("Failed to monitor Redis performance", e);
}
}

private void checkAlerts(Properties info) {
// 内存使用率告警
String maxMemory = info.getProperty("maxmemory");
String usedMemory = info.getProperty("used_memory");

if (maxMemory != null && !"0".equals(maxMemory)) {
long maxMem = Long.parseLong(maxMemory);
long usedMem = Long.parseLong(usedMemory);
double memoryUsagePercent = (double) usedMem / maxMem * 100;

if (memoryUsagePercent > 80) {
log.warn("Redis memory usage is high: {:.2f}%", memoryUsagePercent);
}
}

// 内存碎片率告警
String fragRatio = info.getProperty("mem_fragmentation_ratio");
if (fragRatio != null) {
double fragmentationRatio = Double.parseDouble(fragRatio);
if (fragmentationRatio > 1.5) {
log.warn("Redis memory fragmentation is high: {}", fragmentationRatio);
}
}

// 连接数告警
String connectedClients = info.getProperty("connected_clients");
if (connectedClients != null) {
int clients = Integer.parseInt(connectedClients);
if (clients > 1000) {
log.warn("Redis connected clients is high: {}", clients);
}
}

// Blocked clients alert
String blockedClients = info.getProperty("blocked_clients");
if (blockedClients != null) {
int blocked = Integer.parseInt(blockedClients);
if (blocked > 10) {
log.warn("Redis blocked clients detected: {}", blocked);
}
}
}

// Slow query monitoring
@Scheduled(fixedRate = 300000) // check every 5 minutes
@SuppressWarnings("unchecked")
public void monitorSlowQueries() {
// try-with-resources returns the connection to the pool when done
try (RedisConnection connection = redisTemplate.getConnectionFactory().getConnection()) {

// Fetch the 10 most recent slow-log entries; Spring Data Redis has no
// dedicated SLOWLOG API, so issue the command natively
List<Object> slowLogs = (List<Object>) connection.execute(
"SLOWLOG", "GET".getBytes(), "10".getBytes());

if (slowLogs != null && !slowLogs.isEmpty()) {
log.warn("Found {} slow queries in Redis", slowLogs.size());

for (Object slowLog : slowLogs) {
// Parse the slow-log entry: [id, timestamp, duration(μs), command, ...]
if (slowLog instanceof List) {
List<?> logEntry = (List<?>) slowLog;
if (logEntry.size() >= 4) {
Long id = (Long) logEntry.get(0);
Long timestamp = (Long) logEntry.get(1);
Long duration = (Long) logEntry.get(2);
List<?> command = (List<?>) logEntry.get(3);

log.warn("Slow query [id={}, timestamp={}, duration={}μs, command={}]",
id, timestamp, duration, command);
}
}
}
}

} catch (Exception e) {
log.error("Failed to monitor slow queries", e);
}
}
}
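The monitoring methods above rely on Spring Data Redis loading the raw INFO reply into a `java.util.Properties` object, which is what makes the `info.getProperty("used_memory")` lookups work. If you ever need to do that mapping by hand (the `InfoParser` class below is a hypothetical helper, not part of any library), the parsing is straightforward: INFO returns `field:value` lines, with `#`-prefixed section headers to skip.

```java
import java.util.Properties;

public class InfoParser {
    // Parse a raw Redis INFO reply ("field:value" lines, '#' section headers)
    // into Properties, mirroring what Spring Data Redis does internally.
    public static Properties parseInfo(String rawInfo) {
        Properties props = new Properties();
        for (String line : rawInfo.split("\r?\n")) {
            line = line.trim();
            // Skip section headers (e.g. "# Memory") and blank lines
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int idx = line.indexOf(':');
            if (idx > 0) {
                props.setProperty(line.substring(0, idx), line.substring(idx + 1));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        String raw = "# Memory\r\nused_memory:1048576\r\nmem_fragmentation_ratio:1.23\r\n"
                + "# Clients\r\nconnected_clients:42\r\n";
        Properties info = parseInfo(raw);
        System.out.println(info.getProperty("used_memory"));       // 1048576
        System.out.println(info.getProperty("connected_clients")); // 42
    }
}
```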

Redis Application Scenarios in Practice

Distributed Lock Implementation

@Component
public class RedisDistributedLock {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private static final String LOCK_PREFIX = "lock:";
private static final int DEFAULT_EXPIRE_TIME = 30; // 30 seconds

/**
* Acquire the distributed lock
* @param lockKey lock key
* @param requestId unique request identifier
* @param expireTime expiration time in seconds
* @return whether the lock was acquired
*/
public boolean tryLock(String lockKey, String requestId, int expireTime) {
String key = LOCK_PREFIX + lockKey;

// SET with the NX and EX options in a single atomic operation
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}

/**
* Acquire the distributed lock using the default expiration time
*/
public boolean tryLock(String lockKey, String requestId) {
return tryLock(lockKey, requestId, DEFAULT_EXPIRE_TIME);
}

/**
* Release the distributed lock
* @param lockKey lock key
* @param requestId unique request identifier
* @return whether the lock was released
*/
public boolean releaseLock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;

// Lua script: the compare-and-delete must be atomic so we never delete a lock owned by another client
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId);
return Long.valueOf(1).equals(result);
}

/**
* Reentrant lock implementation
*/
public boolean tryReentrantLock(String lockKey, String requestId, int expireTime) {
String key = LOCK_PREFIX + lockKey;

String luaScript =
"local lockKey = KEYS[1] " +
"local requestId = ARGV[1] " +
"local expireTime = ARGV[2] " +
"local lockValue = redis.call('get', lockKey) " +
"if lockValue == false then " +
" redis.call('set', lockKey, requestId, 'EX', expireTime) " +
" return 1 " +
"elseif lockValue == requestId then " +
" redis.call('expire', lockKey, expireTime) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId, String.valueOf(expireTime));
return Long.valueOf(1).equals(result);
}

/**
* Renew (extend) the lock's expiration
*/
public void renewLock(String lockKey, String requestId, int expireTime) {
String key = LOCK_PREFIX + lockKey;

String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(Long.class);

redisTemplate.execute(redisScript, Collections.singletonList(key), requestId, String.valueOf(expireTime));
}
}

// Distributed lock usage example
@Service
public class OrderService {

@Autowired
private RedisDistributedLock distributedLock;

@Autowired
private OrderRepository orderRepository;

public void processOrder(Long orderId) {
String lockKey = "order:" + orderId;
String requestId = UUID.randomUUID().toString();

if (distributedLock.tryLock(lockKey, requestId, 30)) {
try {
// Business logic
Order order = orderRepository.findById(orderId).orElse(null);
if (order != null && "PENDING".equals(order.getStatus())) {
order.setStatus("PROCESSING");
orderRepository.save(order);

// Simulate a time-consuming operation
Thread.sleep(5000);

order.setStatus("COMPLETED");
orderRepository.save(order);
}
} catch (Exception e) {
log.error("Failed to process order: " + orderId, e);
} finally {
distributedLock.releaseLock(lockKey, requestId);
}
} else {
log.warn("Failed to acquire lock for order: " + orderId);
}
}
}
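A detail worth emphasizing in `releaseLock` is why the Lua script compares the stored value with `requestId` before deleting. The plain-Java sketch below (a `ConcurrentHashMap` standing in for Redis; the class name is made up for illustration) shows the failure an unconditional DEL would allow: a client whose lock already expired could delete a lock now held by someone else.

```java
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java illustration of why releaseLock must compare the stored value
// with the caller's requestId before deleting. The map is a stand-in for
// Redis; in Redis the compare-and-delete must run atomically (Lua script).
public class LockSemanticsDemo {
    final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    public boolean tryLock(String key, String requestId) {
        return store.putIfAbsent(key, requestId) == null; // SET key value NX
    }

    // Safe release: delete only if we still own the lock
    public boolean releaseLock(String key, String requestId) {
        return store.remove(key, requestId); // value-guarded delete
    }

    public static void main(String[] args) {
        LockSemanticsDemo lock = new LockSemanticsDemo();
        lock.tryLock("order:1", "client-A");

        // Simulate client A's lock expiring, then client B acquiring it
        lock.store.remove("order:1");
        lock.tryLock("order:1", "client-B");

        // Client A's late release must NOT remove client B's lock
        System.out.println(lock.releaseLock("order:1", "client-A")); // false
        System.out.println(lock.store.get("order:1"));               // client-B
    }
}
```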

Rate Limiter Implementation

@Component
public class RedisRateLimiter {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* Fixed-window rate limiting
* @param key rate-limit key
* @param limit maximum number of requests
* @param window time window in seconds
* @return whether the request is allowed
*/
public boolean isAllowedFixedWindow(String key, int limit, int window) {
String luaScript =
"local key = KEYS[1] " +
"local limit = tonumber(ARGV[1]) " +
"local window = tonumber(ARGV[2]) " +
"local current = redis.call('get', key) " +
"if current == false then " +
" redis.call('set', key, 1, 'EX', window) " +
" return 1 " +
"elseif tonumber(current) < limit then " +
" redis.call('incr', key) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(limit), String.valueOf(window));
return Long.valueOf(1).equals(result);
}

/**
* Sliding-window rate limiting
* @param key rate-limit key
* @param limit maximum number of requests
* @param window time window in seconds
* @return whether the request is allowed
*/
public boolean isAllowedSlidingWindow(String key, int limit, int window) {
long now = System.currentTimeMillis();
long windowStart = now - window * 1000L;

// Pass window as ARGV[4] instead of concatenating it into the script text,
// so the script stays constant and can be cached by EVALSHA
String luaScript =
"local key = KEYS[1] " +
"local limit = tonumber(ARGV[1]) " +
"local windowStart = tonumber(ARGV[2]) " +
"local now = tonumber(ARGV[3]) " +
"local window = tonumber(ARGV[4]) " +
"redis.call('zremrangebyscore', key, 0, windowStart) " +
"local current = redis.call('zcard', key) " +
"if current < limit then " +
" redis.call('zadd', key, now, now) " +
" redis.call('expire', key, window) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript, Collections.singletonList(key),
String.valueOf(limit), String.valueOf(windowStart), String.valueOf(now), String.valueOf(window));
return Long.valueOf(1).equals(result);
}

/**
* Token-bucket rate limiting
* @param key rate-limit key
* @param capacity bucket capacity
* @param refillRate token refill rate per second
* @param tokens number of tokens requested
* @return whether the request is allowed
*/
public boolean isAllowedTokenBucket(String key, int capacity, double refillRate, int tokens) {
long now = System.currentTimeMillis();

String luaScript =
"local key = KEYS[1] " +
"local capacity = tonumber(ARGV[1]) " +
"local refillRate = tonumber(ARGV[2]) " +
"local tokens = tonumber(ARGV[3]) " +
"local now = tonumber(ARGV[4]) " +
"local bucket = redis.call('hmget', key, 'tokens', 'lastRefill') " +
"local currentTokens = tonumber(bucket[1]) or capacity " +
"local lastRefill = tonumber(bucket[2]) or now " +
"local timePassed = (now - lastRefill) / 1000 " +
"local newTokens = math.min(capacity, currentTokens + timePassed * refillRate) " +
"if newTokens >= tokens then " +
" newTokens = newTokens - tokens " +
" redis.call('hmset', key, 'tokens', newTokens, 'lastRefill', now) " +
" redis.call('expire', key, 3600) " +
" return 1 " +
"else " +
" redis.call('hmset', key, 'tokens', newTokens, 'lastRefill', now) " +
" redis.call('expire', key, 3600) " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript, Collections.singletonList(key),
String.valueOf(capacity), String.valueOf(refillRate), String.valueOf(tokens), String.valueOf(now));
return Long.valueOf(1).equals(result);
}
}
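The token-bucket Lua script packs the refill arithmetic into one string, which makes it hard to unit-test. The following single-process Java mirror of the same arithmetic (the class name is hypothetical) can be used to validate the math before embedding it in Lua:

```java
// Single-process Java mirror of the token-bucket arithmetic in the Lua
// script above: refill tokens based on elapsed time, cap at capacity, then
// try to consume. Illustrative only; the Redis version makes this atomic
// and shares the bucket across processes.
public class TokenBucket {
    private final int capacity;
    private final double refillRatePerSec;
    private double tokens;
    private long lastRefillMillis;

    public TokenBucket(int capacity, double refillRatePerSec, long nowMillis) {
        this.capacity = capacity;
        this.refillRatePerSec = refillRatePerSec;
        this.tokens = capacity;           // start full, like the Lua default
        this.lastRefillMillis = nowMillis;
    }

    public boolean tryAcquire(int requested, long nowMillis) {
        // Refill proportionally to elapsed time, capped at capacity
        double elapsedSec = (nowMillis - lastRefillMillis) / 1000.0;
        tokens = Math.min(capacity, tokens + elapsedSec * refillRatePerSec);
        lastRefillMillis = nowMillis;
        if (tokens >= requested) {
            tokens -= requested;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(10, 2.0, 0); // 10 tokens, 2/sec
        System.out.println(bucket.tryAcquire(10, 0));     // true: bucket starts full
        System.out.println(bucket.tryAcquire(1, 0));      // false: bucket empty
        System.out.println(bucket.tryAcquire(1, 1000));   // true: 2 tokens refilled
    }
}
```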

// Rate limiter usage example
@RestController
public class ApiController {

@Autowired
private RedisRateLimiter rateLimiter;

@GetMapping("/api/data")
public ResponseEntity<?> getData(HttpServletRequest request) {
String clientIp = getClientIp(request);
String rateLimitKey = "rate_limit:" + clientIp;

// At most 100 requests per minute
if (!rateLimiter.isAllowedSlidingWindow(rateLimitKey, 100, 60)) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body("Rate limit exceeded");
}

// Normal business logic
return ResponseEntity.ok("Data response");
}

private String getClientIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
return request.getRemoteAddr();
}
}

Message Queue Implementation

@Component
public class RedisMessageQueue {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* Send a message to the queue
* @param queueName queue name
* @param message message payload
*/
public void sendMessage(String queueName, Object message) {
try {
String messageJson = JSON.toJSONString(message);
redisTemplate.opsForList().leftPush(queueName, messageJson);
log.info("Message sent to queue {}: {}", queueName, messageJson);
} catch (Exception e) {
log.error("Failed to send message to queue: " + queueName, e);
}
}

/**
* Receive a message from the queue (blocking)
* @param queueName queue name
* @param timeout timeout in seconds
* @return message payload, or null if the wait timed out
*/
public String receiveMessage(String queueName, int timeout) {
try {
// Blocking pop; Spring Data Redis returns the popped value directly,
// not the [key, value] pair that raw BRPOP returns
Object result = redisTemplate.opsForList().rightPop(queueName, timeout, TimeUnit.SECONDS);
return result != null ? (String) result : null;
} catch (Exception e) {
log.error("Failed to receive message from queue: " + queueName, e);
return null;
}
}

/**
* Send a delayed message
* @param queueName queue name
* @param message message payload
* @param delaySeconds delay in seconds
*/
public void sendDelayedMessage(String queueName, Object message, int delaySeconds) {
try {
String messageJson = JSON.toJSONString(message);
long executeTime = System.currentTimeMillis() + delaySeconds * 1000L;

String delayedQueueName = queueName + ":delayed";
redisTemplate.opsForZSet().add(delayedQueueName, messageJson, executeTime);

log.info("Delayed message sent to queue {}: {}, execute at: {}",
queueName, messageJson, new Date(executeTime));
} catch (Exception e) {
log.error("Failed to send delayed message to queue: " + queueName, e);
}
}

/**
* Process due delayed messages (invoked by a scheduled task)
* @param queueName queue name
*/
public void processDelayedMessages(String queueName) {
try {
String delayedQueueName = queueName + ":delayed";
long now = System.currentTimeMillis();

Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(delayedQueueName, 0, now);

if (messages != null) {
for (Object message : messages) {
// Move the due message to the normal queue
redisTemplate.opsForList().leftPush(queueName, message);
redisTemplate.opsForZSet().remove(delayedQueueName, message);

log.info("Delayed message moved to queue {}: {}", queueName, message);
}
}
} catch (Exception e) {
log.error("Failed to process delayed messages for queue: " + queueName, e);
}
}

/**
* Get the queue length
* @param queueName queue name
* @return queue length
*/
public long getQueueLength(String queueName) {
Long length = redisTemplate.opsForList().size(queueName);
return length != null ? length : 0;
}
}
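The delayed-queue pattern above scores each message in a sorted set by its due timestamp, then periodically moves due messages to the normal list. A plain-Java sketch using a `TreeMap` as a stand-in for the ZSET (names are illustrative) captures the poll-due logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the delayed-queue pattern: a TreeMap keyed by due
// timestamp stands in for the Redis sorted set (score = execute time).
public class DelayedQueueDemo {
    // dueTimeMillis -> messages due at that time
    private final TreeMap<Long, List<String>> delayed = new TreeMap<>();

    public void sendDelayed(String message, long dueTimeMillis) {
        delayed.computeIfAbsent(dueTimeMillis, t -> new ArrayList<>()).add(message);
    }

    // Equivalent of ZRANGEBYSCORE key 0 now, followed by ZREM on each hit
    public List<String> pollDue(long nowMillis) {
        List<String> ready = new ArrayList<>();
        Map<Long, List<String>> due = delayed.headMap(nowMillis, true);
        for (List<String> msgs : due.values()) {
            ready.addAll(msgs);
        }
        due.clear(); // removing from the view removes from the backing map
        return ready;
    }

    public static void main(String[] args) {
        DelayedQueueDemo q = new DelayedQueueDemo();
        q.sendDelayed("order-1", 1000);
        q.sendDelayed("order-2", 5000);
        System.out.println(q.pollDue(2000)); // [order-1]
        System.out.println(q.pollDue(6000)); // [order-2]
    }
}
```

Note that in the Redis version the range-then-remove is two separate calls, so two workers could move the same message; wrapping both in a Lua script would make it atomic.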

// Message consumer
@Component
public class MessageConsumer {

@Autowired
private RedisMessageQueue messageQueue;

// Start the consumer loop once the application context is fully up
@EventListener(ApplicationReadyEvent.class)
@Async
public void consumeOrderMessages() {
String queueName = "order_queue";

while (true) {
try {
String message = messageQueue.receiveMessage(queueName, 10);
if (message != null) {
// Process the message
processOrderMessage(message);
}
} catch (Exception e) {
log.error("Error consuming messages from queue: " + queueName, e);
try {
Thread.sleep(5000); // back off for 5 seconds on error
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}

private void processOrderMessage(String message) {
try {
OrderMessage orderMessage = JSON.parseObject(message, OrderMessage.class);
log.info("Processing order message: {}", orderMessage);

// Business logic
// ...

} catch (Exception e) {
log.error("Failed to process order message: " + message, e);
}
}

@Data
private static class OrderMessage {
private Long orderId;
private String action;
private Map<String, Object> data;
}
}

Summary and Best Practices

Core Principles for Applying Redis

  1. Choose data structures wisely: pick the Redis structure best suited to the business scenario
  2. Set sensible expiration times: avoid memory leaks and use memory efficiently
  3. Use connection pools: improve connection reuse and reduce connection overhead
  4. Batch your operations: use pipelining to cut network round trips
  5. Monitor and alert: build a solid monitoring system to catch problems early

High-Availability Architecture Recommendations

  1. Master-replica replication: data redundancy and read/write splitting
  2. Sentinel cluster: automatic failover and service discovery
  3. Redis Cluster: horizontal scaling and data sharding
  4. Multi-level caching: local cache + Redis cache + database
  5. Circuit breaking and degradation: keep cache failures from bringing down the whole system

Performance Optimization Essentials

  1. Memory: choose appropriate data structures, enable compression, configure an eviction policy
  2. Network: use connection pools and batch operations, reduce serialization overhead
  3. Persistence: choose RDB or AOF according to business requirements
  4. Cluster: shard sensibly, avoid hot keys, use hash tags

Solutions to Common Problems

  1. Cache penetration: Bloom filter + caching null values
  2. Cache breakdown: distributed lock + never-expiring hot data
  3. Cache avalanche: randomized expiration times + multi-level caching + circuit breaking
  4. Data consistency: delayed double delete + asynchronous updates via message queue
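For the cache-avalanche item, "randomized expiration times" simply means adding jitter to the TTL so keys written at the same moment do not all expire together. A minimal sketch (the helper class is hypothetical):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the "randomized expiration" avalanche mitigation: spread the
// TTLs of keys written together so they do not all expire at once.
public class TtlJitter {
    // Return baseTtlSeconds plus up to jitterPercent extra, chosen at random.
    public static long jitteredTtl(long baseTtlSeconds, double jitterPercent) {
        long maxExtra = (long) (baseTtlSeconds * jitterPercent);
        return baseTtlSeconds + ThreadLocalRandom.current().nextLong(maxExtra + 1);
    }

    public static void main(String[] args) {
        // e.g. a nominal 1-hour TTL becomes 3600..3960 seconds (up to +10%)
        long ttl = jitteredTtl(3600, 0.10);
        System.out.println(ttl >= 3600 && ttl <= 3960); // true
    }
}
```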

As a key component of modern internet architecture, Redis has a wide range of application scenarios and considerable technical depth. With sound architectural design, performance tuning, and operations management, Redis can provide high-performance, highly available data services for business systems. In practice, choose the deployment plan and optimization strategy according to the specific business scenario and technical requirements.

All rights reserved. Please contact me if you believe your rights have been infringed.