中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Java并發(fa)編程基(ji)礎:從線程管理(li)到高并發(fa)應用實(shi)踐

本篇主要是多(duo)(duo)線(xian)程的基礎知識,代碼示例較多(duo)(duo),有(you)時間(jian)的可以逐個(ge)分析,具體細(xi)節都放在代碼注釋中了。

1. 理解線程:多任務執行的基石

1.1 什么是線程?

在現代操作系統中,進程是資源分配的基本單位,而線程是CPU調度的最小單位。可以(yi)把進(jin)程想象成一(yi)家公(gong)(gong)司(si),線程就(jiu)是公(gong)(gong)司(si)里的員工(gong)。

/**
 * 演示Java程序天生就是多線程程序
 * 即使最簡單的main方法也會啟動多個系統線程
 */
public class MultiThread {
    public static void main(String[] args) {
        // 獲取Java線程管理MXBean
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        // 不需要獲取同步的monitor和synchronizer信息,僅獲取線程和線程堆棧信息
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
        
        // 遍歷線程信息
        System.out.println("=== Java程序啟動的線程列表 ===");
        for (ThreadInfo threadInfo : threadInfos) {
            System.out.println("[" + threadInfo.getThreadId() + "] " + 
                             threadInfo.getThreadName());
        }
    }
}

輸出示例:

=== Java程序啟動的線程列表 ===
[4] Signal Dispatcher    // 分發處理發送給JVM信號的線程
[3] Finalizer           // 調用對象finalize方法的線程  
[2] Reference Handler   // 清除Reference的線程
[1] main               // main線程,用戶程序入口

1.2 為什么需要多線程?

三大核心優勢

  1. 充分利用多核處理器 - 避免CPU資源閑置
  2. 提升響應速度 - 后臺任務不阻塞用戶操作
  3. 更好的編程模型 - Java提供一致的多線程API

1.3 線程狀態生命周期

新建(NEW) → 可運行(RUNNABLE) → 運行中
    ↓
超時等待(TIMED_WAITING) ← 等待(WAITING) ← 阻塞(BLOCKED)
    ↓
    終止(TERMINATED)

2. 線程的啟動與安全終止

2.1 正確啟動線程

/**
 * 線程啟動最佳實踐示例
 * 重點:設置有意義的線程名稱,合理設置守護線程標志
 */
public class ThreadStartExample {
    public static void main(String[] args) {
        // 推薦:為線程設置有意義的名稱,便于問題排查
        Thread worker = new Thread(new Task(), "Data-Processor-1");
        worker.setDaemon(false); // 明確設置是否為守護線程
        worker.start(); // 正確啟動方式,不要直接調用run()
        
        System.out.println("主線程繼續執行,不會等待worker線程");
    }
    
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 開始執行");
            try {
                // 模擬工作任務
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("任務被中斷");
            }
            System.out.println(Thread.currentThread().getName() + " 執行完成");
        }
    }
}

2.2 安全終止線程的兩種方式

方式一:使用中斷機制

/**
 * 使用中斷機制安全終止線程
 * 重點:理解中斷異常處理的最佳實踐
 */
public class InterruptExample {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(new InterruptibleTask(), "Interruptible-Worker");
        worker.start();
        
        // 主線程等待2秒后中斷工作線程
        TimeUnit.SECONDS.sleep(2);
        System.out.println("主線程發送中斷信號");
        worker.interrupt(); // 發送中斷信號
        
        // 等待工作線程完全退出
        worker.join();
        System.out.println("工作線程已安全退出");
    }
    
    static class InterruptibleTask implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // 模擬工作 - 這里可能拋出InterruptedException
                    System.out.println("Working...");
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    /**
                     * 關鍵理解點:為什么需要重新設置中斷狀態?
                     * 
                     * 當線程在阻塞狀態(如sleep、wait、join)時被中斷,
                     * Java會做兩件事:
                     * 1. 拋出InterruptedException
                     * 2. 清除線程的中斷狀態(設為false)
                     * 
                     * 這導致循環條件 !Thread.currentThread().isInterrupted() 
                     * 會繼續為true,線程無法退出。
                     * 
                     * 因此我們需要在捕獲異常后重新設置中斷狀態,
                     * 這樣循環條件就能檢測到中斷,安全退出。
                     */
                    System.out.println("捕獲到中斷異常,重新設置中斷狀態");
                    Thread.currentThread().interrupt(); // 重新設置中斷標志
                }
            }
            System.out.println("線程安全退出,中斷狀態: " + 
                Thread.currentThread().isInterrupted());
        }
    }
}

方式二:使用標志位

/**
 * 使用volatile標志位安全終止線程
 * 適用于沒有阻塞調用或需要更復雜退出邏輯的場景
 */
public class FlagShutdownExample {
    // volatile保證可見性,確保所有線程看到最新的值
    private volatile boolean running = true;
    private final Thread workerThread;
    
    public FlagShutdownExample() {
        this.workerThread = new Thread(this::doWork, "Flag-Controlled-Worker");
    }
    
    public void start() {
        workerThread.start();
    }
    
    /**
     * 優雅停止工作線程
     */
    public void stop() {
        System.out.println("請求停止工作線程");
        running = false;
        // 同時發送中斷,處理可能存在的阻塞情況
        workerThread.interrupt();
    }
    
    /**
     * 工作線程的主循環
     * 同時檢查標志位和中斷狀態,提供雙重保障
     */
    private void doWork() {
        try {
            while (running && !Thread.currentThread().isInterrupted()) {
                // 執行工作任務
                processData();
            }
        } finally {
            // 無論何種方式退出,都執行清理工作
            cleanup();
        }
        System.out.println("工作線程已安全退出");
    }
    
