Database Sharding in Practice: Designing and Implementing Large-Scale Data Architectures

As a business grows, a single database often cannot keep up with massive data volumes and high-concurrency access. Sharding — splitting data across multiple databases and tables — is the core technique for scaling a database horizontally, and it is an effective way to break through single-instance performance bottlenecks. This article covers sharding design principles, implementation strategies, and best practices.

Sharding Fundamentals

Why Shard at All

Limits of a single database and table

  1. Storage capacity: an oversized table degrades query performance
  2. Concurrency: connection counts and lock contention become bottlenecks
  3. Disk I/O: disk I/O turns into the system-wide bottleneck
  4. CPU: complex queries consume large amounts of CPU
  5. Memory: indexes and hot data no longer fit in memory

Benefits of sharding

  1. Horizontal scaling: add database instances to raise overall throughput
  2. Load distribution: spread access pressure across multiple databases
  3. Fault isolation: one failing database does not take down the whole service
  4. Parallelism: queries and transactions can run in parallel across shards
  5. Cost efficiency: commodity hardware can replace a single large machine

Sharding Strategies

Vertical Partitioning (by Business Module)

-- Original single-database schema
CREATE DATABASE ecommerce;

-- User tables
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE user_profiles (
    user_id BIGINT PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    phone VARCHAR(20),
    address TEXT,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- Product tables
CREATE TABLE products (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(200) NOT NULL,
    description TEXT,
    price DECIMAL(10,2) NOT NULL,
    category_id INT,
    stock_quantity INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE categories (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    parent_id INT,
    level INT DEFAULT 1
);

-- Order tables
CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_number VARCHAR(50) UNIQUE NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE order_items (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    total_price DECIMAL(10,2) NOT NULL
);
-- Schema after vertical partitioning

-- User database (user_db)
CREATE DATABASE user_db;
USE user_db;

CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE user_profiles (
    user_id BIGINT PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    phone VARCHAR(20),
    address TEXT
);

-- Product database (product_db)
CREATE DATABASE product_db;
USE product_db;

CREATE TABLE products (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(200) NOT NULL,
    description TEXT,
    price DECIMAL(10,2) NOT NULL,
    category_id INT,
    stock_quantity INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE categories (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    parent_id INT,
    level INT DEFAULT 1
);

-- Order database (order_db)
CREATE DATABASE order_db;
USE order_db;

CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL, -- the foreign key becomes a logical, application-enforced reference
    order_number VARCHAR(50) UNIQUE NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE order_items (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL, -- the foreign key becomes a logical, application-enforced reference
    quantity INT NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    total_price DECIMAL(10,2) NOT NULL
);
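
Once `orders.user_id` is only a logical reference, the old `JOIN` between orders and users has to move into application code: load the orders, batch-load the referenced users from the other database, then stitch the two result sets together in memory. A minimal sketch of that pattern — the method name and the maps simulating the two databases are illustrative, not part of any real API:

```java
import java.util.*;
import java.util.stream.Collectors;

public class AppSideJoin {

    // Step 1 happened elsewhere: orders were loaded from order_db.
    // Step 2: collect distinct user IDs and fetch them in ONE round trip
    //         (simulating "SELECT id, username FROM users WHERE id IN (...)"),
    //         which avoids an N+1 query per order.
    // Step 3: join in memory on the logical foreign key user_id.
    static List<String> ordersWithUsernames(List<long[]> orders /* {orderId, userId} */,
                                            Map<Long, String> userDb) {
        Set<Long> userIds = orders.stream().map(o -> o[1]).collect(Collectors.toSet());

        Map<Long, String> users = new HashMap<>();
        for (Long id : userIds) {
            users.put(id, userDb.get(id)); // stands in for the batched IN query
        }

        return orders.stream()
                .map(o -> "order-" + o[0] + ":" + users.get(o[1]))
                .collect(Collectors.toList());
    }
}
```

The important design point is step 2: after vertical partitioning, per-row lookups against the second database are the most common accidental performance regression, so cross-database "joins" should always be batched.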

Horizontal Sharding (by Data Characteristics)

-- Shard by user ID
-- Database routing: user_id % 4 selects the database
-- Table routing: (user_id / 4) % 8 selects the table

-- Database user_db_0
CREATE DATABASE user_db_0;
USE user_db_0;

-- User table shards
CREATE TABLE users_0 (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email)
);

CREATE TABLE users_1 (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email)
);

-- ... users_2 through users_7

-- Databases user_db_1, user_db_2, user_db_3 have the same structure

-- Order tables partitioned by time
CREATE DATABASE order_db;
USE order_db;

-- One table per month
CREATE TABLE orders_202401 (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_number VARCHAR(50) UNIQUE NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_order_number (order_number),
    INDEX idx_created_at (created_at),
    INDEX idx_status (status)
);

CREATE TABLE orders_202402 (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_number VARCHAR(50) UNIQUE NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_order_number (order_number),
    INDEX idx_created_at (created_at),
    INDEX idx_status (status)
);

-- Create the remaining monthly tables the same way...
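
The two-level routing described above (`user_id % 4` for the database, `(user_id / 4) % 8` for the table) can be sketched as a small router; class and constant names here are illustrative:

```java
public class ShardRouter {
    static final int DB_COUNT = 4;
    static final int TABLE_COUNT = 8;

    // Database routing: user_id % 4 picks one of user_db_0 .. user_db_3.
    static String database(long userId) {
        return "user_db_" + (userId % DB_COUNT);
    }

    // Table routing: (user_id / 4) % 8 picks one of users_0 .. users_7
    // inside the chosen database.
    static String table(long userId) {
        return "users_" + ((userId / DB_COUNT) % TABLE_COUNT);
    }

    public static void main(String[] args) {
        // user_id = 37: 37 % 4 = 1 -> user_db_1; (37 / 4) % 8 = 9 % 8 = 1 -> users_1
        System.out.println(database(37) + "." + table(37)); // user_db_1.users_1
    }
}
```

Dividing by the database count before the table modulo matters: it makes the two routing dimensions independent, so consecutive IDs in one database still spread across all eight tables.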

Designing the Sharding Strategy

Shard Key Selection Principles

1. High Cardinality

// Good shard key examples
public class ShardingKeyExamples {

    private static final int SHARD_COUNT = 16;

    // User ID: high cardinality, evenly distributed
    public int getUserShardIndex(Long userId) {
        return (int) (userId % SHARD_COUNT);
    }

    // Order number: high cardinality, evenly distributed.
    // The sign bit is masked instead of calling Math.abs(), because
    // Math.abs(Integer.MIN_VALUE) is still negative.
    public int getOrderShardIndex(String orderNumber) {
        return (orderNumber.hashCode() & Integer.MAX_VALUE) % SHARD_COUNT;
    }

    // Composite shard key: supports routing for multi-dimension queries
    public int getCompositeShardIndex(Long userId, String region) {
        String compositeKey = userId + "_" + region;
        return (compositeKey.hashCode() & Integer.MAX_VALUE) % SHARD_COUNT;
    }
}

// Shard keys to avoid
public class BadShardingKeyExamples {

    private static final int SHARD_COUNT = 16;

    // Gender: cardinality is far too low, data is badly skewed
    public int getGenderShardIndex(String gender) {
        return "male".equals(gender) ? 0 : 1; // only two possible values
    }

    // Status: low cardinality plus hotspot problems
    public int getStatusShardIndex(String status) {
        switch (status) {
            case "active": return 0;   // most of the data
            case "inactive": return 1; // a small amount
            case "deleted": return 2;  // almost none
            default: return 0;
        }
    }

    // Timestamp: write-hotspot problem
    public int getTimestampShardIndex(long timestamp) {
        // New data always lands in the newest shard, creating a hotspot
        return (int) ((timestamp / (24 * 60 * 60 * 1000)) % SHARD_COUNT);
    }
}
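
One pitfall worth calling out when deriving a shard index from `hashCode()`: `Math.abs(Integer.MIN_VALUE)` overflows and is still negative, so `Math.abs(hash) % n` can return a negative index and crash array-style routing. A standalone demonstration (the shard count of 7 is chosen so the defect is visible; with a power-of-two count the broken variant happens to return 0 for `Integer.MIN_VALUE` and hides the bug):

```java
public class HashMaskDemo {
    static final int SHARD_COUNT = 7;

    // Buggy variant: can return a negative shard index.
    static int absShard(int hash) {
        return Math.abs(hash) % SHARD_COUNT;
    }

    // Safe variant: clearing the sign bit always yields a non-negative value.
    static int maskShard(int hash) {
        return (hash & Integer.MAX_VALUE) % SHARD_COUNT;
    }

    public static void main(String[] args) {
        System.out.println(absShard(Integer.MIN_VALUE));  // negative
        System.out.println(maskShard(Integer.MIN_VALUE)); // 0
    }
}
```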

2. Query Friendliness

@Component
public class ShardingStrategy {

    private static final int SHARD_COUNT = 16;

    /**
     * Shard user-related data by user ID.
     * Benefit: every user-scoped query routes to a single shard.
     */
    public int getUserDataShard(Long userId) {
        return (int) (userId % SHARD_COUNT);
    }

    /**
     * Composite routing for order data:
     * shard by user ID so user-scoped queries stay on one shard,
     * and also split by month so no single table grows too large.
     */
    public String getOrderTableName(Long userId, LocalDateTime orderTime) {
        int shardIndex = (int) (userId % SHARD_COUNT);
        String monthSuffix = orderTime.format(DateTimeFormatter.ofPattern("yyyyMM"));
        return String.format("orders_%d_%s", shardIndex, monthSuffix);
    }

    /**
     * Shard product data by category.
     * Benefit: category-scoped queries are efficient.
     */
    public int getProductShard(Integer categoryId) {
        return categoryId % SHARD_COUNT;
    }

    /**
     * Shard log data by time.
     * Benefit: supports time-range queries and makes archiving easy.
     */
    public String getLogTableName(LocalDateTime logTime) {
        return "logs_" + logTime.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }
}

Sharding Algorithm Implementations

1. Hash Sharding

@Component
public class HashShardingAlgorithm {

    /**
     * Simple modulo sharding
     */
    public int simpleModSharding(Long shardingValue, int shardCount) {
        return (int) (shardingValue % shardCount);
    }

    /**
     * Consistent-hash sharding.
     * Benefit: adding or removing a node remaps only a small fraction of the keys.
     * Uses Guava's murmur3 (com.google.common.hash.Hashing).
     */
    public static class ConsistentHashSharding {
        private final TreeMap<Long, String> ring = new TreeMap<>();
        private final int virtualNodes = 150;

        public ConsistentHashSharding(List<String> nodes) {
            for (String node : nodes) {
                addNode(node);
            }
        }

        private void addNode(String node) {
            // Place many virtual nodes per physical node to smooth the distribution
            for (int i = 0; i < virtualNodes; i++) {
                String virtualNode = node + "#" + i;
                long hash = hash(virtualNode);
                ring.put(hash, node);
            }
        }

        public String getNode(String key) {
            if (ring.isEmpty()) {
                return null;
            }

            long hash = hash(key);
            Map.Entry<Long, String> entry = ring.ceilingEntry(hash);
            if (entry == null) {
                entry = ring.firstEntry(); // wrap around the ring
            }
            return entry.getValue();
        }

        private long hash(String key) {
            return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();
        }
    }

    /**
     * Weighted hash sharding
     */
    public static class WeightedHashSharding {
        private final List<ShardNode> nodes = new ArrayList<>();
        private int totalWeight = 0;

        public void addNode(String nodeId, int weight) {
            nodes.add(new ShardNode(nodeId, weight));
            totalWeight += weight;
        }

        public String getNode(String key) {
            // Mask the sign bit: Math.abs(Integer.MIN_VALUE) is still negative
            long hash = key.hashCode() & Integer.MAX_VALUE;
            int targetWeight = (int) (hash % totalWeight);

            int currentWeight = 0;
            for (ShardNode node : nodes) {
                currentWeight += node.weight;
                if (currentWeight > targetWeight) {
                    return node.nodeId;
                }
            }
            return nodes.get(0).nodeId;
        }

        private static class ShardNode {
            String nodeId;
            int weight;

            ShardNode(String nodeId, int weight) {
                this.nodeId = nodeId;
                this.weight = weight;
            }
        }
    }
}
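
The claimed benefit of consistent hashing — few keys move when a node joins — can be checked empirically. A minimal self-contained sketch (it uses `String.hashCode` for brevity where a real ring would use a better-distributed hash such as murmur3; all names are illustrative):

```java
import java.util.*;

public class ConsistentHashDemo {

    // Build a ring with `vnodes` virtual nodes per physical node.
    static NavigableMap<Integer, String> buildRing(List<String> nodes, int vnodes) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (String node : nodes) {
            for (int i = 0; i < vnodes; i++) {
                ring.put((node + "#" + i).hashCode() & Integer.MAX_VALUE, node);
            }
        }
        return ring;
    }

    // Route a key to the first virtual node clockwise from its hash.
    static String route(NavigableMap<Integer, String> ring, String key) {
        int h = key.hashCode() & Integer.MAX_VALUE;
        Map.Entry<Integer, String> e = ring.ceilingEntry(h);
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        List<String> four = Arrays.asList("ds0", "ds1", "ds2", "ds3");
        List<String> five = Arrays.asList("ds0", "ds1", "ds2", "ds3", "ds4");
        NavigableMap<Integer, String> before = buildRing(four, 150);
        NavigableMap<Integer, String> after = buildRing(five, 150);

        int moved = 0, total = 10000;
        for (int i = 0; i < total; i++) {
            String key = "user-" + i;
            if (!route(before, key).equals(route(after, key))) moved++;
        }
        // Only the keys captured by the new node's virtual nodes move
        // (roughly 1/5 here); plain modulo routing would remap ~4/5 of them.
        System.out.println("moved: " + moved + " / " + total);
    }
}
```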

2. Range Sharding

@Component
public class RangeShardingAlgorithm {

    /**
     * Time-range sharding
     */
    public static class TimeRangeSharding {

        public String getTableName(LocalDateTime dateTime, String baseTableName) {
            // One table per month
            String suffix = dateTime.format(DateTimeFormatter.ofPattern("yyyyMM"));
            return baseTableName + "_" + suffix;
        }

        public List<String> getTableNames(LocalDateTime startTime, LocalDateTime endTime, String baseTableName) {
            List<String> tableNames = new ArrayList<>();
            // Truncate to the first day of the start month
            LocalDateTime current = startTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0);

            while (!current.isAfter(endTime)) {
                String tableName = getTableName(current, baseTableName);
                tableNames.add(tableName);
                current = current.plusMonths(1);
            }

            return tableNames;
        }
    }

    /**
     * Numeric-range sharding
     */
    public static class NumericRangeSharding {
        private final List<RangeRule> rules = new ArrayList<>();

        public NumericRangeSharding() {
            // Rules mapping user ID bands to databases
            rules.add(new RangeRule(0L, 1000000L, "user_db_0"));
            rules.add(new RangeRule(1000001L, 2000000L, "user_db_1"));
            rules.add(new RangeRule(2000001L, 3000000L, "user_db_2"));
            rules.add(new RangeRule(3000001L, Long.MAX_VALUE, "user_db_3"));
        }

        public String getDatabase(Long userId) {
            for (RangeRule rule : rules) {
                if (userId >= rule.minValue && userId <= rule.maxValue) {
                    return rule.database;
                }
            }
            throw new IllegalArgumentException("No database found for userId: " + userId);
        }

        private static class RangeRule {
            long minValue;
            long maxValue;
            String database;

            RangeRule(long minValue, long maxValue, String database) {
                this.minValue = minValue;
                this.maxValue = maxValue;
                this.database = database;
            }
        }
    }

    /**
     * Geographic sharding
     */
    public static class GeographicSharding {
        private final Map<String, String> regionMapping = new HashMap<>();

        public GeographicSharding() {
            regionMapping.put("北京", "db_north");
            regionMapping.put("上海", "db_east");
            regionMapping.put("广州", "db_south");
            regionMapping.put("成都", "db_west");
        }

        public String getDatabase(String region) {
            return regionMapping.getOrDefault(region, "db_default");
        }
    }
}
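
To make the time-range routing concrete: a range query must fan out to every monthly table its interval touches. A standalone sketch of that enumeration, mirroring the `getTableNames` logic above (names are illustrative):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class MonthRangeRouter {

    // List the monthly tables touched by a query over [start, end].
    static List<String> tablesFor(LocalDate start, LocalDate end, String base) {
        List<String> tables = new ArrayList<>();
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMM");
        // Walk month by month from the first day of the start month.
        for (LocalDate m = start.withDayOfMonth(1); !m.isAfter(end); m = m.plusMonths(1)) {
            tables.add(base + "_" + m.format(fmt));
        }
        return tables;
    }

    public static void main(String[] args) {
        // A query spanning 2024-01-15 .. 2024-03-02 must hit three monthly tables.
        System.out.println(tablesFor(LocalDate.of(2024, 1, 15),
                                     LocalDate.of(2024, 3, 2), "orders"));
        // -> [orders_202401, orders_202402, orders_202403]
    }
}
```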

Sharding Middleware

ShardingSphere Integration

# application-sharding.yml
spring:
  shardingsphere:
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ecommerce_0?useSSL=false&serverTimezone=UTC
        username: root
        password: password
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ecommerce_1?useSSL=false&serverTimezone=UTC
        username: root
        password: password
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ecommerce_2?useSSL=false&serverTimezone=UTC
        username: root
        password: password
      ds3:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ecommerce_3?useSSL=false&serverTimezone=UTC
        username: root
        password: password

    rules:
      sharding:
        tables:
          users:
            actual-data-nodes: ds$->{0..3}.users_$->{0..7}
            database-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-database-inline
            table-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-table-inline
            key-generate-strategy:
              column: id
              key-generator-name: snowflake

          orders:
            actual-data-nodes: ds$->{0..3}.orders_$->{202401..202412}
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-database-inline
            table-strategy:
              standard:
                sharding-column: created_at
                sharding-algorithm-name: order-table-time
            key-generate-strategy:
              column: id
              key-generator-name: snowflake

        sharding-algorithms:
          user-database-inline:
            type: INLINE
            props:
              algorithm-expression: ds$->{id % 4}

          user-table-inline:
            type: INLINE
            props:
              # intdiv is Groovy's integer division; a plain "/" would yield a decimal
              algorithm-expression: users_$->{id.intdiv(4) % 8}

          order-database-inline:
            type: INLINE
            props:
              algorithm-expression: ds$->{user_id % 4}

          order-table-time:
            type: CLASS_BASED
            props:
              strategy: STANDARD
              algorithmClassName: com.example.sharding.OrderTimeShardingAlgorithm

        key-generators:
          snowflake:
            type: SNOWFLAKE
            props:
              worker-id: 1

    props:
      sql-show: true
      sql-simple: true
// Custom time-based table sharding algorithm
public class OrderTimeShardingAlgorithm implements StandardShardingAlgorithm<Date> {

    @Override
    public String doSharding(Collection<String> availableTargetNames,
                             PreciseShardingValue<Date> shardingValue) {
        Date createTime = shardingValue.getValue();
        String suffix = new SimpleDateFormat("yyyyMM").format(createTime);

        // Find the matching table among the available table names
        for (String tableName : availableTargetNames) {
            if (tableName.endsWith(suffix)) {
                return tableName;
            }
        }

        // No matching table: fail fast (alternatively, fall back to a default table)
        throw new IllegalArgumentException("No table found for date: " + createTime);
    }

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
                                         RangeShardingValue<Date> shardingValue) {
        Set<String> result = new HashSet<>();

        Date lowerEndpoint = shardingValue.getValueRange().lowerEndpoint();
        Date upperEndpoint = shardingValue.getValueRange().upperEndpoint();

        // Walk month by month across the queried range
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(lowerEndpoint);

        while (!calendar.getTime().after(upperEndpoint)) {
            String suffix = new SimpleDateFormat("yyyyMM").format(calendar.getTime());

            for (String tableName : availableTargetNames) {
                if (tableName.endsWith(suffix)) {
                    result.add(tableName);
                    break;
                }
            }

            calendar.add(Calendar.MONTH, 1);
        }

        return result;
    }

    @Override
    public void init() {
        // initialization logic
    }

    @Override
    public String getType() {
        return "ORDER_TIME";
    }
}

A Custom Sharding Layer

@Component
public class CustomShardingDataSource {

    private final Map<String, DataSource> dataSourceMap = new HashMap<>();
    private final ShardingStrategy shardingStrategy;

    public CustomShardingDataSource(ShardingStrategy shardingStrategy) {
        this.shardingStrategy = shardingStrategy;
        initDataSources();
    }

    private void initDataSources() {
        // Create one pooled data source per physical database
        for (int i = 0; i < 4; i++) {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl("jdbc:mysql://localhost:3306/ecommerce_" + i);
            config.setUsername("root");
            config.setPassword("password");
            config.setMaximumPoolSize(20);
            config.setMinimumIdle(5);
            config.setConnectionTimeout(30000);
            config.setIdleTimeout(600000);
            config.setMaxLifetime(1800000);

            HikariDataSource dataSource = new HikariDataSource(config);
            dataSourceMap.put("ds" + i, dataSource);
        }
    }

    public DataSource getDataSource(String shardingKey) {
        String dataSourceName = shardingStrategy.getDataSourceName(shardingKey);
        return dataSourceMap.get(dataSourceName);
    }

    public Connection getConnection(String shardingKey) throws SQLException {
        DataSource dataSource = getDataSource(shardingKey);
        return dataSource.getConnection();
    }
}

@Service
public class ShardingJdbcTemplate {

    private final CustomShardingDataSource shardingDataSource;
    private final ShardingStrategy shardingStrategy;

    public ShardingJdbcTemplate(CustomShardingDataSource shardingDataSource,
                                ShardingStrategy shardingStrategy) {
        this.shardingDataSource = shardingDataSource;
        this.shardingStrategy = shardingStrategy;
    }

    /**
     * Run a query on a single shard
     */
    public <T> List<T> queryForList(String sql, Object[] params,
                                    RowMapper<T> rowMapper, String shardingKey) {
        String actualSql = shardingStrategy.rewriteSql(sql, shardingKey);

        try (Connection connection = shardingDataSource.getConnection(shardingKey);
             PreparedStatement statement = connection.prepareStatement(actualSql)) {

            // Bind parameters
            for (int i = 0; i < params.length; i++) {
                statement.setObject(i + 1, params[i]);
            }

            try (ResultSet resultSet = statement.executeQuery()) {
                List<T> results = new ArrayList<>();
                while (resultSet.next()) {
                    results.add(rowMapper.mapRow(resultSet, results.size()));
                }
                return results;
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to execute sharding query", e);
        }
    }

    /**
     * Run an update on a single shard
     */
    public int update(String sql, Object[] params, String shardingKey) {
        String actualSql = shardingStrategy.rewriteSql(sql, shardingKey);

        try (Connection connection = shardingDataSource.getConnection(shardingKey);
             PreparedStatement statement = connection.prepareStatement(actualSql)) {

            // Bind parameters
            for (int i = 0; i < params.length; i++) {
                statement.setObject(i + 1, params[i]);
            }

            return statement.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to execute sharding update", e);
        }
    }

    /**
     * Run a query across every shard
     */
    public <T> List<T> queryAcrossShards(String sql, Object[] params,
                                         RowMapper<T> rowMapper) {
        List<T> allResults = new ArrayList<>();
        List<String> allShards = shardingStrategy.getAllShards();

        // Query all shards in parallel
        List<CompletableFuture<List<T>>> futures = allShards.stream()
                .map(shard -> CompletableFuture.supplyAsync(() ->
                        queryForList(sql, params, rowMapper, shard)))
                .collect(Collectors.toList());

        // Merge the partial results
        for (CompletableFuture<List<T>> future : futures) {
            try {
                allResults.addAll(future.get());
            } catch (Exception e) {
                throw new RuntimeException("Failed to execute cross-shard query", e);
            }
        }

        return allResults;
    }
}

@Component
public class ShardingStrategy {

    private static final int SHARD_COUNT = 4;
    private static final int TABLE_COUNT = 8;

    public String getDataSourceName(String shardingKey) {
        // Mask the sign bit: Math.abs(Integer.MIN_VALUE) is still negative
        int hash = shardingKey.hashCode() & Integer.MAX_VALUE;
        int shardIndex = hash % SHARD_COUNT;
        return "ds" + shardIndex;
    }

    public String getTableName(String baseTableName, String shardingKey) {
        int hash = shardingKey.hashCode() & Integer.MAX_VALUE;
        int tableIndex = (hash / SHARD_COUNT) % TABLE_COUNT;
        return baseTableName + "_" + tableIndex;
    }

    public String rewriteSql(String sql, String shardingKey) {
        // Naive SQL rewriting for illustration;
        // a real implementation needs proper SQL parsing and rewriting
        if (sql.contains("users")) {
            String tableName = getTableName("users", shardingKey);
            return sql.replace("users", tableName);
        }
        return sql;
    }

    public List<String> getAllShards() {
        List<String> shards = new ArrayList<>();
        for (int i = 0; i < SHARD_COUNT; i++) {
            shards.add("ds" + i);
        }
        return shards;
    }
}
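
`queryAcrossShards` simply concatenates the per-shard results, which is fine for unordered reads. For `ORDER BY ... LIMIT n` across shards, each shard must return its own sorted top-n and the application must k-way merge those sorted lists. A minimal sketch with plain values standing in for rows (names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class ShardMerger {

    // Merge per-shard ascending lists into the global top n.
    // Each cursor in the min-heap is {shardIndex, positionInShard}.
    static <E extends Comparable<E>> List<E> mergeTopN(List<List<E>> perShard, int n) {
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] c) -> perShard.get(c[0]).get(c[1])));
        for (int s = 0; s < perShard.size(); s++) {
            if (!perShard.get(s).isEmpty()) heap.add(new int[]{s, 0});
        }

        List<E> out = new ArrayList<>();
        while (!heap.isEmpty() && out.size() < n) {
            int[] c = heap.poll();                       // smallest head among all shards
            out.add(perShard.get(c[0]).get(c[1]));
            if (c[1] + 1 < perShard.get(c[0]).size()) {  // advance that shard's cursor
                heap.add(new int[]{c[0], c[1] + 1});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> shards = Arrays.asList(
                Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(5, 6));
        System.out.println(mergeTopN(shards, 4)); // [1, 2, 3, 4]
    }
}
```

Note the cost asymmetry this implies: page k of a cross-shard listing requires each shard to return `offset + size` rows, which is why deep pagination over sharded tables is usually redesigned around cursor (keyset) pagination.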

Data Migration

Online Data Migration

@Slf4j
@Service
public class OnlineDataMigrationService {

    private final JdbcTemplate sourceJdbcTemplate;
    private final ShardingJdbcTemplate targetShardingTemplate;
    private final RedisTemplate<String, Object> redisTemplate;

    public OnlineDataMigrationService(JdbcTemplate sourceJdbcTemplate,
                                      ShardingJdbcTemplate targetShardingTemplate,
                                      RedisTemplate<String, Object> redisTemplate) {
        this.sourceJdbcTemplate = sourceJdbcTemplate;
        this.targetShardingTemplate = targetShardingTemplate;
        this.redisTemplate = redisTemplate;
    }

    /**
     * Dual-write migration strategy
     */
    @Component
    public class DualWriteStrategy {

        private volatile boolean enableDualWrite = false;
        private volatile boolean enableReadFromNew = false;

        @Transactional
        public void insertUser(User user) {
            // Write to the legacy database
            sourceJdbcTemplate.update(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                user.getUsername(), user.getEmail(), user.getPasswordHash()
            );

            // When dual-write is on, also write to the new sharded database.
            // Assumes user.getId() is already populated (e.g. by a snowflake generator).
            if (enableDualWrite) {
                try {
                    String shardingKey = String.valueOf(user.getId());
                    targetShardingTemplate.update(
                        "INSERT INTO users (id, username, email, password_hash) VALUES (?, ?, ?, ?)",
                        new Object[]{user.getId(), user.getUsername(), user.getEmail(), user.getPasswordHash()},
                        shardingKey
                    );
                } catch (Exception e) {
                    // Log the dual-write failure, but do not fail the main flow
                    log.error("Dual write failed for user: " + user.getId(), e);
                }
            }
        }

        public User findUserById(Long userId) {
            String shardingKey = String.valueOf(userId);

            if (enableReadFromNew) {
                try {
                    // Read from the new database first
                    List<User> users = targetShardingTemplate.queryForList(
                        "SELECT * FROM users WHERE id = ?",
                        new Object[]{userId},
                        new UserRowMapper(),
                        shardingKey
                    );

                    if (!users.isEmpty()) {
                        return users.get(0);
                    }
                } catch (Exception e) {
                    log.error("Read from new database failed, fallback to old", e);
                }
            }

            // Fall back to the legacy database
            List<User> users = sourceJdbcTemplate.query(
                "SELECT * FROM users WHERE id = ?",
                new Object[]{userId},
                new UserRowMapper()
            );

            return users.isEmpty() ? null : users.get(0);
        }

        // Switches that drive the migration phases
        public void enableDualWrite() {
            this.enableDualWrite = true;
            log.info("Dual write enabled");
        }

        public void enableReadFromNew() {
            this.enableReadFromNew = true;
            log.info("Read from new database enabled");
        }

        public void disableDualWrite() {
            this.enableDualWrite = false;
            log.info("Dual write disabled");
        }
    }

    /**
     * Historical data backfill
     */
    @Async
    public CompletableFuture<Void> migrateHistoricalData(String tableName,
                                                         Long startId, Long endId) {
        log.info("Starting migration for table: {}, range: {} - {}", tableName, startId, endId);

        int batchSize = 1000;
        long currentId = startId;

        while (currentId <= endId) {
            long batchEndId = Math.min(currentId + batchSize - 1, endId);

            try {
                migrateBatch(tableName, currentId, batchEndId);

                // Record migration progress
                String progressKey = "migration:progress:" + tableName;
                redisTemplate.opsForValue().set(progressKey, currentId);

                log.info("Migrated batch: {} - {} for table: {}", currentId, batchEndId, tableName);

                // Throttle so the source database is not overloaded
                Thread.sleep(100);

            } catch (Exception e) {
                log.error("Migration failed for batch: {} - {}", currentId, batchEndId, e);

                // Record the failed batch for a later retry
                String failedKey = "migration:failed:" + tableName;
                redisTemplate.opsForList().leftPush(failedKey, currentId + "-" + batchEndId);
            }

            currentId = batchEndId + 1;
        }

        log.info("Migration completed for table: {}", tableName);
        return CompletableFuture.completedFuture(null);
    }

    private void migrateBatch(String tableName, Long startId, Long endId) {
        // Read the batch from the source database
        String selectSql = String.format(
            "SELECT * FROM %s WHERE id >= ? AND id <= ? ORDER BY id", tableName
        );

        List<Map<String, Object>> rows = sourceJdbcTemplate.queryForList(
            selectSql, startId, endId
        );

        // Insert each row into its target shard
        for (Map<String, Object> row : rows) {
            Long id = (Long) row.get("id");
            String shardingKey = String.valueOf(id);

            // Build the INSERT statement from the row's columns
            StringBuilder insertSql = new StringBuilder("INSERT INTO ");
            insertSql.append(tableName).append(" (");

            StringBuilder valuesSql = new StringBuilder(" VALUES (");
            List<Object> values = new ArrayList<>();

            boolean first = true;
            for (Map.Entry<String, Object> entry : row.entrySet()) {
                if (!first) {
                    insertSql.append(", ");
                    valuesSql.append(", ");
                }
                insertSql.append(entry.getKey());
                valuesSql.append("?");
                values.add(entry.getValue());
                first = false;
            }

            insertSql.append(")").append(valuesSql).append(")");

            // Execute the insert
            targetShardingTemplate.update(
                insertSql.toString(),
                values.toArray(),
                shardingKey
            );
        }
    }

    /**
     * Data consistency verification
     */
    public void verifyDataConsistency(String tableName, Long startId, Long endId) {
        log.info("Starting data consistency verification for table: {}", tableName);

        int batchSize = 1000;
        long currentId = startId;
        int inconsistentCount = 0;

        while (currentId <= endId) {
            long batchEndId = Math.min(currentId + batchSize - 1, endId);

            // Query the source database
            String selectSql = String.format(
                "SELECT id, username, email FROM %s WHERE id >= ? AND id <= ?", tableName
            );
            List<Map<String, Object>> sourceRows = sourceJdbcTemplate.queryForList(
                selectSql, currentId, batchEndId
            );

            // Verify each row against the target
            for (Map<String, Object> sourceRow : sourceRows) {
                Long id = (Long) sourceRow.get("id");
                String shardingKey = String.valueOf(id);

                try {
                    List<Map<String, Object>> targetRows = targetShardingTemplate.queryForList(
                        "SELECT id, username, email FROM users WHERE id = ?",
                        new Object[]{id},
                        (rs, rowNum) -> {
                            Map<String, Object> row = new HashMap<>();
                            row.put("id", rs.getLong("id"));
                            row.put("username", rs.getString("username"));
                            row.put("email", rs.getString("email"));
                            return row;
                        },
                        shardingKey
                    );

                    if (targetRows.isEmpty()) {
                        log.warn("Data missing in target for id: {}", id);
                        inconsistentCount++;
                    } else {
                        Map<String, Object> targetRow = targetRows.get(0);
                        if (!sourceRow.equals(targetRow)) {
                            log.warn("Data inconsistent for id: {}, source: {}, target: {}",
                                    id, sourceRow, targetRow);
                            inconsistentCount++;
                        }
                    }
                } catch (Exception e) {
                    log.error("Verification failed for id: {}", id, e);
                    inconsistentCount++;
                }
            }

            currentId = batchEndId + 1;
        }

        log.info("Data consistency verification completed. Inconsistent records: {}", inconsistentCount);
    }
}

Data Synchronization Tooling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
@Component
public class DataSyncService {

/**
* 基于Binlog的实时同步
*/
@Component
public class BinlogSyncProcessor {

private final ShardingJdbcTemplate shardingTemplate;

public BinlogSyncProcessor(ShardingJdbcTemplate shardingTemplate) {
this.shardingTemplate = shardingTemplate;
}

@EventListener
public void handleInsertEvent(BinlogInsertEvent event) {
try {
String tableName = event.getTableName();
Map<String, Object> data = event.getData();

// 获取分片键
String shardingKey = getShardingKey(tableName, data);

// 构建插入SQL
String sql = buildInsertSql(tableName, data);
Object[] params = data.values().toArray();

// 执行插入
shardingTemplate.update(sql, params, shardingKey);

log.debug("Synced insert event for table: {}, id: {}",
tableName, data.get("id"));

} catch (Exception e) {
log.error("Failed to sync insert event", e);
// 可以将失败的事件放入重试队列
}
}

@EventListener
public void handleUpdateEvent(BinlogUpdateEvent event) {
try {
String tableName = event.getTableName();
Map<String, Object> beforeData = event.getBeforeData();
Map<String, Object> afterData = event.getAfterData();

// 获取分片键
String shardingKey = getShardingKey(tableName, afterData);

// 构建更新SQL
String sql = buildUpdateSql(tableName, afterData, beforeData);
List<Object> params = new ArrayList<>();

// 添加SET子句的参数
for (Map.Entry<String, Object> entry : afterData.entrySet()) {
if (!"id".equals(entry.getKey())) {
params.add(entry.getValue());
}
}

// 添加WHERE子句的参数
params.add(afterData.get("id"));

// 执行更新
shardingTemplate.update(sql, params.toArray(), shardingKey);

log.debug("Synced update event for table: {}, id: {}",
tableName, afterData.get("id"));

} catch (Exception e) {
log.error("Failed to sync update event", e);
}
}

@EventListener
public void handleDeleteEvent(BinlogDeleteEvent event) {
try {
String tableName = event.getTableName();
Map<String, Object> data = event.getData();

// 获取分片键
String shardingKey = getShardingKey(tableName, data);

// 执行删除
String sql = "DELETE FROM " + tableName + " WHERE id = ?";
shardingTemplate.update(sql, new Object[]{data.get("id")}, shardingKey);

log.debug("Synced delete event for table: {}, id: {}",
tableName, data.get("id"));

} catch (Exception e) {
log.error("Failed to sync delete event", e);
}
}

private String getShardingKey(String tableName, Map<String, Object> data) {
// 根据表名和数据确定分片键
if ("users".equals(tableName)) {
return String.valueOf(data.get("id"));
} else if ("orders".equals(tableName)) {
return String.valueOf(data.get("user_id"));
}
return String.valueOf(data.get("id"));
}

private String buildInsertSql(String tableName, Map<String, Object> data) {
StringBuilder sql = new StringBuilder("INSERT INTO ");
sql.append(tableName).append(" (");

StringBuilder values = new StringBuilder(" VALUES (");

boolean first = true;
for (String column : data.keySet()) {
if (!first) {
sql.append(", ");
values.append(", ");
}
sql.append(column);
values.append("?");
first = false;
}

sql.append(")").append(values).append(")");
return sql.toString();
}

private String buildUpdateSql(String tableName, Map<String, Object> afterData,
Map<String, Object> beforeData) {
StringBuilder sql = new StringBuilder("UPDATE ");
sql.append(tableName).append(" SET ");

boolean first = true;
for (Map.Entry<String, Object> entry : afterData.entrySet()) {
if (!"id".equals(entry.getKey())) {
if (!first) {
sql.append(", ");
}
sql.append(entry.getKey()).append(" = ?");
first = false;
}
}

sql.append(" WHERE id = ?");
return sql.toString();
}
}

/**
* 数据补偿机制
*/
@Component
public class DataCompensationService {

@Scheduled(fixedDelay = 60000) // 每分钟执行一次
public void compensateFailedSync() {
// 从Redis获取失败的同步任务
String failedSyncKey = "sync:failed";

while (true) {
String failedTask = (String) redisTemplate.opsForList().rightPop(failedSyncKey);
if (failedTask == null) {
break;
}

try {
// 解析失败任务
SyncTask task = JSON.parseObject(failedTask, SyncTask.class);

// 重新执行同步
retrySyncTask(task);

log.info("Compensated failed sync task: {}", task);

} catch (Exception e) {
log.error("Failed to compensate sync task: {}", failedTask, e);

// 重新放入队列,但增加重试次数
// 如果重试次数过多,可以放入死信队列
}
}
}

private void retrySyncTask(SyncTask task) {
// 根据任务类型执行相应的同步操作
switch (task.getType()) {
case "INSERT":
handleRetryInsert(task);
break;
case "UPDATE":
handleRetryUpdate(task);
break;
case "DELETE":
handleRetryDelete(task);
break;
default:
log.warn("Unknown sync task type: {}", task.getType());
}
}

private void handleRetryInsert(SyncTask task) {
// 实现重试插入逻辑
}

private void handleRetryUpdate(SyncTask task) {
// 实现重试更新逻辑
}

private void handleRetryDelete(SyncTask task) {
// 实现重试删除逻辑
}
}
}

// 同步任务数据结构
@Data
public class SyncTask {
private String type; // INSERT, UPDATE, DELETE
private String tableName;
private Map<String, Object> data;
private Map<String, Object> beforeData; // 用于UPDATE操作
private int retryCount;
private long timestamp;
}
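上面补偿逻辑的注释中提到"重试次数过多,可以放入死信队列"。下面给出这一路由决策与指数退避的最小示意(最大重试次数、队列名均为假设值,实际应结合业务调整):

```java
// 根据重试次数决定失败任务的去向,并计算指数退避延迟
public class SyncRetryPolicy {

    public static final int MAX_RETRY = 5; // 假设的最大重试次数

    // 未超过上限的任务回到重试队列,否则进入死信队列人工处理
    public static String routeFailedTask(int retryCount) {
        return retryCount < MAX_RETRY ? "sync:failed" : "sync:dead-letter";
    }

    // 指数退避:1s, 2s, 4s, ...,上限60秒,避免失败任务频繁冲击下游
    public static long backoffMillis(int retryCount) {
        long delay = 1000L << Math.min(retryCount, 6);
        return Math.min(delay, 60_000L);
    }
}
```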

// Binlog事件定义
public abstract class BinlogEvent {
private String tableName;
private long timestamp;

// getters and setters
}

public class BinlogInsertEvent extends BinlogEvent {
private Map<String, Object> data;

// getters and setters
}

public class BinlogUpdateEvent extends BinlogEvent {
private Map<String, Object> beforeData;
private Map<String, Object> afterData;

// getters and setters
}

public class BinlogDeleteEvent extends BinlogEvent {
private Map<String, Object> data;

// getters and setters
}

跨分片查询处理

分布式查询引擎

@Service
public class DistributedQueryService {

private final ShardingJdbcTemplate shardingTemplate;
// 分片策略在后文determineTargetShardsForUser中使用,这里补上注入
private final ShardingStrategy shardingStrategy;
private final ExecutorService queryExecutor;

public DistributedQueryService(ShardingJdbcTemplate shardingTemplate,
ShardingStrategy shardingStrategy) {
this.shardingTemplate = shardingTemplate;
this.shardingStrategy = shardingStrategy;
this.queryExecutor = Executors.newFixedThreadPool(16);
}

/**
* 跨分片聚合查询
*/
public OrderStatistics getOrderStatistics(LocalDateTime startTime, LocalDateTime endTime) {
// 确定需要查询的分片
List<String> targetShards = determineTargetShards(startTime, endTime);

// 并行查询各个分片
List<CompletableFuture<ShardStatistics>> futures = targetShards.stream()
.map(shard -> CompletableFuture.supplyAsync(() ->
queryShardStatistics(shard, startTime, endTime), queryExecutor))
.collect(Collectors.toList());

// 等待所有查询完成并合并结果
List<ShardStatistics> shardResults = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

// 聚合统计结果
return aggregateStatistics(shardResults);
}

private List<String> determineTargetShards(LocalDateTime startTime, LocalDateTime endTime) {
Set<String> shards = new HashSet<>();

// 根据时间范围确定需要查询的表
LocalDateTime current = startTime.withDayOfMonth(1);
while (!current.isAfter(endTime)) {
String monthSuffix = current.format(DateTimeFormatter.ofPattern("yyyyMM"));

// 每个月的数据可能分布在多个分片中
for (int i = 0; i < 4; i++) {
shards.add("ds" + i + "_" + monthSuffix);
}

current = current.plusMonths(1);
}

return new ArrayList<>(shards);
}

private ShardStatistics queryShardStatistics(String shard,
LocalDateTime startTime,
LocalDateTime endTime) {
String sql = """
SELECT
COUNT(*) as order_count,
SUM(total_amount) as total_amount,
AVG(total_amount) as avg_amount,
COUNT(DISTINCT user_id) as unique_users
FROM orders
WHERE created_at >= ? AND created_at <= ?
""";

try {
List<ShardStatistics> results = shardingTemplate.queryForList(
sql,
new Object[]{startTime, endTime},
(rs, rowNum) -> {
ShardStatistics stats = new ShardStatistics();
stats.setOrderCount(rs.getLong("order_count"));
stats.setTotalAmount(rs.getBigDecimal("total_amount"));
stats.setAvgAmount(rs.getBigDecimal("avg_amount"));
stats.setUniqueUsers(rs.getLong("unique_users"));
stats.setShardName(shard);
return stats;
},
shard
);

return results.isEmpty() ? new ShardStatistics() : results.get(0);

} catch (Exception e) {
log.error("Failed to query statistics from shard: {}", shard, e);
return new ShardStatistics(); // 返回空统计,避免影响整体查询
}
}

private OrderStatistics aggregateStatistics(List<ShardStatistics> shardResults) {
OrderStatistics result = new OrderStatistics();

long totalOrders = 0;
BigDecimal totalAmount = BigDecimal.ZERO;
Set<Long> allUsers = new HashSet<>();

for (ShardStatistics shard : shardResults) {
totalOrders += shard.getOrderCount();
if (shard.getTotalAmount() != null) {
totalAmount = totalAmount.add(shard.getTotalAmount());
}
// 注意:跨分片的唯一用户数需要特殊处理
}

result.setTotalOrders(totalOrders);
result.setTotalAmount(totalAmount);
result.setAvgAmount(totalOrders > 0 ?
totalAmount.divide(BigDecimal.valueOf(totalOrders), 2, RoundingMode.HALF_UP) :
BigDecimal.ZERO);

return result;
}

/**
* 分页跨分片查询
*/
public PageResult<Order> queryOrdersAcrossShards(OrderQueryRequest request) {
// 确定查询的分片
List<String> targetShards = determineTargetShardsForUser(request.getUserId());

// 计算每个分片需要查询的数量(考虑到排序和分页)
int shardLimit = (request.getPageNumber() + 1) * request.getPageSize();

// 并行查询各个分片
List<CompletableFuture<List<Order>>> futures = targetShards.stream()
.map(shard -> CompletableFuture.supplyAsync(() ->
queryOrdersFromShard(shard, request, shardLimit), queryExecutor))
.collect(Collectors.toList());

// 合并和排序结果
List<Order> allOrders = futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.sorted((o1, o2) -> o2.getCreatedAt().compareTo(o1.getCreatedAt())) // 按时间倒序
.collect(Collectors.toList());

// 分页处理
int start = request.getPageNumber() * request.getPageSize();
int end = Math.min(start + request.getPageSize(), allOrders.size());

List<Order> pageData = start < allOrders.size() ?
allOrders.subList(start, end) : Collections.emptyList();

// 查询总数(可以缓存以提高性能)
long totalCount = countOrdersAcrossShards(request);

return new PageResult<>(pageData, totalCount, request.getPageNumber(), request.getPageSize());
}

private List<String> determineTargetShardsForUser(Long userId) {
if (userId != null) {
// 如果指定了用户ID,可以精确定位到特定分片
String shardingKey = String.valueOf(userId);
String targetShard = shardingStrategy.getDataSourceName(shardingKey);
return Collections.singletonList(targetShard);
} else {
// 如果没有指定用户ID,需要查询所有分片
return shardingStrategy.getAllShards();
}
}

private List<Order> queryOrdersFromShard(String shard, OrderQueryRequest request, int limit) {
StringBuilder sql = new StringBuilder("SELECT * FROM orders WHERE 1=1");
List<Object> params = new ArrayList<>();

// 添加查询条件
if (request.getUserId() != null) {
sql.append(" AND user_id = ?");
params.add(request.getUserId());
}

if (request.getStatus() != null) {
sql.append(" AND status = ?");
params.add(request.getStatus());
}

if (request.getStartTime() != null) {
sql.append(" AND created_at >= ?");
params.add(request.getStartTime());
}

if (request.getEndTime() != null) {
sql.append(" AND created_at <= ?");
params.add(request.getEndTime());
}

// 添加排序和限制
sql.append(" ORDER BY created_at DESC LIMIT ?");
params.add(limit);

return shardingTemplate.queryForList(
sql.toString(),
params.toArray(),
new OrderRowMapper(),
shard
);
}

private long countOrdersAcrossShards(OrderQueryRequest request) {
List<String> targetShards = determineTargetShardsForUser(request.getUserId());

List<CompletableFuture<Long>> futures = targetShards.stream()
.map(shard -> CompletableFuture.supplyAsync(() ->
countOrdersFromShard(shard, request), queryExecutor))
.collect(Collectors.toList());

return futures.stream()
.map(CompletableFuture::join)
.mapToLong(Long::longValue)
.sum();
}

private long countOrdersFromShard(String shard, OrderQueryRequest request) {
StringBuilder sql = new StringBuilder("SELECT COUNT(*) FROM orders WHERE 1=1");
List<Object> params = new ArrayList<>();

// 添加查询条件(与查询方法保持一致)
if (request.getUserId() != null) {
sql.append(" AND user_id = ?");
params.add(request.getUserId());
}

if (request.getStatus() != null) {
sql.append(" AND status = ?");
params.add(request.getStatus());
}

if (request.getStartTime() != null) {
sql.append(" AND created_at >= ?");
params.add(request.getStartTime());
}

if (request.getEndTime() != null) {
sql.append(" AND created_at <= ?");
params.add(request.getEndTime());
}

List<Long> results = shardingTemplate.queryForList(
sql.toString(),
params.toArray(),
(rs, rowNum) -> rs.getLong(1),
shard
);

return results.isEmpty() ? 0L : results.get(0);
}
}

// 数据传输对象
@Data
public class OrderQueryRequest {
private Long userId;
private String status;
private LocalDateTime startTime;
private LocalDateTime endTime;
private int pageNumber = 0;
private int pageSize = 20;
}

@Data
public class PageResult<T> {
private List<T> data;
private long totalCount;
private int pageNumber;
private int pageSize;
private int totalPages;

public PageResult(List<T> data, long totalCount, int pageNumber, int pageSize) {
this.data = data;
this.totalCount = totalCount;
this.pageNumber = pageNumber;
this.pageSize = pageSize;
this.totalPages = (int) Math.ceil((double) totalCount / pageSize);
}
}

@Data
public class ShardStatistics {
private String shardName;
private long orderCount;
private BigDecimal totalAmount;
private BigDecimal avgAmount;
private long uniqueUsers;
}

@Data
public class OrderStatistics {
private long totalOrders;
private BigDecimal totalAmount;
private BigDecimal avgAmount;
private long uniqueUsers;
}
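aggregateStatistics 中特意注释了"跨分片的唯一用户数需要特殊处理":各分片的 COUNT(DISTINCT user_id) 直接相加会把跨分片下单的用户重复计数。一个可行思路是让各分片返回用户ID集合,在应用层求并集后再计数,下面是该思路的最小示意:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// 合并各分片返回的用户ID集合,得到全局唯一用户数
public class UniqueUserMerger {

    // 求并集而非求和:同一用户可能在多个分片都有订单
    public static long mergeUniqueUsers(List<Set<Long>> shardUserIds) {
        Set<Long> all = new HashSet<>();
        for (Set<Long> shard : shardUserIds) {
            all.addAll(shard);
        }
        return all.size();
    }
}
```

数据量大时集合并集的内存开销较高,可改用HyperLogLog(例如Redis的PFMERGE)以少量误差换取常数级内存。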

分布式事务处理

@Service
public class DistributedTransactionService {

/**
* 基于消息的最终一致性
*/
@Component
public class MessageBasedTransaction {

private final RabbitTemplate rabbitTemplate;
private final ShardingJdbcTemplate shardingTemplate;

// 注意:消息发送不在数据库事务内,若需严格保证"订单落库+消息必达",
// 应使用本地消息表或事务消息(如RocketMQ事务消息)等方案
@Transactional
public void createOrderWithInventoryDeduction(CreateOrderRequest request) {
try {
// 1. 创建订单(本地事务)
Order order = createOrder(request);

// 2. 发送库存扣减消息
InventoryDeductionMessage message = new InventoryDeductionMessage();
message.setOrderId(order.getId());
message.setProductId(request.getProductId());
message.setQuantity(request.getQuantity());
message.setUserId(request.getUserId());

rabbitTemplate.convertAndSend(
"inventory.exchange",
"inventory.deduction",
message
);

log.info("Order created and inventory deduction message sent: {}", order.getId());

} catch (Exception e) {
log.error("Failed to create order", e);
throw new RuntimeException("Order creation failed", e);
}
}

private Order createOrder(CreateOrderRequest request) {
String shardingKey = String.valueOf(request.getUserId());

// 生成订单号
String orderNumber = generateOrderNumber();

// 插入订单
String sql = """
INSERT INTO orders (user_id, order_number, product_id, quantity,
unit_price, total_amount, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""";

BigDecimal totalAmount = request.getUnitPrice().multiply(BigDecimal.valueOf(request.getQuantity()));

shardingTemplate.update(
sql,
new Object[]{
request.getUserId(),
orderNumber,
request.getProductId(),
request.getQuantity(),
request.getUnitPrice(),
totalAmount,
"PENDING",
LocalDateTime.now()
},
shardingKey
);

// 查询创建的订单
List<Order> orders = shardingTemplate.queryForList(
"SELECT * FROM orders WHERE order_number = ?",
new Object[]{orderNumber},
new OrderRowMapper(),
shardingKey
);

return orders.get(0);
}

@RabbitListener(queues = "inventory.deduction.result")
public void handleInventoryDeductionResult(InventoryDeductionResult result) {
String shardingKey = String.valueOf(result.getUserId());

if (result.isSuccess()) {
// 库存扣减成功,更新订单状态为已确认
shardingTemplate.update(
"UPDATE orders SET status = 'CONFIRMED' WHERE id = ?",
new Object[]{result.getOrderId()},
shardingKey
);

log.info("Order confirmed: {}", result.getOrderId());
} else {
// 库存扣减失败,取消订单
shardingTemplate.update(
"UPDATE orders SET status = 'CANCELLED', cancel_reason = ? WHERE id = ?",
new Object[]{result.getFailureReason(), result.getOrderId()},
shardingKey
);

log.warn("Order cancelled due to inventory shortage: {}", result.getOrderId());
}
}

private String generateOrderNumber() {
// 使用ThreadLocalRandom避免共享Random的竞争;时间戳+随机数在高并发下仍可能冲突,
// 生产环境建议使用雪花算法等分布式ID方案
return "ORD" + System.currentTimeMillis() +
String.format("%04d", ThreadLocalRandom.current().nextInt(10000));
}
}

/**
* Saga模式事务协调
*/
@Component
public class SagaTransactionCoordinator {

public void executeOrderSaga(CreateOrderRequest request) {
SagaTransaction saga = SagaTransaction.builder()
.addStep("createOrder",
() -> createOrderStep(request),
orderId -> cancelOrderStep(orderId))
.addStep("deductInventory",
() -> { deductInventoryStep(request); return null; },
ignored -> restoreInventoryStep(request))
.addStep("processPayment",
() -> { processPaymentStep(request); return null; },
ignored -> refundPaymentStep(request))
.addStep("sendNotification",
() -> { sendNotificationStep(request); return null; },
ignored -> {}) // 通知失败不需要补偿
.build();

try {
saga.execute();
log.info("Saga transaction completed successfully");
} catch (SagaExecutionException e) {
log.error("Saga transaction failed, compensation executed", e);
throw new RuntimeException("Order processing failed", e);
}
}

private Long createOrderStep(CreateOrderRequest request) {
// 创建订单的具体实现
Order order = createOrder(request);
return order.getId();
}

private void cancelOrderStep(Long orderId) {
// 取消订单的补偿操作
String shardingKey = getShardingKeyByOrderId(orderId);
shardingTemplate.update(
"UPDATE orders SET status = 'CANCELLED' WHERE id = ?",
new Object[]{orderId},
shardingKey
);
}

private void deductInventoryStep(CreateOrderRequest request) {
// 扣减库存的具体实现
// 这里可能需要调用库存服务的API
}

private void restoreInventoryStep(CreateOrderRequest request) {
// 恢复库存的补偿操作
}

private void processPaymentStep(CreateOrderRequest request) {
// 处理支付的具体实现
}

private void refundPaymentStep(CreateOrderRequest request) {
// 退款的补偿操作
}

private void sendNotificationStep(CreateOrderRequest request) {
// 发送通知的具体实现
}

private String getShardingKeyByOrderId(Long orderId) {
// 根据订单ID获取分片键的逻辑
// 这里需要根据具体的分片策略实现
return String.valueOf(orderId);
}
}
}

// Saga事务框架
public class SagaTransaction {
private final List<SagaStep> steps = new ArrayList<>();
private final List<Object> stepResults = new ArrayList<>();

public static SagaTransactionBuilder builder() {
return new SagaTransactionBuilder();
}

public void execute() throws SagaExecutionException {
int executedSteps = 0;

try {
for (SagaStep step : steps) {
Object result = step.getAction().call();
stepResults.add(result);
executedSteps++;

log.debug("Saga step '{}' executed successfully", step.getName());
}
} catch (Exception e) {
log.error("Saga step failed, starting compensation", e);

// 执行补偿操作(逆序)
for (int i = executedSteps - 1; i >= 0; i--) {
try {
SagaStep step = steps.get(i);
Object stepResult = stepResults.get(i);

if (step.getCompensation() != null) {
step.getCompensation().accept(stepResult);
log.debug("Compensation for step '{}' executed", step.getName());
}
} catch (Exception compensationError) {
log.error("Compensation failed for step: {}", steps.get(i).getName(), compensationError);
}
}

throw new SagaExecutionException("Saga execution failed", e);
}
}

public static class SagaTransactionBuilder {
private final SagaTransaction saga = new SagaTransaction();

public <T> SagaTransactionBuilder addStep(String name,
Callable<T> action,
Consumer<T> compensation) {
saga.steps.add(new SagaStep(name, action, compensation));
return this;
}

public SagaTransaction build() {
return saga;
}
}

private static class SagaStep {
private final String name;
private final Callable<Object> action;
private final Consumer<Object> compensation;

@SuppressWarnings("unchecked")
public SagaStep(String name, Callable<?> action, Consumer<?> compensation) {
this.name = name;
this.action = (Callable<Object>) action;
this.compensation = (Consumer<Object>) compensation;
}

// getters
public String getName() { return name; }
public Callable<Object> getAction() { return action; }
public Consumer<Object> getCompensation() { return compensation; }
}
}

public class SagaExecutionException extends Exception {
public SagaExecutionException(String message, Throwable cause) {
super(message, cause);
}
}

性能优化与监控

查询性能优化

@Component
public class ShardingPerformanceOptimizer {

/**
* 查询缓存优化
*/
@Component
public class QueryCacheOptimizer {

private final RedisTemplate<String, Object> redisTemplate;
private final ShardingJdbcTemplate shardingTemplate;

@Cacheable(value = "user_cache", key = "#userId")
public User getUserById(Long userId) {
String shardingKey = String.valueOf(userId);

List<User> users = shardingTemplate.queryForList(
"SELECT * FROM users WHERE id = ?",
new Object[]{userId},
new UserRowMapper(),
shardingKey
);

return users.isEmpty() ? null : users.get(0);
}

/**
* 批量查询优化
*/
public Map<Long, User> getUsersByIds(List<Long> userIds) {
if (userIds.isEmpty()) {
return Collections.emptyMap();
}

// 按分片分组
Map<String, List<Long>> shardGroups = userIds.stream()
.collect(Collectors.groupingBy(id ->
shardingStrategy.getDataSourceName(String.valueOf(id))));

// 并行查询各个分片
List<CompletableFuture<Map<Long, User>>> futures = shardGroups.entrySet().stream()
.map(entry -> CompletableFuture.supplyAsync(() ->
queryUsersFromShard(entry.getKey(), entry.getValue())))
.collect(Collectors.toList());

// 合并结果
Map<Long, User> result = new HashMap<>();
for (CompletableFuture<Map<Long, User>> future : futures) {
result.putAll(future.join());
}

return result;
}

private Map<Long, User> queryUsersFromShard(String shard, List<Long> userIds) {
if (userIds.isEmpty()) {
return Collections.emptyMap();
}

// 构建IN查询
String placeholders = userIds.stream()
.map(id -> "?")
.collect(Collectors.joining(", "));

String sql = "SELECT * FROM users WHERE id IN (" + placeholders + ")";

List<User> users = shardingTemplate.queryForList(
sql,
userIds.toArray(),
new UserRowMapper(),
shard
);

return users.stream()
.collect(Collectors.toMap(User::getId, Function.identity()));
}

/**
* 预热缓存
*/
@EventListener(ApplicationReadyEvent.class)
public void warmUpCache() {
log.info("Starting cache warm-up");

// 预热热点用户数据
List<Long> hotUserIds = getHotUserIds();

hotUserIds.parallelStream().forEach(userId -> {
try {
getUserById(userId);
} catch (Exception e) {
log.warn("Failed to warm up cache for user: {}", userId, e);
}
});

log.info("Cache warm-up completed");
}

private List<Long> getHotUserIds() {
// 从Redis或其他地方获取热点用户ID列表
String hotUsersKey = "hot_users";
Set<Object> hotUserSet = redisTemplate.opsForSet().members(hotUsersKey);

// key不存在时members可能返回null,需要防御
if (hotUserSet == null || hotUserSet.isEmpty()) {
return Collections.emptyList();
}

return hotUserSet.stream()
.map(obj -> Long.valueOf(obj.toString()))
.collect(Collectors.toList());
}
}

/**
* 连接池优化
*/
@Configuration
public class DataSourceOptimizationConfig {

@Bean
@ConfigurationProperties("spring.datasource.hikari")
public HikariConfig hikariConfig() {
HikariConfig config = new HikariConfig();

// 连接池大小优化
config.setMaximumPoolSize(20); // 根据并发量调整
config.setMinimumIdle(5); // 保持最小连接数

// 连接超时优化
config.setConnectionTimeout(30000); // 30秒
config.setIdleTimeout(600000); // 10分钟
config.setMaxLifetime(1800000); // 30分钟

// 连接测试优化
config.setConnectionTestQuery("SELECT 1");
config.setValidationTimeout(5000);

// 性能优化参数
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");

return config;
}
}
}
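上面 maximumPoolSize 的注释写着"根据并发量调整"。HikariCP 官方建议连接池不宜过大,一个常被引用的经验公式是 connections = cores × 2 + effective_spindle_count。下面是该公式的简单示意,仅作为调优起点,最终数值应以压测结果为准:

```java
// 连接池大小估算:经验公式 cores * 2 + spindles(磁盘主轴数,SSD场景通常取1)
public class PoolSizeEstimator {

    public static int estimate(int cpuCores, int diskSpindles) {
        return cpuCores * 2 + diskSpindles;
    }
}
```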

监控与告警

@Component
public class ShardingMonitoringService {

private final MeterRegistry meterRegistry;
private final ShardingJdbcTemplate shardingTemplate;

/**
* 分片性能监控
*/
@Component
public class ShardPerformanceMonitor {

@EventListener
public void handleQueryEvent(ShardQueryEvent event) {
// 记录查询延迟(事件中已带耗时,直接record即可;
// Timer.Sample立刻stop只会记录接近0的时长)
Timer.builder("shard.query.duration")
.tag("shard", event.getShardName())
.tag("table", event.getTableName())
.register(meterRegistry)
.record(event.getDuration(), TimeUnit.MILLISECONDS);

// 记录查询计数
Counter.builder("shard.query.count")
.tag("shard", event.getShardName())
.tag("table", event.getTableName())
.tag("status", event.isSuccess() ? "success" : "failure")
.register(meterRegistry)
.increment();

// 记录慢查询
if (event.getDuration() > 1000) { // 超过1秒的查询
Counter.builder("shard.slow.query.count")
.tag("shard", event.getShardName())
.tag("table", event.getTableName())
.register(meterRegistry)
.increment();

log.warn("Slow query detected: shard={}, table={}, duration={}ms, sql={}",
event.getShardName(), event.getTableName(), event.getDuration(), event.getSql());
}
}

@Scheduled(fixedRate = 60000) // 每分钟执行
public void collectShardMetrics() {
List<String> allShards = shardingStrategy.getAllShards();

for (String shard : allShards) {
try {
collectShardConnectionMetrics(shard);
collectShardTableMetrics(shard);
} catch (Exception e) {
log.error("Failed to collect metrics for shard: {}", shard, e);
}
}
}

private void collectShardConnectionMetrics(String shard) {
// 获取连接池状态
DataSource dataSource = shardingDataSource.getDataSource(shard);
if (dataSource instanceof HikariDataSource) {
HikariDataSource hikariDS = (HikariDataSource) dataSource;
HikariPoolMXBean poolBean = hikariDS.getHikariPoolMXBean();

Gauge.builder("shard.connection.active", poolBean, HikariPoolMXBean::getActiveConnections)
.tag("shard", shard)
.register(meterRegistry);

Gauge.builder("shard.connection.idle", poolBean, HikariPoolMXBean::getIdleConnections)
.tag("shard", shard)
.register(meterRegistry);

Gauge.builder("shard.connection.total", poolBean, HikariPoolMXBean::getTotalConnections)
.tag("shard", shard)
.register(meterRegistry);

Gauge.builder("shard.connection.waiting", poolBean, HikariPoolMXBean::getThreadsAwaitingConnection)
.tag("shard", shard)
.register(meterRegistry);
}
}

private void collectShardTableMetrics(String shard) {
// 收集表级别的统计信息
String sql = """
SELECT
table_name,
table_rows,
data_length,
index_length,
(data_length + index_length) as total_size
FROM information_schema.tables
WHERE table_schema = DATABASE()
""";

try {
List<TableMetrics> metrics = shardingTemplate.queryForList(
sql,
new Object[]{},
(rs, rowNum) -> {
TableMetrics metric = new TableMetrics();
metric.setTableName(rs.getString("table_name"));
metric.setRowCount(rs.getLong("table_rows"));
metric.setDataSize(rs.getLong("data_length"));
metric.setIndexSize(rs.getLong("index_length"));
metric.setTotalSize(rs.getLong("total_size"));
return metric;
},
shard
);

for (TableMetrics metric : metrics) {
// Gauge默认弱引用被观测对象,这里用strongReference防止metric被GC后Gauge失效
Gauge.builder("shard.table.rows", metric, TableMetrics::getRowCount)
.tag("shard", shard)
.tag("table", metric.getTableName())
.strongReference(true)
.register(meterRegistry);

Gauge.builder("shard.table.size", metric, TableMetrics::getDataSize)
.tag("shard", shard)
.tag("table", metric.getTableName())
.tag("type", "data")
.strongReference(true)
.register(meterRegistry);

Gauge.builder("shard.table.size", metric, TableMetrics::getIndexSize)
.tag("shard", shard)
.tag("table", metric.getTableName())
.tag("type", "index")
.strongReference(true)
.register(meterRegistry);
}
} catch (Exception e) {
log.error("Failed to collect table metrics for shard: {}", shard, e);
}
}
}

/**
* 分片健康检查
*/
@Component
public class ShardHealthChecker {

// 各分片的健康状态:Gauge只注册一次并绑定AtomicInteger,之后更新数值即可
private final Map<String, AtomicInteger> healthStatus = new ConcurrentHashMap<>();

@Scheduled(fixedRate = 30000) // 每30秒检查一次
public void checkShardHealth() {
List<String> allShards = shardingStrategy.getAllShards();

for (String shard : allShards) {
CompletableFuture.runAsync(() -> checkSingleShardHealth(shard));
}
}

private void checkSingleShardHealth(String shard) {
boolean isHealthy = true;
String errorMessage = null;

try {
// 检查数据库连接
long startTime = System.currentTimeMillis();

shardingTemplate.queryForList(
"SELECT 1",
new Object[]{},
(rs, rowNum) -> rs.getInt(1),
shard
);

long responseTime = System.currentTimeMillis() - startTime;

// 用Timer记录每次探测耗时,而非每次重复注册Gauge
Timer.builder("shard.health.response_time")
.tag("shard", shard)
.register(meterRegistry)
.record(responseTime, TimeUnit.MILLISECONDS);

// 检查响应时间是否过长
if (responseTime > 5000) { // 超过5秒
isHealthy = false;
errorMessage = "Response time too long: " + responseTime + "ms";
}

} catch (Exception e) {
isHealthy = false;
errorMessage = e.getMessage();
log.error("Health check failed for shard: {}", shard, e);
}

// 更新健康状态:首次注册Gauge,之后只更新绑定的AtomicInteger
healthStatus.computeIfAbsent(shard, s ->
meterRegistry.gauge("shard.health.status",
Tags.of("shard", s), new AtomicInteger(1)))
.set(isHealthy ? 1 : 0);

// 发送告警
if (!isHealthy) {
sendHealthAlert(shard, errorMessage);
}
}

private void sendHealthAlert(String shard, String errorMessage) {
// 发送告警通知(邮件、短信、钉钉等)
AlertMessage alert = new AlertMessage();
alert.setLevel("CRITICAL");
alert.setTitle("分片健康检查失败");
alert.setContent(String.format("分片 %s 健康检查失败: %s", shard, errorMessage));
alert.setTimestamp(LocalDateTime.now());

// 这里可以集成具体的告警系统
log.error("ALERT: Shard health check failed - shard: {}, error: {}", shard, errorMessage);
}
}
}

// 监控相关数据结构
@Data
public class ShardQueryEvent {
private String shardName;
private String tableName;
private String sql;
private long duration;
private boolean success;
private String errorMessage;
}

@Data
public class TableMetrics {
private String tableName;
private long rowCount;
private long dataSize;
private long indexSize;
private long totalSize;
}

@Data
public class AlertMessage {
private String level;
private String title;
private String content;
private LocalDateTime timestamp;
}

最佳实践与总结

分库分表设计原则

  1. 业务驱动原则

    • 根据业务特点选择合适的分片策略
    • 优先考虑查询模式和访问频率
    • 避免过度设计,从简单开始逐步演进
  2. 数据均匀分布原则

    • 选择高基数的分片键
    • 避免数据倾斜和热点问题
    • 定期监控各分片的数据分布情况
  3. 查询友好原则

    • 尽量避免跨分片查询
    • 设计时考虑常用查询场景
    • 合理使用冗余数据减少关联查询
  4. 扩展性原则

    • 预留足够的扩展空间
    • 选择易于扩展的分片算法
    • 考虑未来的数据增长和业务发展
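"选择高基数的分片键"与"易于扩展的分片算法"常常结合使用:对分片键做哈希后取模路由。下面是取模路由的最小示意(分片数为示例参数):

```java
// 基于分片键哈希取模的路由示意:高基数键(如user_id)可使数据较均匀地分布
public class ModShardingRouter {

    private final int shardCount;

    public ModShardingRouter(int shardCount) {
        this.shardCount = shardCount;
    }

    // 用Math.floorMod避免hashCode为负数时得到负分片号
    public int route(String shardingKey) {
        return Math.floorMod(shardingKey.hashCode(), shardCount);
    }
}
```

取模路由的缺点是扩容时需要大规模迁移数据;若预期频繁扩容,可考虑一致性哈希或"范围+映射表"方案。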

常见问题与解决方案

1. 数据倾斜问题

问题描述:某些分片的数据量远大于其他分片

解决方案

  • 重新评估分片键的选择
  • 使用复合分片键提高数据分布均匀性
  • 实施数据重平衡策略
  • 监控数据分布并及时调整
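"使用复合分片键"缓解倾斜的一种常见做法是给热点键加盐:把热点用户的数据打散到多个分片,查询时对这些分片扇出。以下为加盐思路的最小示意(热点名单与盐桶数量均为假设):

```java
import java.util.Set;

// 热点键加盐示意:普通键按原值路由,热点键追加盐值打散到多个分片
public class SaltedShardingKey {

    private static final int SALT_BUCKETS = 4; // 假设把每个热点键打散为4份

    // seq可以用单调序号(如订单ID),保证同一行的路由稳定
    public static String salt(String key, Set<String> hotKeys, long seq) {
        if (hotKeys.contains(key)) {
            return key + "#" + (seq % SALT_BUCKETS);
        }
        return key;
    }
}
```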

2. 跨分片查询性能问题

问题描述:需要查询多个分片的数据,性能较差

解决方案

  • 优化查询逻辑,减少跨分片查询
  • 使用数据冗余,在单个分片中存储常用的关联数据
  • 实施查询结果缓存
  • 使用异步查询和结果合并
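"实施查询结果缓存"可以先从带TTL的本地缓存做起。下面是cache-aside模式的最小示意(仅演示思路,生产环境建议使用Caffeine或Redis):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// 带TTL的cache-aside示意:命中且未过期直接返回,否则回源并写回
public class TtlQueryCache<V> {

    private record Entry<T>(T value, long expireAt) {}

    private final Map<String, Entry<V>> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public TtlQueryCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public V get(String key, Supplier<V> loader) {
        Entry<V> e = cache.get(key);
        if (e != null && e.expireAt() > System.currentTimeMillis()) {
            return e.value();
        }
        V v = loader.get(); // 回源:例如执行一次跨分片聚合查询
        cache.put(key, new Entry<>(v, System.currentTimeMillis() + ttlMillis));
        return v;
    }
}
```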

3. 分布式事务复杂性

问题描述:跨分片的事务处理复杂,一致性难以保证

解决方案

  • 尽量避免跨分片事务
  • 使用最终一致性代替强一致性
  • 实施Saga模式或消息驱动的事务
  • 设计补偿机制处理异常情况

4. 运维复杂性增加

问题描述:多个数据库实例增加了运维难度

解决方案

  • 实施自动化部署和监控
  • 统一配置管理
  • 建立完善的备份和恢复策略
  • 使用容器化技术简化部署

技术选型建议

1. 分库分表中间件选择

  • ShardingSphere:功能完善,社区活跃,适合复杂场景
  • MyCat:轻量级,易于使用,适合中小型项目
  • 自研方案:灵活可控,但开发成本较高

2. 数据库选择

  • MySQL:成熟稳定,生态完善,适合大多数场景
  • PostgreSQL:功能强大,支持复杂查询,适合分析型业务
  • TiDB:分布式数据库,自动分片,适合大规模场景

3. 监控工具选择

  • Prometheus + Grafana:开源监控方案,功能强大
  • 云监控服务:如阿里云监控、腾讯云监控等
  • APM工具:如SkyWalking、Pinpoint等

性能优化要点

  1. 索引优化

    • 为分片键创建合适的索引
    • 避免在分片键上使用函数
    • 定期分析和优化索引使用情况
  2. 连接池优化

    • 合理配置连接池大小
    • 监控连接池使用情况
    • 避免连接泄漏
  3. 缓存策略

    • 实施多级缓存
    • 合理设置缓存过期时间
    • 处理缓存一致性问题
  4. 批量操作优化

    • 使用批量插入和更新
    • 合理控制批次大小
    • 避免长时间锁定
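"合理控制批次大小"可以抽成一个通用的分批工具,配合JDBC的 rewriteBatchedStatements 使用。以下为分批逻辑的最小示意:

```java
import java.util.ArrayList;
import java.util.List;

// 把大列表切成固定大小的批次,避免单次批量操作过大导致长事务和锁等待
public class BatchSplitter {

    public static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }
}
```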

结语

数据库分库分表是解决大规模数据存储和高并发访问的重要技术手段。通过合理的分片策略设计、完善的数据迁移方案、高效的跨分片查询处理以及全面的监控告警机制,可以构建出稳定、高性能的分布式数据架构。

在实施分库分表时,需要充分考虑业务特点、技术复杂性和运维成本,选择最适合的方案。同时,要建立完善的监控和运维体系,确保系统的稳定运行。随着业务的发展和技术的演进,分库分表架构也需要持续优化和改进,以满足不断变化的需求。

通过本文的深入探讨,希望能为读者在设计和实施分库分表方案时提供有价值的参考和指导。
