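Database Performance Optimization and Tuning

Overview

Database performance optimization is a core backend development skill. It covers index design, query optimization, transaction handling, sharding, and more. This article summarizes the key MySQL optimization techniques and the points interviewers focus on.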

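Core Interview Questions

1. Index Optimization and Design

Interview question: What is a covering index? How do you design an efficient composite index?

Index Types and Principles

-- Create the test table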
CREATE TABLE `user_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(20) NOT NULL COMMENT 'User ID',
  `order_no` varchar(32) NOT NULL COMMENT 'Order number',
  `product_id` bigint(20) NOT NULL COMMENT 'Product ID',
  `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Order status',
  `amount` decimal(10,2) NOT NULL COMMENT 'Order amount',
  `create_time` datetime NOT NULL COMMENT 'Creation time',
  `update_time` datetime NOT NULL COMMENT 'Last update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_no` (`order_no`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_create_time` (`create_time`),
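  -- Composite index (shown for illustration: it is a prefix of idx_cover_query below,
  -- so a production schema would normally keep only one of the two)
  KEY `idx_user_status_create` (`user_id`, `status`, `create_time`),
  -- Covering index: also carries amount so the summary query never touches the table rows
  KEY `idx_cover_query` (`user_id`, `status`, `create_time`, `amount`)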
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Index Optimization in Practice

@Repository
public class UserOrderRepository {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // 1. Use a covering index to avoid lookups back to the table
    public List<OrderSummary> getOrderSummaryByUser(Long userId, Integer status) {
        // idx_cover_query contains every column this query needs, so no row lookup is required
        String sql = """
            SELECT user_id, status, create_time, amount 
            FROM user_order 
            WHERE user_id = ? AND status = ?
            ORDER BY create_time DESC
            LIMIT 10
            """;

        return jdbcTemplate.query(sql, 
            (rs, rowNum) -> new OrderSummary(
                rs.getLong("user_id"),
                rs.getInt("status"),
                rs.getTimestamp("create_time"),
                rs.getBigDecimal("amount")
            ), userId, status);
    }

    // 2. Leftmost-prefix rule for composite indexes
    public List<UserOrder> getOrdersByConditions(Long userId, Integer status, 
                                                Date startTime, Date endTime) {
        // Uses idx_user_status_create: equality on user_id and status, then a range on create_time
        String sql = """
            SELECT * FROM user_order 
            WHERE user_id = ? 
            AND status = ? 
            AND create_time BETWEEN ? AND ?
            ORDER BY create_time DESC
            """;

        return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(UserOrder.class),
            userId, status, startTime, endTime);
    }

    // 3. Writing predicates that keep indexes usable
    public List<UserOrder> searchOrdersOptimized(String orderNoPrefix, 
                                                 Date startDate, Date endDate) {
        StringBuilder sql = new StringBuilder("SELECT * FROM user_order WHERE 1=1");
        List<Object> params = new ArrayList<>();

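        // Compare the raw column: wrapping it in a function would prevent index use
        if (orderNoPrefix != null) {
            sql.append(" AND order_no LIKE ?");
            params.add(orderNoPrefix + "%");  // a prefix match (trailing wildcard) can use the index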
        }

        if (startDate != null) {
            sql.append(" AND create_time >= ?");
            params.add(startDate);
        }

        if (endDate != null) {
            sql.append(" AND create_time < ?");
            params.add(endDate);
        }

        sql.append(" ORDER BY create_time DESC LIMIT 100");

        return jdbcTemplate.query(sql.toString(), 
            new BeanPropertyRowMapper<>(UserOrder.class), params.toArray());
    }

    // 4. Pagination optimization
    public PageResult<UserOrder> getOrdersWithPagination(Long userId, Integer pageNum, Integer pageSize) {
        // Query the total first (if it is needed)
        String countSql = "SELECT COUNT(*) FROM user_order WHERE user_id = ?";
        long total = jdbcTemplate.queryForObject(countSql, Long.class, userId);

        // Deferred join: page over ids in a subquery so large offsets skip only index entries
        String sql = """
            SELECT o.* FROM user_order o 
            INNER JOIN (
                SELECT id FROM user_order 
                WHERE user_id = ? 
                ORDER BY create_time DESC 
                LIMIT ?, ?
            ) t ON o.id = t.id
            ORDER BY o.create_time DESC
            """;

        int offset = (pageNum - 1) * pageSize;
        List<UserOrder> orders = jdbcTemplate.query(sql, 
            new BeanPropertyRowMapper<>(UserOrder.class), userId, offset, pageSize);

        return new PageResult<>(orders, total, pageNum, pageSize);
    }
}
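
The leftmost-prefix rule used in the repository above can be sketched as a small function. This is only an illustration under simplifying assumptions: LeftmostPrefix and usableColumns are hypothetical names, and the model ignores optimizer features such as index condition pushdown and skip scan. Equality predicates extend the usable prefix, the first range predicate is still used but ends it, and a missing column stops the match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of MySQL or Spring.
final class LeftmostPrefix {

    /**
     * Returns the leading index columns that a WHERE clause can use for the index seek.
     * Equality columns extend the prefix; the first range column is used and ends it.
     */
    static List<String> usableColumns(List<String> indexColumns,
                                      Set<String> equalityColumns,
                                      Set<String> rangeColumns) {
        List<String> usable = new ArrayList<>();
        for (String column : indexColumns) {
            if (equalityColumns.contains(column)) {
                usable.add(column);   // equality: keep matching the following columns
            } else if (rangeColumns.contains(column)) {
                usable.add(column);   // range: usable, but nothing after it is
                break;
            } else {
                break;                // gap in the prefix: stop here
            }
        }
        return usable;
    }
}
```

For idx_user_status_create, a query with equality on user_id and status plus a range on create_time uses all three columns, while a query that filters only on status uses none of them.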

Index Monitoring and Analysis

@Component
public class IndexAnalyzer {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Analyze slow queries (mysql.slow_log is populated only when log_output includes TABLE)
    public List<SlowQuery> analyzeSlowQueries() {
        String sql = """
            SELECT 
                query_time,
                lock_time,
                rows_sent,
                rows_examined,
                sql_text
            FROM mysql.slow_log 
            WHERE start_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
            ORDER BY query_time DESC
            LIMIT 10
            """;

        return jdbcTemplate.query(sql, (rs, rowNum) -> 
            SlowQuery.builder()
                .queryTime(rs.getTime("query_time"))
                .lockTime(rs.getTime("lock_time"))
                .rowsSent(rs.getLong("rows_sent"))
                .rowsExamined(rs.getLong("rows_examined"))
                .sqlText(rs.getString("sql_text"))
                .build()
        );
    }

    // Inspect index definitions and cardinality
    public List<IndexUsage> analyzeIndexUsage(String tableName) {
        String sql = """
            SELECT 
                table_schema,
                table_name,
                index_name,
                column_name,
                cardinality,
                sub_part,
                packed,
                nullable,
                index_type
            FROM information_schema.statistics 
            WHERE table_name = ? AND table_schema = DATABASE()
            ORDER BY index_name, seq_in_index
            """;

        return jdbcTemplate.query(sql, (rs, rowNum) ->
            IndexUsage.builder()
                .tableName(rs.getString("table_name"))
                .indexName(rs.getString("index_name"))
                .columnName(rs.getString("column_name"))
                .cardinality(rs.getLong("cardinality"))
                .indexType(rs.getString("index_type"))
                .build(),
            tableName
        );
    }

    // Find indexes with no recorded use since the server last started
    public List<String> findUnusedIndexes(String tableName) {
        // performance_schema keeps one row per index, so an unused index shows up as
        // count_star = 0 rather than as a missing row. Counters reset on restart,
        // so judge only after a representative period of uptime.
        String sql = """
            SELECT CONCAT(object_schema, '.', object_name, '.', index_name) AS unused_index
            FROM performance_schema.table_io_waits_summary_by_index_usage
            WHERE object_schema = DATABASE()
            AND object_name = ?
            AND index_name IS NOT NULL
            AND index_name != 'PRIMARY'
            AND count_star = 0
            """;

        return jdbcTemplate.queryForList(sql, String.class, tableName);
    }
}

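2. Query Optimization Techniques

Interview question: How do you optimize a complex SQL query? What does each EXPLAIN column mean?

Execution Plan Analysis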

@Service
public class QueryOptimizationService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Analyze the execution plan (pass only trusted SQL: the text is concatenated, not bound as a parameter)
    public List<ExplainResult> explainQuery(String sql) {
        String explainSql = "EXPLAIN " + sql;

        return jdbcTemplate.query(explainSql, (rs, rowNum) ->
            ExplainResult.builder()
                .id(rs.getInt("id"))
                .selectType(rs.getString("select_type"))
                .table(rs.getString("table"))
                .partitions(rs.getString("partitions"))
                .type(rs.getString("type"))
                .possibleKeys(rs.getString("possible_keys"))
                .key(rs.getString("key"))
                .keyLen(rs.getString("key_len"))
                .ref(rs.getString("ref"))
                .rows(rs.getLong("rows"))
                .filtered(rs.getDouble("filtered"))
                .extra(rs.getString("Extra"))
                .build()
        );
    }

    // Query optimization example: order statistics
    public OrderStatistics getOrderStatisticsBad(Long userId, Date startDate, Date endDate) {
        // Poor: three separate queries over the same rows
        String totalAmountSql = """
            SELECT SUM(amount) FROM user_order 
            WHERE user_id = ? AND create_time BETWEEN ? AND ?
            """;

        String totalCountSql = """
            SELECT COUNT(*) FROM user_order 
            WHERE user_id = ? AND create_time BETWEEN ? AND ?
            """;

        String avgAmountSql = """
            SELECT AVG(amount) FROM user_order 
            WHERE user_id = ? AND create_time BETWEEN ? AND ?
            """;

        BigDecimal totalAmount = jdbcTemplate.queryForObject(totalAmountSql, 
            BigDecimal.class, userId, startDate, endDate);
        Long totalCount = jdbcTemplate.queryForObject(totalCountSql, 
            Long.class, userId, startDate, endDate);
        BigDecimal avgAmount = jdbcTemplate.queryForObject(avgAmountSql, 
            BigDecimal.class, userId, startDate, endDate);

        return new OrderStatistics(totalAmount, totalCount, avgAmount);
    }

    public OrderStatistics getOrderStatisticsGood(Long userId, Date startDate, Date endDate) {
        // Better: a single query computes all three aggregates
        String sql = """
            SELECT 
                SUM(amount) as total_amount,
                COUNT(*) as total_count,
                AVG(amount) as avg_amount
            FROM user_order 
            WHERE user_id = ? AND create_time BETWEEN ? AND ?
            """;

        return jdbcTemplate.queryForObject(sql, (rs, rowNum) ->
            new OrderStatistics(
                rs.getBigDecimal("total_amount"),
                rs.getLong("total_count"),
                rs.getBigDecimal("avg_amount")
            ), userId, startDate, endDate);
    }

    // Complex query optimization: JOIN vs. subquery
    public List<UserOrderDetail> getUserOrdersWithProductInfo(Long userId) {
        // Use JOINs (usually the faster choice)
        String sql = """
            SELECT 
                o.id,
                o.order_no,
                o.amount,
                o.create_time,
                p.product_name,
                p.category_id,
                c.category_name
            FROM user_order o
            INNER JOIN product p ON o.product_id = p.id
            INNER JOIN category c ON p.category_id = c.id
            WHERE o.user_id = ?
            ORDER BY o.create_time DESC
            LIMIT 20
            """;

        return jdbcTemplate.query(sql, (rs, rowNum) ->
            UserOrderDetail.builder()
                .id(rs.getLong("id"))
                .orderNo(rs.getString("order_no"))
                .amount(rs.getBigDecimal("amount"))
                .createTime(rs.getTimestamp("create_time"))
                .productName(rs.getString("product_name"))
                .categoryName(rs.getString("category_name"))
                .build(),
            userId
        );
    }

    // Batch operation optimization
    public void batchUpdateOrderStatus(List<Long> orderIds, Integer newStatus) {
        // Send the updates as one batch instead of one statement at a time
        String sql = """
            UPDATE user_order 
            SET status = ?, update_time = NOW() 
            WHERE id = ?
            """;

        List<Object[]> batchArgs = orderIds.stream()
            .map(orderId -> new Object[]{newStatus, orderId})
            .collect(Collectors.toList());

        jdbcTemplate.batchUpdate(sql, batchArgs);
    }

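    // Use a temporary table to simplify a complex query
    // A temporary table is visible only to the connection that created it, so every
    // statement below must share one connection; @Transactional pins it for the call.
    @Transactional
    public List<UserOrderSummary> getComplexOrderSummary(List<Long> userIds, Date startDate) {
        // Create the temporary table
        String createTempSql = """
            CREATE TEMPORARY TABLE temp_user_list (
                user_id BIGINT PRIMARY KEY
            )
            """;
        jdbcTemplate.execute(createTempSql);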

        // Batch-insert the user IDs
        String insertSql = "INSERT INTO temp_user_list (user_id) VALUES (?)";
        List<Object[]> userArgs = userIds.stream()
            .map(userId -> new Object[]{userId})
            .collect(Collectors.toList());
        jdbcTemplate.batchUpdate(insertSql, userArgs);

        // Run the aggregate query against the temporary table
        String sql = """
            SELECT 
                t.user_id,
                COUNT(o.id) as order_count,
                SUM(o.amount) as total_amount,
                MAX(o.create_time) as last_order_time
            FROM temp_user_list t
            LEFT JOIN user_order o ON t.user_id = o.user_id 
                AND o.create_time >= ?
            GROUP BY t.user_id
            """;

        List<UserOrderSummary> result = jdbcTemplate.query(sql, (rs, rowNum) ->
            UserOrderSummary.builder()
                .userId(rs.getLong("user_id"))
                .orderCount(rs.getInt("order_count"))
                .totalAmount(rs.getBigDecimal("total_amount"))
                .lastOrderTime(rs.getTimestamp("last_order_time"))
                .build(),
            startDate
        );

        // Drop the temporary table so a pooled connection does not carry it into the next call
        jdbcTemplate.execute("DROP TEMPORARY TABLE IF EXISTS temp_user_list");

        return result;
    }
}
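
The interview question also asks what the EXPLAIN columns mean. In brief: type is the access method (roughly from best to worst: system, const, eq_ref, ref, range, index, ALL), possible_keys and key are the candidate and chosen indexes, key_len shows how many bytes of the index are used, rows and filtered are estimates of the rows examined and the percentage that survives the filter, and Extra carries notes such as "Using index" (covering index), "Using filesort" (an extra sort pass), and "Using temporary" (an internal temporary table). The following is a minimal sketch of how those signals might be checked; ExplainAdvisor and its messages are hypothetical and not part of the service above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; the thresholds and wording are assumptions.
final class ExplainAdvisor {

    /** Returns human-readable warnings for one EXPLAIN row. */
    static List<String> warnings(String type, String key, String extra) {
        List<String> out = new ArrayList<>();
        if ("ALL".equals(type)) {
            out.add("full table scan");          // type = ALL reads every row
        }
        if (key == null) {
            out.add("no index chosen");          // key is NULL when no index is used
        }
        String notes = (extra == null) ? "" : extra;
        if (notes.contains("Using filesort")) {
            out.add("extra sort pass");          // ORDER BY could not use an index
        }
        if (notes.contains("Using temporary")) {
            out.add("internal temporary table"); // often caused by GROUP BY or DISTINCT
        }
        return out;
    }
}
```

Each ExplainResult from explainQuery could be passed through such a check, for example warnings(r.getType(), r.getKey(), r.getExtra()), assuming the builder-based class exposes those getters.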

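3. Transaction and Lock Optimization

Interview question: What are MySQL's transaction isolation levels, and how do you avoid deadlocks?

Transaction Management in Practice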

@Service
@Transactional
public class TransactionOptimizationService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // 1. Controlling the transaction isolation level
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public void updateOrderWithReadCommitted(Long orderId, Integer newStatus) {
        // READ COMMITTED: prevents dirty reads but allows non-repeatable reads
        String sql = "UPDATE user_order SET status = ?, update_time = NOW() WHERE id = ?";
        jdbcTemplate.update(sql, newStatus, orderId);
    }

    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public OrderCheckResult checkAndUpdateOrder(Long orderId, Integer expectedStatus, Integer newStatus) {
        // REPEATABLE READ: prevents non-repeatable reads
        String selectSql = "SELECT status FROM user_order WHERE id = ? FOR UPDATE";
        Integer currentStatus = jdbcTemplate.queryForObject(selectSql, Integer.class, orderId);

        if (!expectedStatus.equals(currentStatus)) {
            return OrderCheckResult.failure("Order status has changed");
        }

        String updateSql = "UPDATE user_order SET status = ?, update_time = NOW() WHERE id = ?";
        int updated = jdbcTemplate.update(updateSql, newStatus, orderId);

        return updated > 0 ? OrderCheckResult.success() : OrderCheckResult.failure("Update failed");
    }

    // 2. Pessimistic vs. optimistic locking
    @Transactional
    public boolean updateInventoryPessimistic(Long productId, Integer quantity) {
        // Pessimistic lock: SELECT ... FOR UPDATE holds a row lock until the transaction ends
        String selectSql = "SELECT stock FROM product WHERE id = ? FOR UPDATE";
        Integer currentStock = jdbcTemplate.queryForObject(selectSql, Integer.class, productId);

        if (currentStock < quantity) {
            return false;
        }

        String updateSql = "UPDATE product SET stock = stock - ? WHERE id = ?";
        return jdbcTemplate.update(updateSql, quantity, productId) > 0;
    }

    @Transactional
    public boolean updateInventoryOptimistic(Long productId, Integer quantity, Long version) {
        // Optimistic lock: the update succeeds only if the version read earlier is still current
        String updateSql = """
            UPDATE product 
            SET stock = stock - ?, version = version + 1 
            WHERE id = ? AND version = ? AND stock >= ?
            """;

        return jdbcTemplate.update(updateSql, quantity, productId, version, quantity) > 0;
    }

    // 3. Deadlock prevention
    @Transactional
    public void transferOrdersBetweenUsers(Long fromUserId, Long toUserId, List<Long> orderIds) {
        // Order the two user IDs so every transaction locks them in the same sequence
        Long firstUserId = Math.min(fromUserId, toUserId);
        Long secondUserId = Math.max(fromUserId, toUserId);

        // Lock the user rows in that order
        String lockUserSql = "SELECT id FROM user WHERE id = ? FOR UPDATE";
        jdbcTemplate.queryForObject(lockUserSql, Long.class, firstUserId);
        jdbcTemplate.queryForObject(lockUserSql, Long.class, secondUserId);

        // Update the orders in ascending ID order
        List<Long> sortedOrderIds = orderIds.stream().sorted().collect(Collectors.toList());

        String updateSql = "UPDATE user_order SET user_id = ? WHERE id = ? AND user_id = ?";
        for (Long orderId : sortedOrderIds) {
            jdbcTemplate.update(updateSql, toUserId, orderId, fromUserId);
        }
    }

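    // 4. Programmatic transaction example (a local transaction wrapped around an external
    //    call; the external system's changes are not rolled back with it)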
    public void executeDistributedTransaction() {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
        def.setTimeout(30);

        TransactionStatus status = transactionManager.getTransaction(def);

        try {
            // Run the business logic
            performBusinessLogic();

            // Call a service in another system
            callExternalService();

            transactionManager.commit(status);
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }

    private void performBusinessLogic() {
        // Business logic goes here
    }

    private void callExternalService() {
        // Call to the external service goes here
    }
}
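
An optimistic update such as updateInventoryOptimistic returns false whenever another writer changed the row first, so callers usually re-read and retry a bounded number of times. The sketch below models that loop in memory. OptimisticStock and Row are hypothetical stand-ins for the product table, and compareAndSet plays the role of the WHERE version = ? check; it is not the service's actual code.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory stand-in for one product row; illustration only.
final class OptimisticStock {

    /** Immutable snapshot of the row: (stock, version). */
    static final class Row {
        final int stock;
        final long version;
        Row(int stock, long version) { this.stock = stock; this.version = version; }
    }

    private final AtomicReference<Row> row;

    OptimisticStock(int initialStock) {
        this.row = new AtomicReference<>(new Row(initialStock, 0));
    }

    Row read() { return row.get(); }

    /** Succeeds only if nobody has replaced the snapshot we read (the version check). */
    boolean tryDeduct(Row seen, int quantity) {
        if (seen.stock < quantity) return false;
        return row.compareAndSet(seen, new Row(seen.stock - quantity, seen.version + 1));
    }

    /** Re-reads and retries when another writer won the race, up to maxAttempts times. */
    boolean deductWithRetry(int quantity, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Row seen = read();
            if (seen.stock < quantity) return false; // not enough stock: retrying cannot help
            if (tryDeduct(seen, quantity)) return true;
        }
        return false;
    }
}
```

The retry bound matters under heavy contention: without it, a hot row can keep many callers spinning, which is one reason pessimistic locking or a queue is often preferred for very hot items.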

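4. Sharding Strategy

Interview question: How do you design a sharding scheme (splitting databases and tables)? How do you handle cross-shard queries afterwards?

Sharding Implementation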

@Component
public class ShardingStrategy {

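    private static final int DB_COUNT = 4;  // number of databases
    private static final int TABLE_COUNT = 16;  // number of tables per database

    // Route to a database and table by user ID
    public ShardingInfo getShardingInfo(Long userId) {
        int dbIndex = (int) (userId % DB_COUNT);
        // Divide out the database part first: userId % TABLE_COUNT would tie the table
        // index to the database index and leave 12 of the 16 tables in each database empty
        int tableIndex = (int) ((userId / DB_COUNT) % TABLE_COUNT);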

        String dbName = "order_db_" + dbIndex;
        String tableName = "user_order_" + tableIndex;

        return new ShardingInfo(dbName, tableName);
    }

    // Route by order ID
    public ShardingInfo getShardingInfoByOrderId(Long orderId) {
        // Assumes the order ID generation rule embeds the user ID
        Long userId = extractUserIdFromOrderId(orderId);
        return getShardingInfo(userId);
    }

    private Long extractUserIdFromOrderId(Long orderId) {
        // Extract the user ID from the order ID
        // (simplified for this example)
        return orderId / 1000000;
    }
}
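
When the database index and the table index are both taken from the same value with moduli that divide each other (4 and 16 here), the two indexes are correlated and most tables in each database stay empty. The sketch below counts how many tables a database actually receives under each routing rule. ShardRouting and tablesUsedInDb are hypothetical names used only for this check, and sequential IDs are assumed as sample input.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustration of shard-key correlation; not the ShardingStrategy class itself.
final class ShardRouting {
    static final int DB_COUNT = 4;
    static final int TABLE_COUNT = 16;

    static int dbIndex(long userId) {
        return (int) Math.floorMod(userId, (long) DB_COUNT);
    }

    /** Naive rule: reuses the same input, so the table index is tied to the db index. */
    static int naiveTableIndex(long userId) {
        return (int) Math.floorMod(userId, (long) TABLE_COUNT);
    }

    /** Decorrelated rule: divide out the database part before taking the table modulus. */
    static int tableIndex(long userId) {
        return (int) Math.floorMod(Math.floorDiv(userId, (long) DB_COUNT), (long) TABLE_COUNT);
    }

    /** Counts the distinct tables that ids 0..sampleSize-1 reach inside one database. */
    static int tablesUsedInDb(int db, boolean naive, long sampleSize) {
        Set<Integer> tables = new TreeSet<>();
        for (long id = 0; id < sampleSize; id++) {
            if (dbIndex(id) == db) {
                tables.add(naive ? naiveTableIndex(id) : tableIndex(id));
            }
        }
        return tables.size();
    }
}
```

With the naive rule, database 0 only ever sees tables 0, 4, 8, and 12; with the decorrelated rule all 16 tables receive data.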

@Service
public class ShardingOrderService {

    @Autowired
    private Map<String, JdbcTemplate> jdbcTemplateMap; // 多数据源

    @Autowired
    private ShardingStrategy shardingStrategy;

    // Single-shard query
    public List<UserOrder> getOrdersByUserId(Long userId) {
        ShardingInfo shardInfo = shardingStrategy.getShardingInfo(userId);
        JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(shardInfo.getDbName());

        String sql = "SELECT * FROM " + shardInfo.getTableName() + " WHERE user_id = ?";

        return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(UserOrder.class), userId);
    }

    // Cross-shard query by time range
    public List<UserOrder> getOrdersByTimeRange(Date startTime, Date endTime) {
        List<UserOrder> allOrders = new ArrayList<>();

        // Query all shards in parallel
        List<CompletableFuture<List<UserOrder>>> futures = new ArrayList<>();

        for (int dbIndex = 0; dbIndex < 4; dbIndex++) {
            for (int tableIndex = 0; tableIndex < 16; tableIndex++) {
                String dbName = "order_db_" + dbIndex;
                String tableName = "user_order_" + tableIndex;

                CompletableFuture<List<UserOrder>> future = CompletableFuture.supplyAsync(() -> {
                    JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(dbName);
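                    // Each shard returns its own newest 100 rows, so the merged top 100 is exact
                    String sql = "SELECT * FROM " + tableName + 
                               " WHERE create_time BETWEEN ? AND ?" +
                               " ORDER BY create_time DESC LIMIT 100";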

                    return jdbcTemplate.query(sql, 
                        new BeanPropertyRowMapper<>(UserOrder.class), startTime, endTime);
                });

                futures.add(future);
            }
        }

        // Collect the results
        for (CompletableFuture<List<UserOrder>> future : futures) {
            try {
                allOrders.addAll(future.get(5, TimeUnit.SECONDS));
            } catch (Exception e) {
                // Log the failure without aborting the other shards' queries
                System.err.println("Shard query failed: " + e.getMessage());
            }
        }

        // Sort and paginate
        return allOrders.stream()
            .sorted(Comparator.comparing(UserOrder::getCreateTime).reversed())
            .limit(100)
            .collect(Collectors.toList());
    }

    // Handling consistency across systems (local write plus a message for eventual consistency)
    @Transactional
    public void createOrderWithSharding(CreateOrderRequest request) {
        ShardingInfo shardInfo = shardingStrategy.getShardingInfo(request.getUserId());
        JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(shardInfo.getDbName());

        // The table name comes from ShardingStrategy, never from user input
        String sql = """
            INSERT INTO %s
            (user_id, order_no, product_id, amount, status, create_time, update_time)
            VALUES (?, ?, ?, ?, ?, NOW(), NOW())
            """.formatted(shardInfo.getTableName());

        jdbcTemplate.update(sql, 
            request.getUserId(),
            request.getOrderNo(),
            request.getProductId(),
            request.getAmount(),
            0  // initial status
        );

        // If other systems must be updated too, use a message queue for eventual consistency
        sendOrderCreatedMessage(request);
    }

    private void sendOrderCreatedMessage(CreateOrderRequest request) {
        // Publish a message to the MQ so other systems update asynchronously,
        // for example inventory and user reward points
    }
}
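
When every shard returns its rows already sorted by create_time in descending order, the global top N can be produced with a k-way merge instead of concatenating and re-sorting everything. The sketch below shows that merge on plain timestamps; ShardMerge and topN are hypothetical names, and Long values stand in for the create_time of each row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Illustration of merging per-shard results; not part of ShardingOrderService.
final class ShardMerge {

    /** Merges lists that are each sorted descending and keeps the first `limit` values. */
    static List<Long> topN(List<List<Long>> shardResults, int limit) {
        // Heap entries are {shardIndex, positionInShard}, ordered by the value
        // they point at, largest first.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Long.compare(
            shardResults.get(b[0]).get(b[1]),
            shardResults.get(a[0]).get(a[1])));

        for (int s = 0; s < shardResults.size(); s++) {
            if (!shardResults.get(s).isEmpty()) {
                heap.add(new int[]{s, 0});
            }
        }

        List<Long> merged = new ArrayList<>();
        while (!heap.isEmpty() && merged.size() < limit) {
            int[] top = heap.poll();
            List<Long> shard = shardResults.get(top[0]);
            merged.add(shard.get(top[1]));
            if (top[1] + 1 < shard.size()) {
                heap.add(new int[]{top[0], top[1] + 1}); // advance within the same shard
            }
        }
        return merged;
    }
}
```

The merge touches at most `limit` entries plus one per shard, which matters once there are 64 shards each returning up to 100 rows.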

Read/Write Splitting Implementation

@Configuration
public class ReadWriteSplitConfig {

    @Bean
    @Primary
    public DataSource routingDataSource() {
        RoutingDataSource routingDataSource = new RoutingDataSource();

        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", masterDataSource());
        dataSourceMap.put("slave1", slave1DataSource());
        dataSourceMap.put("slave2", slave2DataSource());

        routingDataSource.setTargetDataSources(dataSourceMap);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());

        return routingDataSource;
    }

    @Bean
    public DataSource masterDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://master:3306/orderdb");
        config.setUsername("root");
        config.setPassword("password");
        config.setMaximumPoolSize(20);
        return new HikariDataSource(config);
    }

    @Bean
    public DataSource slave1DataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://slave1:3306/orderdb");
        config.setUsername("readonly");
        config.setPassword("password");
        config.setMaximumPoolSize(10);
        config.setReadOnly(true);
        return new HikariDataSource(config);
    }

    @Bean
    public DataSource slave2DataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://slave2:3306/orderdb");
        config.setUsername("readonly");
        config.setPassword("password");
        config.setMaximumPoolSize(10);
        config.setReadOnly(true);
        return new HikariDataSource(config);
    }
}

public class RoutingDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }

    public static String getDataSourceType() {
        return contextHolder.get();
    }

    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

@Aspect
@Component
public class ReadWriteSplitAspect {

    private final List<String> slaveDataSources = Arrays.asList("slave1", "slave2");
    private final AtomicInteger counter = new AtomicInteger(0);

    @Around("@annotation(readOnly)")
    public Object routeDataSource(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
        try {
            if (readOnly.value()) {
                // Read operation: pick a replica
                String slaveDataSource = selectSlaveDataSource();
                DataSourceContextHolder.setDataSourceType(slaveDataSource);
            } else {
                // Write operation: use the primary
                DataSourceContextHolder.setDataSourceType("master");
            }

            return joinPoint.proceed();
        } finally {
            DataSourceContextHolder.clearDataSourceType();
        }
    }

    private String selectSlaveDataSource() {
        // Round-robin over the replicas; floorMod keeps the index valid after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), slaveDataSources.size());
        return slaveDataSources.get(index);
    }
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
    boolean value() default true;
}
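
A round-robin selector based on an int counter has one subtle edge case: after Integer.MAX_VALUE increments the counter wraps to a negative number, and the % operator then yields a negative index. The sketch below isolates the selection logic and starts the counter just below the wrap point to show the behavior. RoundRobin is a hypothetical class, not the aspect above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Overflow-safe round-robin selection; illustration only.
final class RoundRobin {
    private final List<String> targets;
    private final AtomicInteger counter;

    /** `start` lets a test place the counter right before the int overflow. */
    RoundRobin(List<String> targets, int start) {
        this.targets = List.copyOf(targets);
        this.counter = new AtomicInteger(start);
    }

    String next() {
        // floorMod always returns a value in [0, size), even for negative counters.
        int index = Math.floorMod(counter.getAndIncrement(), targets.size());
        return targets.get(index);
    }
}
```

Starting at Integer.MAX_VALUE with two replicas, three consecutive calls keep alternating between the replicas, whereas the plain % operator would produce index -1 on the third call.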

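5. Connection Pool and Configuration Tuning

Interview question: How do you configure a database connection pool? Which key parameters need tuning?

Connection Pool Configuration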

@Configuration
public class DataSourceOptimizationConfig {

    @Bean
    @ConfigurationProperties(prefix = "app.datasource.hikari")
    public HikariConfig hikariConfig() {
        HikariConfig config = new HikariConfig();

        // Basic connection settings
        config.setJdbcUrl("jdbc:mysql://localhost:3306/orderdb?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("password");
        config.setDriverClassName("com.mysql.cj.jdbc.Driver");

        // Pool sizing
        config.setMaximumPoolSize(20);           // maximum number of connections
        config.setMinimumIdle(5);                // minimum number of idle connections
        config.setConnectionTimeout(30000);      // wait for a connection: 30 seconds
        config.setIdleTimeout(600000);           // idle timeout: 10 minutes
        config.setMaxLifetime(1800000);          // maximum connection lifetime: 30 minutes
        config.setLeakDetectionThreshold(60000); // leak detection threshold: 1 minute

        // Performance-related settings
        config.setAutoCommit(true);
        config.setReadOnly(false);
        config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");

        // MySQL-specific driver optimizations
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");

        return config;
    }

    @Bean
    public HikariDataSource dataSource(HikariConfig hikariConfig) {
        return new HikariDataSource(hikariConfig);
    }

    // Connection pool monitoring
    @Bean
    public ConnectionPoolMonitor connectionPoolMonitor(HikariDataSource dataSource) {
        return new ConnectionPoolMonitor(dataSource);
    }
}

// Registered through the @Bean method in DataSourceOptimizationConfig, so no @Component here
public class ConnectionPoolMonitor {

    private final HikariDataSource dataSource;
    private final MeterRegistry meterRegistry;

    public ConnectionPoolMonitor(HikariDataSource dataSource) {
        this.dataSource = dataSource;
        this.meterRegistry = Metrics.globalRegistry;
    }

    @PostConstruct
    public void init() {
        // Register connection pool gauges; Gauge.builder takes the observed object
        // and the value function, and register takes only the registry
        Gauge.builder("hikari.connections.active", dataSource,
                ds -> ds.getHikariPoolMXBean().getActiveConnections())
            .description("Active connections")
            .register(meterRegistry);

        Gauge.builder("hikari.connections.idle", dataSource,
                ds -> ds.getHikariPoolMXBean().getIdleConnections())
            .description("Idle connections")
            .register(meterRegistry);

        Gauge.builder("hikari.connections.pending", dataSource,
                ds -> ds.getHikariPoolMXBean().getThreadsAwaitingConnection())
            .description("Pending connections")
            .register(meterRegistry);

        Gauge.builder("hikari.connections.total", dataSource,
                ds -> ds.getHikariPoolMXBean().getTotalConnections())
            .description("Total connections")
            .register(meterRegistry);
    }

    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void checkConnectionPool() {
        HikariPoolMXBean poolBean = dataSource.getHikariPoolMXBean();

        int activeConnections = poolBean.getActiveConnections();
        int totalConnections = poolBean.getTotalConnections();
        int pendingThreads = poolBean.getThreadsAwaitingConnection();

        // Warn when more than 80% of the pool's maximum size is in use
        double utilizationRate = (double) activeConnections / dataSource.getMaximumPoolSize();
        if (utilizationRate > 0.8) {
            System.err.println("Connection pool utilization is high: " + String.format("%.2f%%", utilizationRate * 100));
        }

        // Warn when threads are waiting for a connection
        if (pendingThreads > 0) {
            System.err.println(pendingThreads + " thread(s) are waiting for a database connection");
        }
    }
}

Frequently Asked Interview Questions

1. Theory Questions

Q: How does InnoDB implement MVCC?

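A: MVCC (multi-version concurrency control) is built from the following pieces:

  • Transaction ID: each read-write transaction is assigned a unique, increasing ID
  • Hidden columns: each row stores DB_TRX_ID (the ID of the transaction that last modified it) and DB_ROLL_PTR (a pointer to the previous version in the undo log)
  • Undo log: stores the historical versions of rows
  • Read View: the visibility snapshot created for a consistent (snapshot) read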
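
The visibility decision a Read View makes can be sketched in a few lines. This is a simplified model rather than InnoDB's source: ReadView and its field names are hypothetical, and the real engine follows DB_ROLL_PTR into the undo log to find an older version when the current one is not visible.

```java
import java.util.Set;

// Simplified model of a snapshot-read visibility check; illustration only.
final class ReadView {
    private final long creatorTrxId;   // transaction that created this view
    private final Set<Long> activeIds; // transactions still uncommitted when the view was created
    private final long minActiveId;    // smallest id in activeIds
    private final long nextTrxId;      // id the next new transaction would receive

    ReadView(long creatorTrxId, Set<Long> activeIds, long nextTrxId) {
        this.creatorTrxId = creatorTrxId;
        this.activeIds = Set.copyOf(activeIds);
        this.minActiveId = activeIds.stream().mapToLong(Long::longValue).min().orElse(nextTrxId);
        this.nextTrxId = nextTrxId;
    }

    /** Is the row version last written by rowTrxId visible to this view? */
    boolean isVisible(long rowTrxId) {
        if (rowTrxId == creatorTrxId) return true; // the view's own changes
        if (rowTrxId < minActiveId) return true;   // committed before the view was created
        if (rowTrxId >= nextTrxId) return false;   // started after the view was created
        return !activeIds.contains(rowTrxId);      // in between: visible only if already committed
    }
}
```

Under REPEATABLE READ the view is created at the first consistent read and reused for the whole transaction, while READ COMMITTED creates a fresh view for each statement, which is why the two levels differ on non-repeatable reads.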

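Q: Under what conditions does a deadlock occur, and how can it be avoided?

A: A deadlock requires all four of the following conditions:

  1. Mutual exclusion: a resource cannot be shared
  2. Hold and wait: a holder requests new resources while keeping the ones it has
  3. No preemption: a resource cannot be taken away by force
  4. Circular wait: the waiters form a cycle

Avoidance strategies:

  • Timeouts: set a lock wait timeout (innodb_lock_wait_timeout)
  • Deadlock detection: InnoDB detects deadlocks automatically and rolls back one transaction
  • Resource ordering: acquire locks in a fixed order
  • Shorter lock hold time: commit transactions as soon as possible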
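
Because InnoDB resolves a deadlock by rolling back one of the transactions, the application on the losing side normally has to run its transaction again. MySQL reports the victim with error code 1213 and SQLSTATE 40001. The sketch below shows one way to wrap that retry; DeadlockRetry and run are hypothetical names, and the work passed in is assumed to be one complete transaction that is safe to repeat.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Retry wrapper for deadlock victims; a sketch, not a library API.
final class DeadlockRetry {
    static final int ER_LOCK_DEADLOCK = 1213; // MySQL error code for a deadlock victim

    static boolean isDeadlock(SQLException e) {
        return e.getErrorCode() == ER_LOCK_DEADLOCK || "40001".equals(e.getSQLState());
    }

    /** Runs `work` again when it was rolled back as a deadlock victim. */
    static <T> T run(Callable<T> work, int maxAttempts) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return work.call();
            } catch (SQLException e) {
                // Give up on other errors, or once the attempt budget is spent.
                if (!isDeadlock(e) || attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

With JdbcTemplate the SQLException arrives wrapped in a Spring DataAccessException, so a real implementation would inspect the exception's cause, or catch Spring's lock-failure exception types, instead of catching SQLException directly.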

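2. Practical Questions

Q: How would you design the database layer of a high-concurrency flash-sale system?

Key points:

  1. Read/write splitting: queries go to replicas, order placement goes to the primary
  2. Inventory in front of the database: deduct stock in Redis first, then sync to the database asynchronously
  3. Sharding: partition by product ID or user ID
  4. Queue-based peak shaving: process orders through a message queue
  5. Cache warm-up: load hot data ahead of time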
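
The core of the inventory pre-deduction in point 2 is an atomic "check and decrement" that never lets stock go negative. In production this would typically run inside Redis; the sketch below uses an AtomicInteger as an in-memory stand-in to show the logic only. StockGate and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for a Redis stock counter; illustration only.
final class StockGate {
    private final AtomicInteger remaining;

    StockGate(int initialStock) {
        this.remaining = new AtomicInteger(initialStock);
    }

    /** Atomically reserves `quantity` units; the counter never drops below zero. */
    boolean tryReserve(int quantity) {
        while (true) {
            int current = remaining.get();
            if (current < quantity) {
                return false; // sold out for this request: reject before touching the database
            }
            if (remaining.compareAndSet(current, current - quantity)) {
                return true;  // reserved; the order can now be queued for the database write
            }
            // Another thread changed the counter in between: re-read and try again.
        }
    }

    /** Returns units when the later database write or payment fails. */
    void release(int quantity) {
        remaining.addAndGet(quantity);
    }

    int remaining() {
        return remaining.get();
    }
}
```

Only requests that pass the gate reach the message queue and the database, so the database sees at most as many order writes as there are units in stock.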

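Q: How do you run online DDL on a large table?

Key points:

  1. pt-online-schema-change: a tool for changing table structure online
  2. Shadow table technique: create a shadow table and sync data into it gradually
  3. Batched execution: avoid holding table locks for a long time
  4. Run during off-peak hours: reduce the impact on the business
  5. Backup strategy: back up the data before the operation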
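
Batched execution in point 3 usually means walking the primary key in fixed-size ranges and committing after each range, so that no single statement holds locks for long. A minimal sketch of the range computation follows; IdBatches and split are hypothetical names, and each returned pair would be bound to a statement of the form WHERE id BETWEEN ? AND ?.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a primary-key range into batches; illustration only.
final class IdBatches {

    /** Splits the inclusive range [minId, maxId] into consecutive {from, to} pairs. */
    static List<long[]> split(long minId, long maxId, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long from = minId; from <= maxId; from += batchSize) {
            long to = Math.min(from + batchSize - 1, maxId); // last batch may be shorter
            ranges.add(new long[]{from, to});
        }
        return ranges;
    }
}
```

A short pause between batches gives replicas time to catch up; the right batch size and pause depend on the workload and are best found by measurement.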

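Summary

Key database optimization topics for interviews:

  1. Index design: covering indexes, composite indexes, cases where an index cannot be used
  2. Query optimization: execution plan analysis, SQL rewriting techniques
  3. Transaction handling: isolation levels, locking, deadlock handling
  4. Architecture: sharding, read/write splitting, connection pool configuration
  5. Performance monitoring: slow query analysis, index usage monitoring

Tie your answers to database performance problems you have met in real projects, and be ready to describe the symptoms, the analysis, and the solution in detail.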

© 2025 编外计划 | Last modified: 2025-07-28 18:05:38