    private void processData() {
        try {
            // 模擬數據處理
            System.out.println("處理數據中...");
            Thread.sleep(300);
        } catch (InterruptedException e) {
            System.out.println("處理數據時被中斷");
            // 收到中斷,但可能還想繼續處理,所以不重新設置中斷
            // 讓循環條件來檢查running標志
        }
    }
    
    private void cleanup() {
        System.out.println("執行資源清理工作...");
        // 關閉文件、數據庫連接等資源
    }
    
    public static void main(String[] args) throws InterruptedException {
        FlagShutdownExample example = new FlagShutdownExample();
        example.start();
        
        // 運行3秒后停止
        Thread.sleep(3000);
        example.stop();
        
        // 等待工作線程退出
        example.workerThread.join();
    }
}

3. 線程間通信:協作的藝術

3.1 volatile關鍵字:共享狀態可見性

/**
 * volatile關鍵字示例
 * 保證多線程間的可見性,但不保證原子性
 */
public class VolatileExample {
    // volatile確保shutdownRequested的修改對所有線程立即可見
    private volatile boolean shutdownRequested = false;
    private int operationCount = 0; // 非volatile,不保證可見性
    
    public void shutdown() {
        shutdownRequested = true; // 所有線程立即可見
        System.out.println("關閉請求已設置");
    }
    
    public void doWork() {
        while (!shutdownRequested) {
            // 正常工作循環
            operationCount++; // 非原子操作,可能有問題
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println("工作被中斷");
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("工作線程退出,操作次數: " + operationCount);
    }
}

3.2 synchronized關鍵字:互斥訪問

/**
 * synchronized關鍵字示例
 * 保證原子性和可見性,但可能影響性能
 */
public class SynchronizedCounter {
    private int count = 0;
    
    /**
     * 同步方法 - 鎖對象是當前實例(this)
     */
    public synchronized void increment() {
        count++; // 原子操作
    }
    
    /**
     * 同步塊 - 可以更細粒度控制鎖的范圍
     */
    public void decrement() {
        // 只同步關鍵部分,減少鎖持有時間
        synchronized (this) {
            count--;
        }
        // 這里可以執行非同步操作
    }
    
    /**
     * 同步的get方法,保證看到最新值
     */
    public synchronized int getCount() {
        return count;
    }
    
    /**
     * 靜態同步方法 - 鎖對象是類的Class對象
     */
    public static synchronized void staticMethod() {
        // 靜態同步方法使用Class對象作為鎖
    }
}

3.3 等待/通知機制:經典生產者-消費者模式

/**
 * 生產者-消費者模式示例
 * 演示wait/notify機制的正確使用
 */
public class WaitNotifyExample {
    private final Object lock = new Object(); // 共享鎖對象
    private final Queue<String> queue = new LinkedList<>();
    private final int MAX_SIZE = 5;
    
    /**
     * 生產者方法
     */
    public void produce(String data) throws InterruptedException {
        synchronized (lock) {
            // 必須使用while循環檢查條件,避免虛假喚醒
            while (queue.size() >= MAX_SIZE) {
                System.out.println("隊列已滿(" + queue.size() + "),生產者等待");
                lock.wait(); // 釋放鎖并等待
            }
            
            queue.offer(data);
            System.out.println("生產: " + data + ",隊列大小: " + queue.size());
            
            // 通知所有等待的消費者
            lock.notifyAll();
        }
    }
    
    /**
     * 消費者方法
     */
    public String consume() throws InterruptedException {
        synchronized (lock) {
            // 必須使用while循環檢查條件
            while (queue.isEmpty()) {
                System.out.println("隊列為空,消費者等待");
                lock.wait(); // 釋放鎖并等待
            }
            
            String data = queue.poll();
            System.out.println("消費: " + data + ",隊列大小: " + queue.size());
            
            // 通知所有等待的生產者
            lock.notifyAll();
            return data;
        }
    }
    
    /**
     * 測試生產者消費者模式
     */
    public static void main(String[] args) {
        WaitNotifyExample example = new WaitNotifyExample();
        
        // 啟動生產者線程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    example.produce("Data-" + i);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        
        // 啟動消費者線程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    example.consume();
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        
        producer.start();
        consumer.start();
    }
}

等待/通知經典范式

// 消費者范式 - 永遠在循環中調用wait()
synchronized(鎖對象) {
    while(條件不滿足) {
        鎖對象.wait();  // 等待時會釋放鎖
    }
    // 條件滿足,處理業務邏輯
}

// 生產者范式  
synchronized(鎖對象) {
    改變條件;           // 改變等待條件
    鎖對象.notifyAll(); // 通知所有等待線程
}

3.4 Thread.join():線程依賴執行

/**
 * Thread.join()使用示例
 * 實現線程間的順序執行依賴
 */
public class JoinExample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("主線程開始");
        
        Thread previous = Thread.currentThread();
        
        // 創建5個有依賴關系的線程
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new DependentTask(previous), "Worker-" + i);
            thread.start();
            previous = thread; // 設置依賴鏈
        }
        
        // 主線程先做一些工作
        TimeUnit.SECONDS.sleep(1);
        System.out.println(Thread.currentThread().getName() + " 完成初始化工作");
        
        // 等待所有線程完成(實際上由最后一個Worker-4 join主線程)
    }
    
    static class DependentTask implements Runnable {
        private final Thread dependency; // 依賴的線程
        
        public DependentTask(Thread dependency) {
            this.dependency = dependency;
        }
        
        @Override
        public void run() {
            try {
                // 等待依賴的線程執行完成
                System.out.println(Thread.currentThread().getName() + " 等待 " + dependency.getName());
                dependency.join();
                
                // 依賴線程完成后開始自己的工作
                System.out.println(Thread.currentThread().getName() + " 開始工作");
                TimeUnit.MILLISECONDS.sleep(500); // 模擬工作
                System.out.println(Thread.currentThread().getName() + " 完成工作");
                
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " 被中斷");
                Thread.currentThread().interrupt();
            }
        }
    }
}

