Database Performance Optimization and Tuning
Overview
Database performance optimization is a core back-end skill covering index design, query optimization, transaction handling, sharding, and more. This article summarizes the key MySQL optimization techniques and the points interviewers focus on.
Core Interview Questions
1. Index Optimization and Design
Interview question: What is a covering index? How do you design an efficient composite index?
Index Types and Principles
-- Create a test table
CREATE TABLE `user_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) NOT NULL COMMENT 'User ID',
`order_no` varchar(32) NOT NULL COMMENT 'Order number',
`product_id` bigint(20) NOT NULL COMMENT 'Product ID',
`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Order status',
`amount` decimal(10,2) NOT NULL COMMENT 'Order amount',
`create_time` datetime NOT NULL COMMENT 'Creation time',
`update_time` datetime NOT NULL COMMENT 'Update time',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_user_id` (`user_id`),
KEY `idx_create_time` (`create_time`),
-- Composite index design
KEY `idx_user_status_create` (`user_id`, `status`, `create_time`),
-- Covering index design
KEY `idx_cover_query` (`user_id`, `status`, `create_time`, `amount`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Index Optimization in Practice
@Repository
public class UserOrderRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
// 1. Use a covering index to avoid lookups back to the base table
public List<OrderSummary> getOrderSummaryByUser(Long userId, Integer status) {
// This query is served entirely from the idx_cover_query index; no table lookup is needed
String sql = """
SELECT user_id, status, create_time, amount
FROM user_order
WHERE user_id = ? AND status = ?
ORDER BY create_time DESC
LIMIT 10
""";
return jdbcTemplate.query(sql,
(rs, rowNum) -> new OrderSummary(
rs.getLong("user_id"),
rs.getInt("status"),
rs.getTimestamp("create_time"),
rs.getBigDecimal("amount")
), userId, status);
}
// 2. Leftmost-prefix rule for composite indexes
public List<UserOrder> getOrdersByConditions(Long userId, Integer status,
Date startTime, Date endTime) {
// Uses the idx_user_status_create index
String sql = """
SELECT * FROM user_order
WHERE user_id = ?
AND status = ?
AND create_time BETWEEN ? AND ?
ORDER BY create_time DESC
""";
return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(UserOrder.class),
userId, status, startTime, endTime);
}
// 3. Write predicates that keep indexes usable
public List<UserOrder> searchOrdersOptimized(String orderNoPrefix,
Date startDate, Date endDate) {
StringBuilder sql = new StringBuilder("SELECT * FROM user_order WHERE 1=1");
List<Object> params = new ArrayList<>();
// Avoid applying functions to indexed columns, which prevents index use
if (orderNoPrefix != null) {
sql.append(" AND order_no LIKE ?");
params.add(orderNoPrefix + "%"); // a prefix match (trailing wildcard) can still use the index
}
if (startDate != null) {
sql.append(" AND create_time >= ?");
params.add(startDate);
}
if (endDate != null) {
sql.append(" AND create_time < ?");
params.add(endDate);
}
sql.append(" ORDER BY create_time DESC LIMIT 100");
return jdbcTemplate.query(sql.toString(),
new BeanPropertyRowMapper<>(UserOrder.class), params.toArray());
}
// 4. Pagination optimization
public PageResult<UserOrder> getOrdersWithPagination(Long userId, Integer pageNum, Integer pageSize) {
// Query the total count first (if it is needed)
String countSql = "SELECT COUNT(*) FROM user_order WHERE user_id = ?";
long total = jdbcTemplate.queryForObject(countSql, Long.class, userId);
// Use an id-only subquery to optimize deep-offset pagination (a keyset alternative is sketched after this class)
String sql = """
SELECT o.* FROM user_order o
INNER JOIN (
SELECT id FROM user_order
WHERE user_id = ?
ORDER BY create_time DESC
LIMIT ?, ?
) t ON o.id = t.id
ORDER BY o.create_time DESC
""";
int offset = (pageNum - 1) * pageSize;
List<UserOrder> orders = jdbcTemplate.query(sql,
new BeanPropertyRowMapper<>(UserOrder.class), userId, offset, pageSize);
return new PageResult<>(orders, total, pageNum, pageSize);
}
}
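Where the access pattern allows a "next page" cursor instead of arbitrary page numbers, keyset (seek) pagination avoids the deep-offset problem entirely, because no rows are skipped and discarded. A minimal sketch of a method that could sit alongside the ones above in UserOrderRepository; the cursor parameters (create_time and id of the last row already returned) are illustrative and not part of the original text:

// 5. Keyset (cursor) pagination: seek past the last row of the previous page instead of using OFFSET
public List<UserOrder> getOrdersAfterCursor(Long userId, Date lastCreateTime, Long lastId, int pageSize) {
    String sql = """
        SELECT * FROM user_order
        WHERE user_id = ?
          AND (create_time < ? OR (create_time = ? AND id < ?))
        ORDER BY create_time DESC, id DESC
        LIMIT ?
        """;
    return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(UserOrder.class),
            userId, lastCreateTime, lastCreateTime, lastId, pageSize);
}

The (create_time, id) pair makes the sort key unique, so no row is skipped or repeated when several orders share the same create_time.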
Index Monitoring and Analysis
@Component
public class IndexAnalyzer {
@Autowired
private JdbcTemplate jdbcTemplate;
// Analyze slow queries (mysql.slow_log is only populated when the slow query log writes to TABLE)
public List<SlowQuery> analyzeSlowQueries() {
String sql = """
SELECT
query_time,
lock_time,
rows_sent,
rows_examined,
sql_text
FROM mysql.slow_log
WHERE start_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY query_time DESC
LIMIT 10
""";
return jdbcTemplate.query(sql, (rs, rowNum) ->
SlowQuery.builder()
.queryTime(rs.getTime("query_time"))
.lockTime(rs.getTime("lock_time"))
.rowsSent(rs.getLong("rows_sent"))
.rowsExamined(rs.getLong("rows_examined"))
.sqlText(rs.getString("sql_text"))
.build()
);
}
// Analyze index definitions and statistics (cardinality)
public List<IndexUsage> analyzeIndexUsage(String tableName) {
String sql = """
SELECT
table_schema,
table_name,
index_name,
column_name,
cardinality,
sub_part,
packed,
nullable,
index_type
FROM information_schema.statistics
WHERE table_name = ? AND table_schema = DATABASE()
ORDER BY table_name, seq_in_index
""";
return jdbcTemplate.query(sql, (rs, rowNum) ->
IndexUsage.builder()
.tableName(rs.getString("table_name"))
.indexName(rs.getString("index_name"))
.columnName(rs.getString("column_name"))
.cardinality(rs.getLong("cardinality"))
.indexType(rs.getString("index_type"))
.build(),
tableName
);
}
// Find indexes with no usage entry in performance_schema (a rough heuristic for unused indexes)
public List<String> findUnusedIndexes(String tableName) {
String sql = """
SELECT
CONCAT(s.table_schema, '.', s.table_name) as table_name,
s.index_name
FROM information_schema.statistics s
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage t
ON s.table_schema = t.object_schema
AND s.table_name = t.object_name
AND s.index_name = t.index_name
WHERE s.table_schema = DATABASE()
AND s.table_name = ?
AND s.index_name != 'PRIMARY'
AND t.index_name IS NULL
""";
return jdbcTemplate.queryForList(sql, String.class, tableName);
}
}
2. Query Optimization Techniques
Interview question: How do you optimize complex SQL queries? What does each EXPLAIN column mean?
Execution Plan Analysis
@Service
public class QueryOptimizationService {
@Autowired
private JdbcTemplate jdbcTemplate;
// Analyze the execution plan (a usage sketch with the column meanings follows this class)
public List<ExplainResult> explainQuery(String sql) {
String explainSql = "EXPLAIN " + sql;
return jdbcTemplate.query(explainSql, (rs, rowNum) ->
ExplainResult.builder()
.id(rs.getInt("id"))
.selectType(rs.getString("select_type"))
.table(rs.getString("table"))
.partitions(rs.getString("partitions"))
.type(rs.getString("type"))
.possibleKeys(rs.getString("possible_keys"))
.key(rs.getString("key"))
.keyLen(rs.getString("key_len"))
.ref(rs.getString("ref"))
.rows(rs.getLong("rows"))
.filtered(rs.getDouble("filtered"))
.extra(rs.getString("Extra"))
.build()
);
}
// Query optimization example: order statistics
public OrderStatistics getOrderStatisticsBad(Long userId, Date startDate, Date endDate) {
// Poor approach: three separate queries means three round trips and three scans
String totalAmountSql = """
SELECT SUM(amount) FROM user_order
WHERE user_id = ? AND create_time BETWEEN ? AND ?
""";
String totalCountSql = """
SELECT COUNT(*) FROM user_order
WHERE user_id = ? AND create_time BETWEEN ? AND ?
""";
String avgAmountSql = """
SELECT AVG(amount) FROM user_order
WHERE user_id = ? AND create_time BETWEEN ? AND ?
""";
BigDecimal totalAmount = jdbcTemplate.queryForObject(totalAmountSql,
BigDecimal.class, userId, startDate, endDate);
Long totalCount = jdbcTemplate.queryForObject(totalCountSql,
Long.class, userId, startDate, endDate);
BigDecimal avgAmount = jdbcTemplate.queryForObject(avgAmountSql,
BigDecimal.class, userId, startDate, endDate);
return new OrderStatistics(totalAmount, totalCount, avgAmount);
}
public OrderStatistics getOrderStatisticsGood(Long userId, Date startDate, Date endDate) {
// Better approach: one query computes all three aggregates
String sql = """
SELECT
SUM(amount) as total_amount,
COUNT(*) as total_count,
AVG(amount) as avg_amount
FROM user_order
WHERE user_id = ? AND create_time BETWEEN ? AND ?
""";
return jdbcTemplate.queryForObject(sql, (rs, rowNum) ->
new OrderStatistics(
rs.getBigDecimal("total_amount"),
rs.getLong("total_count"),
rs.getBigDecimal("avg_amount")
), userId, startDate, endDate);
}
// Complex query optimization: JOIN vs. subquery
public List<UserOrderDetail> getUserOrdersWithProductInfo(Long userId) {
// Use JOINs (usually faster than correlated subqueries here)
String sql = """
SELECT
o.id,
o.order_no,
o.amount,
o.create_time,
p.product_name,
p.category_id,
c.category_name
FROM user_order o
INNER JOIN product p ON o.product_id = p.id
INNER JOIN category c ON p.category_id = c.id
WHERE o.user_id = ?
ORDER BY o.create_time DESC
LIMIT 20
""";
return jdbcTemplate.query(sql, (rs, rowNum) ->
UserOrderDetail.builder()
.id(rs.getLong("id"))
.orderNo(rs.getString("order_no"))
.amount(rs.getBigDecimal("amount"))
.createTime(rs.getTimestamp("create_time"))
.productName(rs.getString("product_name"))
.categoryName(rs.getString("category_name"))
.build(),
userId
);
}
// Batch operation optimization
public void batchUpdateOrderStatus(List<Long> orderIds, Integer newStatus) {
// Use a batched update instead of updating rows one by one
String sql = """
UPDATE user_order
SET status = ?, update_time = NOW()
WHERE id = ?
""";
List<Object[]> batchArgs = orderIds.stream()
.map(orderId -> new Object[]{newStatus, orderId})
.collect(Collectors.toList());
jdbcTemplate.batchUpdate(sql, batchArgs);
}
// Use a temporary table to optimize a complex query
// @Transactional pins all statements below to one connection (TEMPORARY tables are per-connection)
@Transactional
public List<UserOrderSummary> getComplexOrderSummary(List<Long> userIds, Date startDate) {
// Create the temporary table
String createTempSql = """
CREATE TEMPORARY TABLE temp_user_list (
user_id BIGINT PRIMARY KEY
)
""";
jdbcTemplate.execute(createTempSql);
// Batch-insert the user IDs
String insertSql = "INSERT INTO temp_user_list (user_id) VALUES (?)";
List<Object[]> userArgs = userIds.stream()
.map(userId -> new Object[]{userId})
.collect(Collectors.toList());
jdbcTemplate.batchUpdate(insertSql, userArgs);
// Run the complex aggregation against the temporary table
String sql = """
SELECT
t.user_id,
COUNT(o.id) as order_count,
SUM(o.amount) as total_amount,
MAX(o.create_time) as last_order_time
FROM temp_user_list t
LEFT JOIN user_order o ON t.user_id = o.user_id
AND o.create_time >= ?
GROUP BY t.user_id
""";
List<UserOrderSummary> result = jdbcTemplate.query(sql, (rs, rowNum) ->
UserOrderSummary.builder()
.userId(rs.getLong("user_id"))
.orderCount(rs.getInt("order_count"))
.totalAmount(rs.getBigDecimal("total_amount"))
.lastOrderTime(rs.getTimestamp("last_order_time"))
.build(),
startDate
);
// Drop the temporary table
jdbcTemplate.execute("DROP TEMPORARY TABLE temp_user_list");
return result;
}
}
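To answer the second half of the interview question, here is a usage sketch of explainQuery with the meaning of the main EXPLAIN columns spelled out in comments; the query string is illustrative and ExplainResult is assumed to expose the usual getters:

List<ExplainResult> plan = queryOptimizationService.explainQuery(
        "SELECT user_id, status, create_time, amount FROM user_order WHERE user_id = 42 AND status = 1");
for (ExplainResult row : plan) {
    // id / select_type : position and kind of each step (SIMPLE, PRIMARY, SUBQUERY, DERIVED, UNION ...)
    // table            : the table (or derived table) this plan row refers to
    // type             : access type, roughly best to worst: system > const > eq_ref > ref > range > index > ALL
    // possible_keys/key: indexes MySQL considered vs. the index it actually chose
    // key_len          : bytes of the chosen index in use (shows how much of a composite index applies)
    // ref              : columns or constants compared against the index
    // rows / filtered  : estimated rows examined and the percentage expected to survive the remaining filters
    // Extra            : e.g. "Using index" (covering index), "Using filesort", "Using temporary"
    System.out.printf("type=%s key=%s rows=%d extra=%s%n",
            row.getType(), row.getKey(), row.getRows(), row.getExtra());
}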
3. Transaction and Lock Optimization
Interview question: What are MySQL's transaction isolation levels, and how do you avoid deadlocks?
Transaction Management in Practice
@Service
@Transactional
public class TransactionOptimizationService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private PlatformTransactionManager transactionManager;
// 1. Controlling the transaction isolation level
@Transactional(isolation = Isolation.READ_COMMITTED)
public void updateOrderWithReadCommitted(Long orderId, Integer newStatus) {
// READ COMMITTED prevents dirty reads but allows non-repeatable reads
String sql = "UPDATE user_order SET status = ?, update_time = NOW() WHERE id = ?";
jdbcTemplate.update(sql, newStatus, orderId);
}
@Transactional(isolation = Isolation.REPEATABLE_READ)
public OrderCheckResult checkAndUpdateOrder(Long orderId, Integer expectedStatus, Integer newStatus) {
// REPEATABLE READ also prevents non-repeatable reads
String selectSql = "SELECT status FROM user_order WHERE id = ? FOR UPDATE";
Integer currentStatus = jdbcTemplate.queryForObject(selectSql, Integer.class, orderId);
if (!expectedStatus.equals(currentStatus)) {
return OrderCheckResult.failure("订单状态已变更");
}
String updateSql = "UPDATE user_order SET status = ?, update_time = NOW() WHERE id = ?";
int updated = jdbcTemplate.update(updateSql, newStatus, orderId);
return updated > 0 ? OrderCheckResult.success() : OrderCheckResult.failure("Update failed");
}
// 2. Pessimistic vs. optimistic locking
@Transactional
public boolean updateInventoryPessimistic(Long productId, Integer quantity) {
// Pessimistic locking: SELECT ... FOR UPDATE
String selectSql = "SELECT stock FROM product WHERE id = ? FOR UPDATE";
Integer currentStock = jdbcTemplate.queryForObject(selectSql, Integer.class, productId);
if (currentStock < quantity) {
return false;
}
String updateSql = "UPDATE product SET stock = stock - ? WHERE id = ?";
return jdbcTemplate.update(updateSql, quantity, productId) > 0;
}
@Transactional
public boolean updateInventoryOptimistic(Long productId, Integer quantity, Long version) {
// Optimistic locking: version column checked in the WHERE clause (a retry wrapper is sketched after this class)
String updateSql = """
UPDATE product
SET stock = stock - ?, version = version + 1
WHERE id = ? AND version = ? AND stock >= ?
""";
return jdbcTemplate.update(updateSql, quantity, productId, version, quantity) > 0;
}
// 3. Deadlock prevention
@Transactional
public void transferOrdersBetweenUsers(Long fromUserId, Long toUserId, List<Long> orderIds) {
// Always lock the two users in the same (ID) order to avoid deadlocks
Long firstUserId = Math.min(fromUserId, toUserId);
Long secondUserId = Math.max(fromUserId, toUserId);
// Lock the user rows in that fixed order
String lockUserSql = "SELECT id FROM user WHERE id = ? FOR UPDATE";
jdbcTemplate.queryForObject(lockUserSql, Long.class, firstUserId);
jdbcTemplate.queryForObject(lockUserSql, Long.class, secondUserId);
// Update the orders in sorted ID order as well
List<Long> sortedOrderIds = orderIds.stream().sorted().collect(Collectors.toList());
String updateSql = "UPDATE user_order SET user_id = ? WHERE id = ? AND user_id = ?";
for (Long orderId : sortedOrderIds) {
jdbcTemplate.update(updateSql, toUserId, orderId, fromUserId);
}
}
// 4. Programmatic transaction control around an external call (a local transaction, not a true distributed transaction)
public void executeDistributedTransaction() {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
def.setTimeout(30);
TransactionStatus status = transactionManager.getTransaction(def);
try {
// Execute local business logic
performBusinessLogic();
// Call a service in another system
callExternalService();
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
private void performBusinessLogic() {
// Business logic implementation
}
private void callExternalService() {
// Call the external service
}
}
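Optimistic locking shifts conflict handling to the caller: when the version check fails, the usual pattern is to re-read the row and retry a bounded number of times. A minimal sketch of a method that could be added to the service above; the retry count and the re-read query are illustrative:

// Bounded retry around the optimistic update; gives up after a few conflicts
public boolean deductStockWithRetry(Long productId, Integer quantity) {
    for (int attempt = 0; attempt < 3; attempt++) {
        // Re-read the current version on every attempt
        Long version = jdbcTemplate.queryForObject(
                "SELECT version FROM product WHERE id = ?", Long.class, productId);
        if (version == null) {
            return false; // defensive: no version value available
        }
        if (updateInventoryOptimistic(productId, quantity, version)) {
            return true; // version matched and stock was sufficient
        }
        // Another transaction changed the row (or stock is insufficient); loop and retry
    }
    return false;
}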
4. Sharding Strategies (Splitting Databases and Tables)
Interview question: How do you design a sharding scheme, and how do you handle cross-shard queries once the data is sharded?
Sharding Implementation
@Component
public class ShardingStrategy {
private static final int DB_COUNT = 4; // number of databases
private static final int TABLE_COUNT = 16; // number of tables per database
// Shard by user ID
public ShardingInfo getShardingInfo(Long userId) {
    // Map into DB_COUNT * TABLE_COUNT slots first. Taking userId % DB_COUNT and
    // userId % TABLE_COUNT independently correlates the two results, so only a
    // quarter of the tables in each database would ever receive data.
    int slot = (int) (userId % (DB_COUNT * TABLE_COUNT));
    int dbIndex = slot / TABLE_COUNT;
    int tableIndex = slot % TABLE_COUNT;
    String dbName = "order_db_" + dbIndex;
    String tableName = "user_order_" + tableIndex;
    return new ShardingInfo(dbName, tableName);
}
// Shard by order ID
public ShardingInfo getShardingInfoByOrderId(Long orderId) {
// Assumes the order ID generation scheme encodes routing information (see the generator sketch after this class)
Long userId = extractUserIdFromOrderId(orderId);
return getShardingInfo(userId);
}
private Long extractUserIdFromOrderId(Long orderId) {
// Logic for extracting the user ID from the order ID (simplified here)
return orderId / 1000000;
}
}
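extractUserIdFromOrderId only works if the ID generator really encodes routing information. A common trick is to reserve the low bits of the order ID for the user's shard slot, so a lookup by order number can be routed directly instead of fanning out to every shard. A minimal sketch under that assumption; the bit layout, epoch, and class name are illustrative and not part of the original design:

import java.util.concurrent.atomic.AtomicLong;

public class ShardAwareOrderIdGenerator {
    private static final int SLOT_BITS = 6;                     // 2^6 = 64 slots = 4 databases * 16 tables
    private static final long SLOT_MASK = (1L << SLOT_BITS) - 1;
    private static final long EPOCH = 1_600_000_000_000L;       // custom epoch keeps the value well inside a signed long
    private final AtomicLong sequence = new AtomicLong();

    public long nextOrderId(long userId) {
        long slot = userId % (1L << SLOT_BITS);                 // same slot the sharding strategy computes
        long body = ((System.currentTimeMillis() - EPOCH) << 16) | (sequence.incrementAndGet() & 0xFFFF);
        return (body << SLOT_BITS) | slot;                      // shard slot lives in the low bits
    }

    // Routing side: recover the slot straight from the order ID, no user lookup required
    public static int extractSlot(long orderId) {
        return (int) (orderId & SLOT_MASK);
    }
}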
@Service
public class ShardingOrderService {
@Autowired
private Map<String, JdbcTemplate> jdbcTemplateMap; // one JdbcTemplate per physical database
@Autowired
private ShardingStrategy shardingStrategy;
// Single-shard query
public List<UserOrder> getOrdersByUserId(Long userId) {
ShardingInfo shardInfo = shardingStrategy.getShardingInfo(userId);
JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(shardInfo.getDbName());
String sql = "SELECT * FROM " + shardInfo.getTableName() + " WHERE user_id = ?";
return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(UserOrder.class), userId);
}
// Cross-shard query by time range
public List<UserOrder> getOrdersByTimeRange(Date startTime, Date endTime) {
List<UserOrder> allOrders = new ArrayList<>();
// Query all shards in parallel
List<CompletableFuture<List<UserOrder>>> futures = new ArrayList<>();
for (int dbIndex = 0; dbIndex < 4; dbIndex++) {
for (int tableIndex = 0; tableIndex < 16; tableIndex++) {
String dbName = "order_db_" + dbIndex;
String tableName = "user_order_" + tableIndex;
CompletableFuture<List<UserOrder>> future = CompletableFuture.supplyAsync(() -> {
JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(dbName);
String sql = "SELECT * FROM " + tableName +
" WHERE create_time BETWEEN ? AND ? LIMIT 1000";
return jdbcTemplate.query(sql,
new BeanPropertyRowMapper<>(UserOrder.class), startTime, endTime);
});
futures.add(future);
}
}
// Collect the results
for (CompletableFuture<List<UserOrder>> future : futures) {
try {
allOrders.addAll(future.get(5, TimeUnit.SECONDS));
} catch (Exception e) {
// Log the failure but let the other shard queries continue
System.err.println("Shard query failed: " + e.getMessage());
}
}
// Merge, sort, and truncate
return allOrders.stream()
.sorted(Comparator.comparing(UserOrder::getCreateTime).reversed())
.limit(100)
.collect(Collectors.toList());
}
// Sharded write (cross-system consistency is handled via MQ below, not a distributed transaction)
@Transactional
public void createOrderWithSharding(CreateOrderRequest request) {
ShardingInfo shardInfo = shardingStrategy.getShardingInfo(request.getUserId());
JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(shardInfo.getDbName());
String sql = """
INSERT INTO """ + shardInfo.getTableName() + """
(user_id, order_no, product_id, amount, status, create_time, update_time)
VALUES (?, ?, ?, ?, ?, NOW(), NOW())
""";
jdbcTemplate.update(sql,
request.getUserId(),
request.getOrderNo(),
request.getProductId(),
request.getAmount(),
0 // initial status
);
// If other systems need to be updated as well, use a message queue to guarantee eventual consistency
sendOrderCreatedMessage(request);
}
private void sendOrderCreatedMessage(CreateOrderRequest request) {
// Publish a message to MQ so downstream systems (inventory, loyalty points, etc.)
// are updated asynchronously
}
}
Read/Write Splitting Implementation
@Configuration
public class ReadWriteSplitConfig {
@Bean
@Primary
public DataSource routingDataSource() {
RoutingDataSource routingDataSource = new RoutingDataSource();
Map<Object, Object> dataSourceMap = new HashMap<>();
dataSourceMap.put("master", masterDataSource());
dataSourceMap.put("slave1", slave1DataSource());
dataSourceMap.put("slave2", slave2DataSource());
routingDataSource.setTargetDataSources(dataSourceMap);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
@Bean
public DataSource masterDataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://master:3306/orderdb");
config.setUsername("root");
config.setPassword("password");
config.setMaximumPoolSize(20);
return new HikariDataSource(config);
}
@Bean
public DataSource slave1DataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://slave1:3306/orderdb");
config.setUsername("readonly");
config.setPassword("password");
config.setMaximumPoolSize(10);
config.setReadOnly(true);
return new HikariDataSource(config);
}
@Bean
public DataSource slave2DataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://slave2:3306/orderdb");
config.setUsername("readonly");
config.setPassword("password");
config.setMaximumPoolSize(10);
config.setReadOnly(true);
return new HikariDataSource(config);
}
}
public class RoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
contextHolder.set(dataSourceType);
}
public static String getDataSourceType() {
return contextHolder.get();
}
public static void clearDataSourceType() {
contextHolder.remove();
}
}
@Aspect
@Component
public class ReadWriteSplitAspect {
private final List<String> slaveDataSources = Arrays.asList("slave1", "slave2");
private final AtomicInteger counter = new AtomicInteger(0);
@Around("@annotation(readOnly)")
public Object routeDataSource(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
try {
if (readOnly.value()) {
// Read operation: route to a replica
String slaveDataSource = selectSlaveDataSource();
DataSourceContextHolder.setDataSourceType(slaveDataSource);
} else {
// Write operation: use the primary
DataSourceContextHolder.setDataSourceType("master");
}
return joinPoint.proceed();
} finally {
DataSourceContextHolder.clearDataSourceType();
}
}
private String selectSlaveDataSource() {
// Round-robin across the replicas; floorMod keeps the index non-negative even if the counter wraps around
int index = Math.floorMod(counter.getAndIncrement(), slaveDataSources.size());
return slaveDataSources.get(index);
}
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
boolean value() default true;
}
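How the aspect and annotation fit together from a service's point of view: a query method carries @ReadOnly and is routed to a replica, while a write method is left unannotated so the routing data source falls back to its default target, the master. A short usage sketch; OrderQueryService and the status value are illustrative:

@Service
public class OrderQueryService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Routed to slave1/slave2 by ReadWriteSplitAspect
    @ReadOnly
    public Long countOrders(Long userId) {
        return jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM user_order WHERE user_id = ?", Long.class, userId);
    }

    // No @ReadOnly: the aspect never runs, DataSourceContextHolder stays empty,
    // and RoutingDataSource falls back to the master
    public int cancelOrder(Long orderId) {
        return jdbcTemplate.update(
                "UPDATE user_order SET status = 9, update_time = NOW() WHERE id = ?", orderId); // 9 = cancelled in this sketch
    }
}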
5. Connection Pool and Configuration Tuning
Interview question: How do you configure a database connection pool, and which parameters matter most for tuning?
Connection Pool Configuration
@Configuration
public class DataSourceOptimizationConfig {
@Bean
@ConfigurationProperties(prefix = "app.datasource.hikari")
public HikariConfig hikariConfig() {
HikariConfig config = new HikariConfig();
// Basic connection settings
config.setJdbcUrl("jdbc:mysql://localhost:3306/orderdb?useSSL=false&serverTimezone=UTC");
config.setUsername("root");
config.setPassword("password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
// Pool sizing
config.setMaximumPoolSize(20); // maximum number of connections
config.setMinimumIdle(5); // minimum idle connections
config.setConnectionTimeout(30000); // connection timeout (30 s)
config.setIdleTimeout(600000); // idle timeout (10 min)
config.setMaxLifetime(1800000); // maximum connection lifetime (30 min)
config.setLeakDetectionThreshold(60000); // leak detection threshold (1 min)
// Performance-related settings
config.setAutoCommit(true);
config.setReadOnly(false);
config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
// MySQL-specific driver optimizations
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");
return config;
}
@Bean
public HikariDataSource dataSource(HikariConfig hikariConfig) {
return new HikariDataSource(hikariConfig);
}
// Connection pool monitoring
@Bean
public ConnectionPoolMonitor connectionPoolMonitor(HikariDataSource dataSource) {
return new ConnectionPoolMonitor(dataSource);
}
}
// Created via the connectionPoolMonitor @Bean above; no @Component here, which would register a duplicate bean
public class ConnectionPoolMonitor {
private final HikariDataSource dataSource;
private final MeterRegistry meterRegistry;
public ConnectionPoolMonitor(HikariDataSource dataSource) {
this.dataSource = dataSource;
this.meterRegistry = Metrics.globalRegistry;
}
@PostConstruct
public void init() {
// Register connection pool gauges
Gauge.builder("hikari.connections.active", dataSource,
        ds -> ds.getHikariPoolMXBean().getActiveConnections())
    .description("Active connections")
    .register(meterRegistry);
Gauge.builder("hikari.connections.idle", dataSource,
        ds -> ds.getHikariPoolMXBean().getIdleConnections())
    .description("Idle connections")
    .register(meterRegistry);
Gauge.builder("hikari.connections.pending", dataSource,
        ds -> ds.getHikariPoolMXBean().getThreadsAwaitingConnection())
    .description("Threads awaiting a connection")
    .register(meterRegistry);
Gauge.builder("hikari.connections.total", dataSource,
        ds -> ds.getHikariPoolMXBean().getTotalConnections())
    .description("Total connections")
    .register(meterRegistry);
@Scheduled(fixedRate = 30000) // check every 30 seconds (requires @EnableScheduling)
public void checkConnectionPool() {
HikariPoolMXBean poolBean = dataSource.getHikariPoolMXBean();
int activeConnections = poolBean.getActiveConnections();
int totalConnections = poolBean.getTotalConnections();
int pendingThreads = poolBean.getThreadsAwaitingConnection();
// Alert when pool utilization exceeds 80%
double utilizationRate = (double) activeConnections / totalConnections;
if (utilizationRate > 0.8) {
System.err.println("连接池使用率过高: " + String.format("%.2f%%", utilizationRate * 100));
}
// Alert when threads are waiting for a connection
if (pendingThreads > 0) {
System.err.println("有 " + pendingThreads + " 个线程正在等待数据库连接");
}
}
}
Frequently Asked Interview Questions
1. Theory Questions
Q: How does InnoDB implement MVCC?
A: MVCC (Multi-Version Concurrency Control) is built on the following mechanisms:
- Transaction IDs: every transaction is assigned a unique, increasing transaction ID
- Hidden columns: each row stores the ID of the transaction that wrote it and a roll pointer to its previous version
- undo log: keeps the historical versions of each row
- Read View: the visibility snapshot generated for consistent (snapshot) reads; a two-connection demonstration follows below
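A concrete way to observe consistent reads is two plain JDBC connections: once transaction A has taken its Read View, a change committed by transaction B stays invisible to A until A's own transaction ends. A minimal sketch assuming a local MySQL with the user_order table from above and an existing row with id = 1; URL and credentials are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MvccDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/orderdb";
        try (Connection a = DriverManager.getConnection(url, "root", "password");
             Connection b = DriverManager.getConnection(url, "root", "password")) {

            a.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
            a.setAutoCommit(false);

            // Transaction A takes its snapshot with its first consistent read
            System.out.println("A sees status = " + readStatus(a));

            // Transaction B (autocommit) changes and commits the same row
            try (Statement s = b.createStatement()) {
                s.executeUpdate("UPDATE user_order SET status = status + 1 WHERE id = 1");
            }

            // A still sees the old version: the undo log plus A's Read View hide B's change
            System.out.println("A still sees status = " + readStatus(a));

            a.commit();
            // A's next read starts a new snapshot and now reflects B's committed change
            System.out.println("A now sees status = " + readStatus(a));
        }
    }

    private static int readStatus(Connection c) throws Exception {
        try (Statement s = c.createStatement();
             ResultSet rs = s.executeQuery("SELECT status FROM user_order WHERE id = 1")) {
            rs.next();
            return rs.getInt("status");
        }
    }
}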
Q: Under what conditions do deadlocks occur, and how do you avoid them?
A: A deadlock requires all four of the following conditions:
- Mutual exclusion: the resource cannot be shared
- Hold and wait: a transaction holds resources while requesting more
- No preemption: held resources cannot be forcibly reclaimed
- Circular wait: the waiting transactions form a cycle
Avoidance strategies (a small diagnostic sketch follows this list):
- Timeouts: set a lock wait timeout
- Deadlock detection: InnoDB detects deadlocks automatically and rolls back one of the transactions
- Resource ordering: acquire locks in a fixed order
- Short lock hold times: commit transactions as soon as possible
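Two small knobs worth mentioning alongside the list above, runnable through any JdbcTemplate: cap the lock wait time per session so blocked statements fail fast, and read the LATEST DETECTED DEADLOCK section of the InnoDB status output after a deadlock has happened. The 5-second value is illustrative:

// Fail fast instead of queueing behind a long-held lock (value in seconds)
jdbcTemplate.execute("SET SESSION innodb_lock_wait_timeout = 5");

// SHOW ENGINE INNODB STATUS returns one row (Type, Name, Status); the Status text contains a
// "LATEST DETECTED DEADLOCK" section listing the two transactions and the statements involved
Map<String, Object> innodbStatus = jdbcTemplate.queryForMap("SHOW ENGINE INNODB STATUS");
System.out.println(innodbStatus.get("Status"));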
2. Practical Questions
Q: How would you design the database side of a high-concurrency flash-sale (seckill) system?
Key points:
- Read/write splitting: queries go to replicas, order placement goes to the primary
- Stock pre-deduction: deduct stock in Redis first and sync to the database asynchronously (see the sketch after this list)
- Sharding: split databases and tables by product ID or user ID
- Queue-based peak shaving: process orders through a message queue
- Cache warm-up: load hot data ahead of time
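For the Redis pre-deduction point, the check-and-decrement must be atomic, which is typically done with a small Lua script so concurrent requests cannot interleave between the read and the decrement. A minimal sketch with Spring Data Redis; the key name, script, and SeckillStockService are illustrative:

@Service
public class SeckillStockService {
    @Autowired
    private StringRedisTemplate redisTemplate;

    // Atomic check-and-decrement: returns 1 on success, 0 if stock is missing or insufficient
    private static final String DEDUCT_LUA = """
            local stock = tonumber(redis.call('GET', KEYS[1]) or '-1')
            if stock >= tonumber(ARGV[1]) then
                redis.call('DECRBY', KEYS[1], ARGV[1])
                return 1
            end
            return 0
            """;

    private final DefaultRedisScript<Long> deductScript = new DefaultRedisScript<>(DEDUCT_LUA, Long.class);

    public boolean tryDeduct(Long productId, int quantity) {
        String key = "seckill:stock:" + productId; // pre-loaded into Redis during cache warm-up
        Long result = redisTemplate.execute(deductScript, Collections.singletonList(key),
                String.valueOf(quantity));
        // On success the order would be pushed to MQ and the database stock reconciled asynchronously
        return Long.valueOf(1L).equals(result);
    }
}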
Q: How do you run DDL on a large table online?
Key points:
- pt-online-schema-change: tooling for online schema changes
- Shadow-table technique: create a shadow table and copy data over incrementally
- Batched execution: avoid locking the table for long periods
- Run during off-peak hours: reduce the impact on the business
- Backup strategy: back the data up before the operation
Summary
Key interview topics for database optimization:
- Index design: covering indexes, composite indexes, scenarios where indexes become unusable
- Query optimization: execution plan analysis, SQL rewriting techniques
- Transaction handling: isolation levels, locking, deadlock handling
- Architecture: sharding, read/write splitting, connection pool configuration
- Performance monitoring: slow query analysis, index usage monitoring
Tie these topics to real database performance problems from your own projects, and be able to describe the symptoms, the analysis process, and the final solution in detail.