Java并发编程高级应用
概述
并发编程是Java高级开发工程师的核心技能之一。本文涵盖了多线程、线程安全、并发工具类、以及高并发场景下的设计模式等关键面试内容。
核心面试问题
1. 线程基础与生命周期
面试问题:详细描述Java线程的生命周期,以及各状态之间的转换条件?
线程状态详解
public class ThreadStateDemo {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
// NEW - 新创建但未启动
Thread newThread = new Thread(() -> {
synchronized (lock) {
try {
// WAITING - 等待状态
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
System.out.println("新线程状态: " + newThread.getState()); // NEW
// RUNNABLE - 可运行状态
newThread.start();
Thread.sleep(100);
System.out.println("启动后状态: " + newThread.getState()); // WAITING
// BLOCKED - 阻塞状态示例
Thread blockedThread = new Thread(() -> {
synchronized (lock) {
System.out.println("获取到锁");
}
});
blockedThread.start();
Thread.sleep(100);
System.out.println("被阻塞线程状态: " + blockedThread.getState()); // BLOCKED
// 唤醒等待线程
synchronized (lock) {
lock.notify();
}
// TERMINATED - 终止状态
newThread.join();
System.out.println("线程结束状态: " + newThread.getState()); // TERMINATED
}
}
线程创建方式比较
// 1. 继承Thread类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("继承Thread方式");
}
}
// 2. 实现Runnable接口
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("实现Runnable方式");
}
}
// 3. 实现Callable接口
class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "实现Callable方式";
}
}
// 4. 使用Lambda表达式
public class ThreadCreationDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 方式1
new MyThread().start();
// 方式2
new Thread(new MyRunnable()).start();
// 方式3
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
new Thread(futureTask).start();
System.out.println(futureTask.get());
// 方式4
new Thread(() -> System.out.println("Lambda表达式方式")).start();
}
}
2. 线程安全与同步机制
面试问题:除了synchronized,还有哪些保证线程安全的方式?它们的区别是什么?
synchronized深入理解
public class SynchronizedDemo {
private int count = 0;
private final Object lock = new Object();
// 同步实例方法 - 锁住this对象
public synchronized void syncMethod() {
count++;
}
// 同步静态方法 - 锁住Class对象
public static synchronized void staticSyncMethod() {
System.out.println("静态同步方法");
}
// 同步代码块 - 显式指定锁对象
public void syncBlock() {
synchronized (lock) {
count++;
}
}
// 双重检查锁定单例模式
private static volatile SynchronizedDemo instance;
public static SynchronizedDemo getInstance() {
if (instance == null) {
synchronized (SynchronizedDemo.class) {
if (instance == null) {
instance = new SynchronizedDemo();
}
}
}
return instance;
}
}
ReentrantLock高级应用
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private final Queue<String> queue = new LinkedList<>();
private final int capacity = 10;
// 生产者
public void produce(String item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 队列满时等待
}
queue.offer(item);
System.out.println("生产: " + item);
notEmpty.signalAll(); // 通知消费者
} finally {
lock.unlock();
}
}
// 消费者
public String consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 队列空时等待
}
String item = queue.poll();
System.out.println("消费: " + item);
notFull.signalAll(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
// 尝试获取锁
public boolean tryOperation() {
if (lock.tryLock()) {
try {
// 执行需要同步的操作
return true;
} finally {
lock.unlock();
}
}
return false; // 获取锁失败
}
// 带超时的锁获取
public boolean timedOperation() throws InterruptedException {
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
// 执行操作
return true;
} finally {
lock.unlock();
}
}
return false; // 超时未获取到锁
}
}
原子类的使用
public class AtomicDemo {
private final AtomicInteger count = new AtomicInteger(0);
private final AtomicReference<String> atomicRef = new AtomicReference<>("initial");
private final AtomicStampedReference<String> stampedRef =
new AtomicStampedReference<>("initial", 0);
// 原子递增
public int increment() {
return count.incrementAndGet();
}
// CAS操作
public boolean compareAndSet(int expected, int update) {
return count.compareAndSet(expected, update);
}
// 解决ABA问题
public boolean updateWithStamp(String expected, String update) {
int[] stampHolder = new int[1];
String current = stampedRef.get(stampHolder);
if (current.equals(expected)) {
return stampedRef.compareAndSet(expected, update,
stampHolder[0], stampHolder[0] + 1);
}
return false;
}
// 累加器示例
private final LongAdder longAdder = new LongAdder();
public void addValue(long value) {
longAdder.add(value);
}
public long getSum() {
return longAdder.sum();
}
}
3. 线程池与并发工具
面试问题:ThreadPoolExecutor的核心参数含义,以及如何合理配置线程池?
线程池深入理解
public class ThreadPoolDemo {
// 自定义线程池
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize - 核心线程数
5, // maximumPoolSize - 最大线程数
60L, // keepAliveTime - 空闲线程存活时间
TimeUnit.SECONDS, // timeUnit - 时间单位
new LinkedBlockingQueue<>(100), // workQueue - 工作队列
new ThreadFactory() { // threadFactory - 线程工厂
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "MyThread-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // handler - 拒绝策略
);
// 不同场景的线程池选择
public void demonstrateThreadPools() {
// CPU密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuPool = Executors.newFixedThreadPool(cpuCores);
// IO密集型任务
ExecutorService ioPool = Executors.newFixedThreadPool(cpuCores * 2);
// 缓存线程池 - 适合大量短期任务
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 定时任务池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(4);
// 单线程池 - 保证任务顺序执行
ExecutorService singlePool = Executors.newSingleThreadExecutor();
}
// 优雅关闭线程池
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能正常关闭");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
CompletableFuture异步编程
public class CompletableFutureDemo {
public void demonstrateAsyncProgramming() {
// 基本异步执行
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
// 链式处理
CompletableFuture<String> future2 = future1
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
// 异步链式处理
CompletableFuture<Void> future3 = future2
.thenAcceptAsync(System.out::println);
// 组合多个Future
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "Java");
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> "Programming");
CompletableFuture<String> combined = future4.thenCombine(future5, (a, b) -> a + " " + b);
// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future4, future5);
// 等待任一任务完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future4);
// 异常处理
CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功";
}).exceptionally(throwable -> {
System.out.println("处理异常: " + throwable.getMessage());
return "默认值";
});
}
// 实际应用场景:并行调用多个服务
public CompletableFuture<String> getUserInfo(String userId) {
CompletableFuture<String> userBasic = CompletableFuture.supplyAsync(() ->
getUserBasicInfo(userId));
CompletableFuture<String> userDetail = CompletableFuture.supplyAsync(() ->
getUserDetailInfo(userId));
CompletableFuture<String> userPreference = CompletableFuture.supplyAsync(() ->
getUserPreference(userId));
return userBasic.thenCombine(userDetail, (basic, detail) -> basic + detail)
.thenCombine(userPreference, (info, pref) -> info + pref);
}
private String getUserBasicInfo(String userId) {
// 模拟远程调用
return "BasicInfo[" + userId + "]";
}
private String getUserDetailInfo(String userId) {
return "DetailInfo[" + userId + "]";
}
private String getUserPreference(String userId) {
return "Preference[" + userId + "]";
}
}
4. 并发集合与工具类
面试问题:ConcurrentHashMap的实现原理,以及其他并发集合的特点?
ConcurrentHashMap深入分析
public class ConcurrentHashMapDemo {
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public void demonstrateConcurrentOperations() {
// 基本操作
map.put("key1", 1);
map.putIfAbsent("key2", 2);
// 原子更新操作
map.compute("key1", (key, val) -> val == null ? 1 : val + 1);
map.computeIfAbsent("key3", k -> k.length());
map.computeIfPresent("key1", (key, val) -> val * 2);
// 批量操作
map.forEach((key, value) -> System.out.println(key + "=" + value));
// 并行计算
int sum = map.reduceValues(1, Integer::sum);
// 条件操作
Integer result = map.search(1, (key, value) -> value > 5 ? key + ":" + value : null);
}
// 性能对比测试
public void performanceTest() {
Map<String, Integer> hashMap = new HashMap<>();
Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
// 测试并发写入性能
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
for (int j = 0; j < 10000; j++) {
concurrentMap.put("key" + j, j);
}
});
}
// 等待完成并计算时间
}
}
其他并发工具类
public class ConcurrentUtilsDemo {
// CountDownLatch - 等待多个线程完成
public void demonstrateCountDownLatch() throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
// 模拟工作
Thread.sleep(1000);
System.out.println("线程完成工作");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("所有线程都完成了");
}
// CyclicBarrier - 线程间同步点
public void demonstrateCyclicBarrier() {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程都到达屏障点,执行屏障动作");
});
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println("线程到达屏障点");
barrier.await();
System.out.println("线程通过屏障,继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
// Semaphore - 控制并发访问数量
public void demonstrateSemaphore() {
Semaphore semaphore = new Semaphore(2); // 最多2个线程同时访问
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程获取到许可,开始执行");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("线程释放许可");
semaphore.release();
}
}).start();
}
}
// Exchanger - 两个线程间数据交换
public void demonstrateExchanger() {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "线程1的数据";
String received = exchanger.exchange(data);
System.out.println("线程1发送: " + data + ", 接收: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
String data = "线程2的数据";
String received = exchanger.exchange(data);
System.out.println("线程2发送: " + data + ", 接收: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
5. 死锁与性能优化
面试问题:如何检测和避免死锁?有哪些性能优化的手段?
死锁检测与预防
public class DeadlockDemo {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
// 可能导致死锁的代码
public void method1() {
synchronized (lock1) {
System.out.println("获取lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock2) {
System.out.println("获取lock2");
}
}
}
public void method2() {
synchronized (lock2) {
System.out.println("获取lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock1) {
System.out.println("获取lock1");
}
}
}
// 死锁检测
public static void detectDeadlock() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
ThreadInfo[] threadInfos = threadBean.getThreadInfo(deadlockedThreads);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println("死锁线程: " + threadInfo.getThreadName());
}
}
}
// 避免死锁的方法:按顺序获取锁
private static final Object globalLock1 = new Object();
private static final Object globalLock2 = new Object();
public void safeMethod1() {
synchronized (globalLock1) {
synchronized (globalLock2) {
// 按固定顺序获取锁
}
}
}
public void safeMethod2() {
synchronized (globalLock1) { // 相同的顺序
synchronized (globalLock2) {
// 按固定顺序获取锁
}
}
}
}
高频面试题目
1. 理论深度题目
Q: volatile关键字的作用原理,与synchronized的区别?
A: volatile的作用:
- 可见性:确保变量修改对所有线程立即可见
- 禁止重排序:防止编译器和CPU重排序
- 不保证原子性:不能替代synchronized
区别:
- volatile轻量级,synchronized重量级
- volatile只能修饰变量,synchronized可以修饰方法和代码块
- volatile不会阻塞线程,synchronized可能阻塞
Q: 什么是happens-before规则?
A: happens-before是Java内存模型的核心概念:
- 程序顺序规则:单线程内按程序顺序执行
- 监视器锁规则:unlock操作happens-before后续的lock操作
- volatile变量规则:volatile写happens-before后续的volatile读
- 传递性规则:A happens-before B,B happens-before C,则A happens-before C
2. 实战应用题目
Q: 如何设计一个线程安全的单例模式?
// 枚举单例(推荐)
public enum SingletonEnum {
INSTANCE;
public void doSomething() {
// 业务逻辑
}
}
// 静态内部类单例
public class SingletonInnerClass {
private SingletonInnerClass() {}
private static class Holder {
private static final SingletonInnerClass INSTANCE = new SingletonInnerClass();
}
public static SingletonInnerClass getInstance() {
return Holder.INSTANCE;
}
}
Q: 如何实现一个生产者-消费者模式?
答题要点:
- 选择合适的数据结构(阻塞队列)
- 使用wait/notify或Condition
- 考虑多生产者、多消费者场景
- 处理中断和异常
总结
并发编程面试重点:
- 基础理论:线程状态、内存模型、happens-before
- 同步机制:synchronized、Lock、原子类的使用场景
- 并发工具:线程池、CompletableFuture、并发集合
- 问题排查:死锁检测、性能调优
- 实战经验:高并发场景下的设计和优化
建议准备具体的并发编程项目经验,能够描述遇到的问题和解决方案。