3.5 ThreadLocal深入解析:線程局部變量

/**
 * ThreadLocal深度解析
 * 理解原理、使用場景和內存泄漏防護
 */
public class ThreadLocalExample {
    
    /**
     * ThreadLocal基本使用:每個線程獨立的SimpleDateFormat
     * 避免SimpleDateFormat的線程安全問題
     */
    private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    
    /**
     * ThreadLocal用于用戶上下文傳遞
     * 在Web應用中非常有用,避免在方法參數中傳遞用戶信息
     */
    private static final ThreadLocal<UserContext> USER_CONTEXT = 
        new ThreadLocal<>();
    
    /**
     * ThreadLocal用于事務上下文
     */
    private static final ThreadLocal<TransactionContext> TRANSACTION_CONTEXT =
        new ThreadLocal<>();
    
    /**
     * 可繼承的ThreadLocal:子線程可以繼承父線程的值
     */
    private static final InheritableThreadLocal<String> INHERITABLE_CONTEXT =
        new InheritableThreadLocal<>();
    
    /**
     * 處理用戶請求的示例方法
     */
    public void processRequest(User user) {
        // 設置用戶上下文到當前線程
        USER_CONTEXT.set(new UserContext(user));
        
        try {
            // 使用線程安全的日期格式化
            String timestamp = DATE_FORMATTER.get().format(new Date());
            System.out.println(Thread.currentThread().getName() + 
                " - 用戶: " + user.getName() + ", 時間: " + timestamp);
            
            // 執行業務邏輯 - 任何方法都可以獲取用戶上下文,無需傳遞參數
            doBusinessLogic();
            
        } finally {
            /**
             * 關鍵:必須清理ThreadLocal,防止內存泄漏!
             * 
             * 原因:
             * 1. ThreadLocalMap的key是弱引用,會被GC回收
             * 2. 但value是強引用,不會被自動回收
             * 3. 如果線程長時間存活(如線程池中的線程),會導致value無法釋放
             * 4. 調用remove()方法顯式清理
             */
            USER_CONTEXT.remove();
            DATE_FORMATTER.remove(); // 清理所有使用的ThreadLocal
        }
    }
    
    private void doBusinessLogic() {
        // 在任何地方都可以獲取用戶上下文,無需方法參數傳遞
        UserContext context = USER_CONTEXT.get();
        if (context != null) {
            System.out.println("執行業務邏輯,用戶: " + context.getUser().getName());
        }
        
        // 使用線程安全的日期格式化
        String now = DATE_FORMATTER.get().format(new Date());
        System.out.println("業務執行時間: " + now);
    }
    
    /**
     * 演示ThreadLocal的內存泄漏問題
     */
    public void demonstrateMemoryLeak() {
        // 錯誤的用法:不清理ThreadLocal
        ThreadLocal<byte[]> leakyLocal = new ThreadLocal<>();
        leakyLocal.set(new byte[1024 * 1024]); // 1MB數據
        
        // 如果沒有調用 leakyLocal.remove(), 即使leakyLocal=null,
        // 線程的ThreadLocalMap中仍然保留著這個Entry
        // 在線程池場景下,線程重用會導致內存不斷增長
    }
    
    /**
     * ThreadLocal最佳實踐:使用try-finally確保清理
     */
    public void bestPractice(User user) {
        USER_CONTEXT.set(new UserContext(user));
        try {
            // 業務處理
            doBusinessLogic();
        } finally {
            // 確保清理,即使在業務邏輯中發生異常
            USER_CONTEXT.remove();
        }
    }
    
    /**
     * 測試多線程環境下的ThreadLocal
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadLocalExample example = new ThreadLocalExample();
        
        // 創建多個線程,每個線程有獨立的ThreadLocal值
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int userId = i;
            threads[i] = new Thread(() -> {
                User user = new User("User-" + userId);
                example.processRequest(user);
            }, "Thread-" + i);
            
            threads[i].start();
        }
        
        // 等待所有線程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("所有線程執行完成");
    }
    
    // 輔助類定義
    static class UserContext {
        private final User user;
        public UserContext(User user) { this.user = user; }
        public User getUser() { return user; }
    }
    
    static class User {
        private final String name;
        public User(String name) { this.name = name; }
        public String getName() { return name; }
    }
    
    static class TransactionContext {
        // 事務相關信息
    }
}

/**
 * ThreadLocal高級用法:自定義ThreadLocal子類
 */
class AdvancedThreadLocal<T> extends ThreadLocal<T> {
    
    /**
     * 初始值 - 當線程第一次調用get()時,如果還沒有設置值,會調用此方法
     */
    @Override
    protected T initialValue() {
        System.out.println(Thread.currentThread().getName() + " - 初始化ThreadLocal值");
        return null; // 返回默認初始值
    }
    
    /**
     * 子線程值繼承 - 僅對InheritableThreadLocal有效
     * 當創建新線程時,可以控制如何從父線程繼承值
     */
    protected T childValue(T parentValue) {
        System.out.println("子線程繼承父線程的值: " + parentValue);
        return parentValue; // 直接繼承,也可以進行轉換
    }
}

4. 線程應用實例:從理論到實踐

4.1 等待超時模式:避免無限期等待

/**
 * 等待超時模式實現
 * 在等待/通知機制基礎上增加超時控制
 */
public class TimeoutWait<T> {
    private T result;
    
    /**
     * 帶超時的獲取方法
     * @param timeoutMs 超時時間(毫秒)
     * @return 結果,超時返回null
     */
    public synchronized T get(long timeoutMs) throws InterruptedException {
        long endTime = System.currentTimeMillis() + timeoutMs;
        long remaining = timeoutMs;
        
        // 循環檢查條件和剩余時間
        while (result == null && remaining > 0) {
            wait(remaining); // 等待剩余時間
            remaining = endTime - System.currentTimeMillis(); // 更新剩余時間
        }
        
        return result; // 可能為null(超時)
    }
    
    /**
     * 設置結果并通知所有等待線程
     */
    public synchronized void set(T value) {
        this.result = value;
        notifyAll(); // 通知所有等待的線程
    }
    
    /**
     * 演示超時等待的使用
     */
    public static void main(String[] args) throws InterruptedException {
        TimeoutWait<String> waitObject = new TimeoutWait<>();
        
        // 消費者線程 - 等待結果,最多等3秒
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("消費者開始等待結果...");
                String result = waitObject.get(3000);
                if (result != null) {
                    System.out.println("消費者收到結果: " + result);
                } else {
                    System.out.println("消費者等待超時");
                }
            } catch (InterruptedException e) {
                System.out.println("消費者被中斷");
            }
        });
        
        // 生產者線程 - 2秒后產生結果
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(2000); // 模擬生產耗時
                waitObject.set("生產完成的數據");
                System.out.println("生產者完成工作");
            } catch (InterruptedException e) {
                System.out.println("生產者被中斷");
            }
        });
        
        consumer.start();
        producer.start();
        
        consumer.join();
        producer.join();
    }
}

4.2 數據庫連接池實現

/**
 * 簡易數據庫連接池實現
 * 演示資源池化和等待超時模式的實際應用
 */
public class SimpleConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();
    private final int maxSize;
    private int createdCount = 0;
    
    public SimpleConnectionPool(int initialSize, int maxSize) {
        this.maxSize = maxSize;
        // 初始化連接池
        for (int i = 0; i < initialSize; i++) {
            pool.add(createConnection());
        }
        System.out.println("連接池初始化完成,初始連接數: " + initialSize);
    }
    
    /**
     * 獲取連接,支持超時
     */
    public Connection getConnection(long timeoutMs) throws InterruptedException, TimeoutException {
        synchronized (pool) {
            // 如果池中有可用連接,立即返回
            if (!pool.isEmpty()) {
                return pool.removeFirst();
            }
            
            // 池為空,但還可以創建新連接
            if (createdCount < maxSize) {
                Connection conn = createConnection();
                System.out.println("創建新連接,當前連接數: " + createdCount);
                return conn;
            }
            
            // 等待可用連接
            long endTime = System.currentTimeMillis() + timeoutMs;
            long remaining = timeoutMs;
            
            while (pool.isEmpty() && remaining > 0) {
                System.out.println(Thread.currentThread().getName() + " 等待連接,剩余時間: " + remaining + "ms");
                pool.wait(remaining);
                remaining = endTime - System.currentTimeMillis();
            }
            
            if (!pool.isEmpty()) {
                return pool.removeFirst();
            }
            throw new TimeoutException("獲取連接超時,等待 " + timeoutMs + "ms");
        }
    }
    
    /**
     * 歸還連接到池中
     */
    public void releaseConnection(Connection conn) {
        if (conn != null) {
            synchronized (pool) {
                if (pool.size() < maxSize) {
                    pool.addLast(conn);
                    pool.notifyAll(); // 通知等待的線程
                    System.out.println("連接已歸還,當前池大小: " + pool.size());
                } else {
                    // 連接數超過上限,關閉連接
                    closeConnection(conn);
                    createdCount--;
                    System.out.println("連接池已滿,關閉連接");
                }
            }
        }
    }
    
    /**
     * 創建新連接
     */
    private Connection createConnection() {
        createdCount++;
        // 這里應該是真實的數據庫連接創建邏輯
        System.out.println("創建第 " + createdCount + " 個連接");
        return new MockConnection();
    }
    
    /**
     * 關閉連接
     */
    private void closeConnection(Connection conn) {
        try {
            conn.close();
        } catch (Exception e) {
            System.err.println("關閉連接失敗: " + e.getMessage());
        }
    }
    
    /**
     * 獲取連接池狀態
     */
    public synchronized void printStatus() {
        System.out.println("連接池狀態 - 池中連接: " + pool.size() + 
                          ", 總創建數: " + createdCount + 
                          ", 最大限制: " + maxSize);
    }
    
    // 模擬數據庫連接
    static class MockConnection implements Connection {
        private final String id = UUID.randomUUID().toString().substring(0, 8);
        
        @Override
        public void close() {
            System.out.println("關閉連接: " + id);
        }
        
        @Override
        public String toString() {
            return "MockConnection{" + "id='" + id + '\'' + '}';
        }
        
        // 其他Connection接口方法...
        @Override public void commit() {}
        @Override public void rollback() {}
        // ... 簡化實現
    }
    
    static class TimeoutException extends Exception {
        public TimeoutException(String message) { super(message); }
    }
}

4.3 線程池核心技術實現

/**
 * 簡易線程池實現
 * 理解線程池的核心原理和工作機制
 */
public class SimpleThreadPool implements Executor {
    private final BlockingQueue<Runnable> workQueue;
    private final List<WorkerThread> workers;
    private volatile boolean isShutdown = false;
    private final int poolSize;
    
    /**
     * 創建線程池
     */
    public SimpleThreadPool(int poolSize) {
        this.poolSize = poolSize;
        this.workQueue = new LinkedBlockingQueue<>();
        this.workers = new ArrayList<>(poolSize);
        
        System.out.println("初始化線程池,大小: " + poolSize);
        
        // 創建工作線程
        for (int i = 0; i < poolSize; i++) {
            WorkerThread worker = new WorkerThread("Pool-Worker-" + i);
            workers.add(worker);
            worker.start();
        }
    }
    
    /**
     * 提交任務到線程池
     */
    @Override
    public void execute(Runnable task) {
        if (isShutdown) {
            throw new RejectedExecutionException("線程池已關閉,拒絕新任務");
        }
        
        if (task == null) {
            throw new NullPointerException("任務不能為null");
        }
        
        try {
            workQueue.put(task); // 阻塞直到有空間
            System.out.println("任務已提交,隊列大小: " + workQueue.size());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("提交任務時被中斷", e);
        }
    }
    
    /**
     * 優雅關閉線程池
     */
    public void shutdown() {
        System.out.println("開始關閉線程池...");
        isShutdown = true;
        
        // 中斷所有工作線程
        for (WorkerThread worker : workers) {
            worker.interrupt();
        }
    }
    
    /**
     * 強制關閉線程池
     */
    public void shutdownNow() {
        shutdown();
        workQueue.clear(); // 清空等待隊列
    }
    
    /**
     * 等待線程池完全終止
     */
    public boolean awaitTermination(long timeout, TimeUnit unit) 
            throws InterruptedException {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        
        for (WorkerThread worker : workers) {
            long remaining = endTime - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // 超時
            }
            worker.join(remaining);
        }
        
        return true;
    }
    
    /**
     * 獲取線程池狀態
     */
    public void printStatus() {
        System.out.println("線程池狀態 - 工作線程: " + workers.size() + 
                          ", 等待任務: " + workQueue.size() + 
                          ", 已關閉: " + isShutdown);
    }
    
    /**
     * 工作線程實現
     */
    private class WorkerThread extends Thread {
        public WorkerThread(String name) {
            super(name);
        }
        
        @Override
        public void run() {
            System.out.println(getName() + " 開始運行");
            
            while (!isShutdown || !workQueue.isEmpty()) {
                try {
                    // 從隊列獲取任務,支持超時以便檢查關閉狀態
                    Runnable task = workQueue.poll(1, TimeUnit.SECONDS);
                    
                    if (task != null) {
                        System.out.println(getName() + " 開始執行任務");
                        task.run();
                        System.out.println(getName() + " 任務執行完成");
                    }
                    
                } catch (InterruptedException e) {
                    // 響應中斷,退出線程
                    System.out.println(getName() + " 收到中斷信號");
                    break;
                } catch (Exception e) {
                    // 捕獲任務執行異常,避免工作線程退出
                    System.err.println(getName() + " 任務執行異常: " + e.getMessage());
                }
            }
            
            System.out.println(getName() + " 退出");
        }
    }
    
    /**
     * 測試線程池
     */
    public static void main(String[] args) throws InterruptedException {
        SimpleThreadPool pool = new SimpleThreadPool(3);
        
        // 提交10個任務
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + 
                    " 執行任務 " + taskId);
                try {
                    Thread.sleep(1000); // 模擬任務執行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 查看狀態
        pool.printStatus();
        
        // 等待任務執行
        Thread.sleep(5000);
        
        // 關閉線程池
        pool.shutdown();
        if (pool.awaitTermination(3, TimeUnit.SECONDS)) {
            System.out.println("線程池已完全關閉");
        } else {
            System.out.println("線程池關閉超時,強制關閉");
            pool.shutdownNow();
        }
    }
}

4.4 基于線程池的Web服務器

/**
 * 基于線程池的簡易Web服務器
 * 演示線程池在實際項目中的應用
 */
public class SimpleHttpServer {
    private final ExecutorService threadPool;
    private final ServerSocket serverSocket;
    private final String basePath;
    private volatile boolean isRunning = false;
    
    /**
     * 創建HTTP服務器
     */
    public SimpleHttpServer(int port, int poolSize, String basePath) throws IOException {
        this.threadPool = Executors.newFixedThreadPool(poolSize);
        this.serverSocket = new ServerSocket(port);
        this.basePath = basePath;
        
        // 確保基礎路徑存在
        File baseDir = new File(basePath);
        if (!baseDir.exists() || !baseDir.isDirectory()) {
            throw new IllegalArgumentException("基礎路徑不存在或不是目錄: " + basePath);
        }
    }
    
    /**
     * 啟動服務器
     */
    public void start() {
        if (isRunning) {
            throw new IllegalStateException("服務器已經在運行");
        }
        
        isRunning = true;
        System.out.println("HTTP服務器啟動,端口: " + serverSocket.getLocalPort() + 
                          ", 基礎路徑: " + basePath);
        
        // 主接受循環
        Thread acceptorThread = new Thread(this::acceptConnections, "Server-Acceptor");
        acceptorThread.setDaemon(false);
        acceptorThread.start();
    }
    
    /**
     * 接受客戶端連接
     */
    private void acceptConnections() {
        while (isRunning) {
            try {
                Socket clientSocket = serverSocket.accept();
                System.out.println("接受客戶端連接: " + 
                    clientSocket.getInetAddress().getHostAddress());
                
                // 提交到線程池處理
                threadPool.execute(new HttpHandler(clientSocket, basePath));
                
            } catch (IOException e) {
                if (isRunning) {
                    System.err.println("接受連接錯誤: " + e.getMessage());
                }
                // 服務器關閉時的異常是正常的
            }
        }
        System.out.println("服務器停止接受新連接");
    }
    
    /**
     * 停止服務器
     */
    public void stop() {
        System.out.println("正在停止服務器...");
        isRunning = false;
        
        try {
            serverSocket.close();
        } catch (IOException e) {
            System.err.println("關閉ServerSocket錯誤: " + e.getMessage());
        }
        
        // 優雅關閉線程池
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                System.out.println("強制關閉線程池");
                threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("服務器已停止");
    }
    
    /**
     * HTTP請求處理器
     */
    private static class HttpHandler implements Runnable {
        private final Socket socket;
        private final String basePath;
        
        public HttpHandler(Socket socket, String basePath) {
            this.socket = socket;
            this.basePath = basePath;
        }
        
        @Override
        public void run() {
            // 使用ThreadLocal記錄請求上下文
            ThreadLocal<String> requestId = ThreadLocal.withInitial(
                () -> UUID.randomUUID().toString().substring(0, 8)
            );
            
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
                 PrintWriter out = new PrintWriter(socket.getOutputStream())) {
                
                String requestIdValue = requestId.get();
                System.out.println("[" + requestIdValue + "] 開始處理請求");
                
                // 解析HTTP請求
                String requestLine = in.readLine();
                if (requestLine == null || requestLine.isEmpty()) {
                    sendError(out, 400, "Bad Request");
                    return;
                }
                
                String[] parts = requestLine.split(" ");
                if (parts.length < 2) {
                    sendError(out, 400, "Bad Request");
                    return;
                }
                
                String method = parts[0];
                String path = parts[1];
                
                System.out.println("[" + requestIdValue + "] " + method + " " + path);
                
                // 只處理GET請求
                if (!"GET".equals(method)) {
                    sendError(out, 405, "Method Not Allowed");
                    return;
                }
                
                // 處理請求路徑
                handleRequest(path, out, requestIdValue);
                
            } catch (IOException e) {
                System.err.println("處理請求IO錯誤: " + e.getMessage());
            } finally {
                // 清理ThreadLocal
                requestId.remove();
                
                try {
                    socket.close();
                } catch (IOException e) {
                    // 忽略關閉異常
                }
            }
        }
        
        private void handleRequest(String path, PrintWriter out, String requestId) {
            try {
                // 簡單路徑安全校驗
                if (path.contains("..")) {
                    sendError(out, 403, "Forbidden");
                    return;
                }
                
                // 默認頁面
                if ("/".equals(path)) {
                    path = "/index.html";
                }
                
                File file = new File(basePath + path);
                
                // 文件不存在
                if (!file.exists() || !file.isFile()) {
                    sendError(out, 404, "Not Found");
                    return;
                }
                
                // 安全檢查:確保文件在基礎路徑內
                if (!file.getCanonicalPath().startsWith(new File(basePath).getCanonicalPath())) {
                    sendError(out, 403, "Forbidden");
                    return;
                }
                
                // 根據文件類型設置Content-Type
                String contentType = getContentType(file.getName());
                
                // 讀取文件內容
                byte[] content = Files.readAllBytes(file.toPath());
                
                // 發送HTTP響應
                out.println("HTTP/1.1 200 OK");
                out.println("Server: SimpleHttpServer");
                out.println("Content-Type: " + contentType);
                out.println("Content-Length: " + content.length);
                out.println("Connection: close");
                out.println(); // 空行分隔頭部和主體
                out.flush();
                
                // 發送文件內容
                socket.getOutputStream().write(content);
                socket.getOutputStream().flush();
                
                System.out.println("[" + requestId + "] 響應發送完成,文件: " + file.getName());
                
            } catch (IOException e) {
                System.err.println("[" + requestId + "] 處理請求錯誤: " + e.getMessage());
                sendError(out, 500, "Internal Server Error");
            }
        }
        
        private String getContentType(String filename) {
            if (filename.endsWith(".html") || filename.endsWith(".htm")) {
                return "text/html; charset=UTF-8";
            } else if (filename.endsWith(".css")) {
                return "text/css";
            } else if (filename.endsWith(".js")) {
                return "application/javascript";
            } else if (filename.endsWith(".jpg") || filename.endsWith(".jpeg")) {
                return "image/jpeg";
            } else if (filename.endsWith(".png")) {
                return "image/png";
            } else {
                return "application/octet-stream";
            }
        }
        
        private void sendError(PrintWriter out, int code, String message) {
            out.println("HTTP/1.1 " + code + " " + message);
            out.println("Content-Type: text/html");
            out.println("Connection: close");
            out.println();
            out.println("<html><body><h1>" + code + " " + message + "</h1></body></html>");
            out.flush();
        }
    }
    
    /**
     * 啟動服務器示例
     */
    public static void main(String[] args) {
        try {
            // 創建服務器,端口8080,線程池大小10,基礎路徑為當前目錄
            SimpleHttpServer server = new SimpleHttpServer(8080, 10, ".");
            server.start();
            
            System.out.println("服務器已啟動,訪問 //localhost:8080/");
            System.out.println("按Enter鍵停止服務器...");
            
            // 等待用戶輸入停止服務器
            System.in.read();
            server.stop();
            
        } catch (Exception e) {
            System.err.println("服務器啟動失敗: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

5. 性能優化與最佳實踐

5.1 線程池大小配置策略

/**
 * 線程池配置策略
 * 根據任務類型合理配置線程池參數
 */
public class ThreadPoolConfig {
    
    /**
     * CPU密集型任務配置
     * 特點:大量計算,很少IO等待
     * 策略:線程數 ≈ CPU核心數,避免過多線程競爭CPU
     */
    public static ExecutorService newCpuIntensivePool() {
        int coreCount = Runtime.getRuntime().availableProcessors();
        int threadCount = coreCount + 1; // +1 確保CPU不會空閑
        System.out.println("CPU密集型線程池: " + threadCount + " 線程");
        return Executors.newFixedThreadPool(threadCount);
    }
    
    /**
     * IO密集型任務配置
     * 特點:大量等待(網絡、磁盤IO)
     * 策略:線程數 ≈ CPU核心數 * (1 + 等待時間/計算時間)
     */
    public static ExecutorService newIoIntensivePool() {
        int coreCount = Runtime.getRuntime().availableProcessors();
        int threadCount = coreCount * 2; // 經驗值,可根據實際情況調整
        System.out.println("IO密集型線程池: " + threadCount + " 線程");
        return Executors.newFixedThreadPool(threadCount);
    }
    
    /**
     * 混合型任務配置
     * 根據CPU和IO比例動態調整
     */
    public static ExecutorService newMixedPool(double cpuRatio, double ioRatio) {
        int coreCount = Runtime.getRuntime().availableProcessors();
        int threadCount = (int) (coreCount * cpuRatio + ioRatio);
        threadCount = Math.max(1, Math.min(threadCount, 100)); // 合理范圍限制
        System.out.println("混合型線程池: " + threadCount + " 線程");
        return Executors.newFixedThreadPool(threadCount);
    }
    
    /**
     * 自定義線程池 - 更精細的控制
     */
    public static ThreadPoolExecutor newCustomPool(int corePoolSize, 
                                                  int maxPoolSize,
                                                  long keepAliveTime,
                                                  int queueSize) {
        return new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queueSize),
            new CustomThreadFactory(),
            new CustomRejectionPolicy()
        );
    }
    
    /**
     * 自定義線程工廠,設置更有意義的線程名稱
     */
    static class CustomThreadFactory implements ThreadFactory {
        private final AtomicInteger counter = new AtomicInteger(1);
        
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "CustomPool-Thread-" + counter.getAndIncrement());
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }
    
    /**
     * 自定義拒絕策略
     */
    static class CustomRejectionPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println("任務被拒絕,當前活躍線程: " + executor.getActiveCount() + 
                             ", 隊列大小: " + executor.getQueue().size());
            // 可以記錄日志、發送告警等
            throw new RejectedExecutionException("線程池已滿,拒絕新任務");
        }
    }
}

5.2 避免常見陷阱

1. 死鎖預防與檢測

/**
 * 死鎖預防示例
 * 演示如何避免和檢測死鎖
 */
public class DeadlockPrevention {
    
    /**
     * 死鎖產生示例 - 錯誤的鎖順序
     */
    public static class DeadlockExample {
        private final Object lock1 = new Object();
        private final Object lock2 = new Object();
        
        public void method1() {
            synchronized (lock1) {
                System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                
                synchronized (lock2) {  // 可能死鎖
                    System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
                }
            }
        }
        
        public void method2() {
            synchronized (lock2) {  // 不同的鎖順序
                System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                
                synchronized (lock1) {  // 可能死鎖
                    System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
                }
            }
        }
    }
    
    /**
     * 死鎖預防 - 統一的鎖順序
     */
    public static class DeadlockPreventionExample {
        private final Object lock1 = new Object();
        private final Object lock2 = new Object();
        
        /**
         * 使用統一的鎖獲取順序來預防死鎖
         * 總是先獲取lock1,再獲取lock2
         */
        public void method1() {
            synchronized (lock1) {
                System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                
                synchronized (lock2) {
                    System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
                    // 業務邏輯
                }
            }
        }
        
        public void method2() {
            synchronized (lock1) {  // 相同的鎖順序
                System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                
                synchronized (lock2) {
                    System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
                    // 業務邏輯
                }
            }
        }
    }
    
    /**
     * 使用tryLock避免死鎖
     */
    public static class TryLockExample {
        private final Lock lock1 = new ReentrantLock();
        private final Lock lock2 = new ReentrantLock();
        
        public boolean tryDoWork() {
            // 嘗試獲取第一個鎖
            if (lock1.tryLock()) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 獲得 lock1");
                    
                    // 嘗試獲取第二個鎖
                    if (lock2.tryLock()) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " 獲得 lock2");
                            // 執行業務邏輯
                            return true;
                        } finally {
                            lock2.unlock();
                        }
                    }
                } finally {
                    lock1.unlock();
                }
            }
            return false; // 獲取鎖失敗
        }
    }
    
    /**
     * 死鎖檢測工具
     */
    public static void detectDeadlock() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        long[] threadIds = threadBean.findDeadlockedThreads();
        
        if (threadIds != null) {
            System.err.println("檢測到死鎖!");
            ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds);
            
            for (ThreadInfo threadInfo : threadInfos) {
                System.err.println("死鎖線程: " + threadInfo.getThreadName());
                System.err.println("等待鎖: " + threadInfo.getLockName());
                System.err.println("被線程持有: " + threadInfo.getLockOwnerName());
            }
        } else {
            System.out.println("未檢測到死鎖");
        }
    }
}

2. 資源清理最佳實踐

/**
 * 資源清理最佳實踐
 * 演示如何正確管理和清理多線程資源
 */
public class ResourceCleanup implements AutoCloseable {
    private final ExecutorService executor;
    private final List<AutoCloseable> resources;
    
    public ResourceCleanup(int threadPoolSize) {
        this.executor = Executors.newFixedThreadPool(threadPoolSize);
        this.resources = new ArrayList<>();
        System.out.println("資源管理器初始化完成,線程池大小: " + threadPoolSize);
    }
    
    /**
     * 提交任務
     */
    public <T> Future<T> submit(Callable<T> task) {
        return executor.submit(task);
    }
    
    /**
     * 注冊需要管理的資源
     */
    public void registerResource(AutoCloseable resource) {
        synchronized (resources) {
            resources.add(resource);
        }
    }
    
    /**
     * 實現AutoCloseable,支持try-with-resources
     */
    @Override
    public void close() {
        System.out.println("開始清理資源...");
        
        // 1. 關閉線程池
        shutdownExecutor();
        
        // 2. 關閉所有注冊的資源
        closeRegisteredResources();
        
        System.out.println("資源清理完成");
    }
    
    /**
     * 優雅關閉線程池
     */
    private void shutdownExecutor() {
        executor.shutdown(); // 停止接受新任務
        try {
            // 等待現有任務完成
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                System.out.println("線程池關閉超時,嘗試強制關閉");
                // 取消所有未開始的任務
                executor.shutdownNow();
                
                // 再次等待
                if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
                    System.err.println("線程池無法完全關閉");
                }
            }
        } catch (InterruptedException e) {
            // 重新中斷并強制關閉
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * 關閉所有注冊的資源
     */
    private void closeRegisteredResources() {
        synchronized (resources) {
            for (AutoCloseable resource : resources) {
                try {
                    resource.close();
                } catch (Exception e) {
                    System.err.println("關閉資源時出錯: " + e.getMessage());
                    // 繼續關閉其他資源,不拋出異常
                }
            }
            resources.clear();
        }
    }
    
    /**
     * 使用示例
     */
    public static void main(String[] args) {
        // 使用try-with-resources確保資源清理
        try (ResourceCleanup manager = new ResourceCleanup(3)) {
            
            // 注冊一些資源
            manager.registerResource(() -> System.out.println("關閉數據庫連接"));
            manager.registerResource(() -> System.out.println("關閉網絡連接"));
            
            // 提交任務
            Future<String> future = manager.submit(() -> {
                Thread.sleep(1000);
                return "任務完成";
            });
            
            // 獲取結果
            String result = future.get();
            System.out.println("任務結果: " + result);
            
        } catch (Exception e) {
            System.err.println("執行出錯: " + e.getMessage());
        }
        // 這里會自動調用close()方法清理資源
    }
}

6. 總結與核心要點

6.1 關鍵知識點回顧

1. 中斷異常處理的核心理解

try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    /**
     * 必須重新設置中斷狀態的原因:
     * 1. 當阻塞方法拋出InterruptedException時,會清除線程的中斷狀態
     * 2. 如果不重新設置,調用者無法知道線程曾被中斷
     * 3. 這破壞了中斷的傳播機制
     */
    Thread.currentThread().interrupt(); // 恢復中斷狀態
    // 或者直接拋出異常:throw new RuntimeException(e);
}

2. ThreadLocal內存管理

public void usingThreadLocal() {
    try {
        threadLocal.set(someValue);
        // 使用threadLocal
    } finally {
        /**
         * 必須清理ThreadLocal的原因:
         * 1. ThreadLocalMap使用弱引用作為key,但value是強引用
         * 2. 如果線程長時間存活(線程池),value不會被GC回收
         * 3. 導致內存泄漏,特別是存儲大對象時
         */
        threadLocal.remove(); // 必須調用!
    }
}

6.2 最佳實踐清單

  1. 線程命名:為所有線程設置有意義的名字
  2. 異常處理:在Runnable.run()中捕獲所有異常
  3. 資源清理:使用try-finally或try-with-resources
  4. 中斷響應:合理處理InterruptedException
  5. 鎖順序:統一鎖獲取順序避免死鎖
  6. 線程池:優先使用線程池而非直接創建線程
  7. volatile:僅用于簡單的狀態標志
  8. ThreadLocal清理:使用后必須調用remove()

6.3 性能調優建議

場景 推薦配置 說明
CPU密集型 線程數 = CPU核心數 + 1 減少線程切換開銷
IO密集型 線程數 = CPU核心數 × 2 充分利用等待時間
混合型 根據監控動態調整 結合實際負載

6.4 常見問題排查

  1. 死鎖檢測:使用jstack或ThreadMXBean
  2. 內存泄漏:檢查ThreadLocal使用,特別是線程池場景
  3. CPU過高:檢查是否存在忙等待或過多線程競爭
  4. 響應慢:檢查鎖競爭、IO阻塞或線程池配置

掌握這些Java并發編程的基礎知識和最佳實踐,能夠幫助開發者構建出高性能、高可靠的多線程應用程序。記住,并發編程的核心在于正確的同步、合理的資源管理和清晰的線程通信

posted @ 2025-10-27 19:52  佛祖讓我來巡山  閱讀(153)  評論(1)    收藏  舉報

佛祖讓我來巡山博客站 - 創建于 2018-08-15

開發工(gong)程師個人站,內容主要是網(wang)站開發方面的技術文章,大部分來自學習或(huo)工(gong)作,部分來源于(yu)網(wang)絡,希(xi)望對大家有所幫助。