首頁 > 軟體

Java 並行程式設計要點

2021-01-13 23:00:35

使用執行緒

有三種使用執行緒的方法:

  • 實現 Runnable 介面;
  • 實現 Callable 介面;
  • 繼承 Thread 類。

實現 Runnable 和 Callable 介面的類只能當做一個可以線上程中執行的任務,不是真正意義上的執行緒,因此最後還需要通過 Thread 來呼叫。可以理解為任務是通過執行緒驅動從而執行的。

實現 Runnable 介面

需要實現介面中的 run() 方法。

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // ...
    }
}

使用 Runnable 範例再建立一個 Thread 範例,然後呼叫 Thread 範例的 start() 方法來啟動執行緒。

public static void main(String[] args) {
    MyRunnable instance = new MyRunnable();
    Thread thread = new Thread(instance);
    thread.start();
}

實現 Callable 介面

與 Runnable 相比,Callable 可以有返回值,返回值通過 FutureTask 進行封裝。

public class MyCallable implements Callable<Integer> {
    public Integer call() {
        return 123;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    MyCallable mc = new MyCallable();
    FutureTask<Integer> ft = new FutureTask<>(mc);
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get());
}

繼承 Thread 類

同樣也是需要實現 run() 方法,因為 Thread 類也實現了 Runable 介面。

當呼叫 start() 方法啟動一個執行緒時,虛擬機器器會將該執行緒放入就緒佇列中等待被排程,當一個執行緒被排程時會執行該執行緒的 run() 方法。

public class MyThread extends Thread {
    public void run() {
        // ...
    }
}
public static void main(String[] args) {
    MyThread mt = new MyThread();
    mt.start();
}

實現介面 VS 繼承 Thread

實現介面會更好一些,因為:

  • Java 不支援多重繼承,因此繼承了 Thread 類就無法繼承其它類,但是可以實現多個介面;
  • 類可能只要求可執行就行,繼承整個 Thread 類開銷過大。

執行緒機制

Executor

Executor 管理多個非同步任務的執行,而無需程式設計師顯式地管理執行緒的生命週期。這裡的非同步是指多個任務的執行互不干擾,不需要進行同步操作。

主要有三種 Executor:

  • CachedThreadPool:一個任務建立一個執行緒;
  • FixedThreadPool:所有任務只能使用固定大小的執行緒;
  • SingleThreadExecutor:相當於大小為 1 的 FixedThreadPool。
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        executorService.execute(new MyRunnable());
    }
    executorService.shutdown();
}

Daemon

守護執行緒是程式執行時在後臺提供服務的執行緒,不屬於程式中不可或缺的部分。

當所有非守護執行緒結束時,程式也就終止,同時會殺死所有守護執行緒。

main() 屬於非守護執行緒。

線上程啟動之前使用 setDaemon() 方法可以將一個執行緒設定為守護執行緒。

public static void main(String[] args) {
    Thread thread = new Thread(new MyRunnable());
    thread.setDaemon(true);
}

sleep()

Thread.sleep(millisec) 方法會休眠當前正在執行的執行緒,millisec 單位為毫秒。

sleep() 可能會丟擲 InterruptedException,因為異常不能跨執行緒傳播回 main() 中,因此必須在本地進行處理。執行緒中丟擲的其它異常也同樣需要在本地進行處理。

public void run() {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

yield()

對靜態方法 Thread.yield() 的呼叫宣告了當前執行緒已經完成了生命週期中最重要的部分,可以切換給其它執行緒來執行。該方法只是對執行緒排程器的一個建議,而且也只是建議具有相同優先順序的其它執行緒可以執行。

public void run() {
    Thread.yield();
}

中斷執行緒

一個執行緒執行完畢之後會自動結束,如果在執行過程中發生異常也會提前結束。

InterruptedException

通過呼叫一個執行緒的 interrupt() 來中斷該執行緒,如果該執行緒處於阻塞、限期等待或者無限期等待狀態,那麼就會丟擲 InterruptedException,從而提前結束該執行緒。但是不能中斷 I/O 阻塞和 synchronized 鎖阻塞。

對於以下程式碼,在 main() 中啟動一個執行緒之後再中斷它,由於執行緒中呼叫了 Thread.sleep() 方法,因此會丟擲一個 InterruptedException,從而提前結束執行緒,不執行之後的語句。

public class InterruptExample {

    private static class MyThread1 extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println("Thread run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new MyThread1();
    thread1.start();
    thread1.interrupt();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at InterruptExample.lambda$main$0(InterruptExample.java:5)
    at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

interrupted()

如果一個執行緒的 run() 方法執行一個無限迴圈,並且沒有執行 sleep() 等會丟擲 InterruptedException 的操作,那麼呼叫執行緒的 interrupt() 方法就無法使執行緒提前結束。

但是呼叫 interrupt() 方法會設定執行緒的中斷標記,此時呼叫 interrupted() 方法會返回 true。因此可以在迴圈體中使用 interrupted() 方法來判斷執行緒是否處於中斷狀態,從而提前結束執行緒。

public class InterruptExample {

    private static class MyThread2 extends Thread {
        @Override
        public void run() {
            while (!interrupted()) {
                // ..
            }
            System.out.println("Thread end");
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread2 = new MyThread2();
    thread2.start();
    thread2.interrupt();
}
Thread end

Executor 的中斷操作

呼叫 Executor 的 shutdown() 方法會等待執行緒都執行完畢之後再關閉,但是如果呼叫的是 shutdownNow() 方法,則相當於呼叫每個執行緒的 interrupt() 方法。

以下使用 Lambda 建立執行緒,相當於建立了一個匿名內部執行緒。

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Thread run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
    at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

如果只想中斷 Executor 中的一個執行緒,可以通過使用 submit() 方法來提交一個執行緒,它會返回一個 Future<?> 物件,通過呼叫該物件的 cancel(true) 方法就可以中斷執行緒。

Future<?> future = executorService.submit(() -> {
    // ..
});
future.cancel(true);

互斥同步

Java 提供了兩種鎖機制來控制多個執行緒對共用資源的互斥存取,第一個是 JVM 實現的 synchronized,而另一個是 JDK 實現的 ReentrantLock。

synchronized

1. 同步一個程式碼塊

public void func() {
    synchronized (this) {
        // ...
    }
}

它只作用於同一個物件,如果呼叫兩個物件上的同步程式碼塊,就不會進行同步。

對於以下程式碼,使用 ExecutorService 執行了兩個執行緒,由於呼叫的是同一個物件的同步程式碼塊,因此這兩個執行緒會進行同步,當一個執行緒進入同步語句塊時,另一個執行緒就必須等待。

public class SynchronizedExample {

    public void func1() {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        }
    }
}
public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e1.func1());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

對於以下程式碼,兩個執行緒呼叫了不同物件的同步程式碼塊,因此這兩個執行緒就不需要同步。從輸出結果可以看出,兩個執行緒交叉執行。

public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e2.func1());
}
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

2. 同步一個方法

public synchronized void func () {
    // ...
}

它和同步程式碼塊一樣,作用於同一個物件。

3. 同步一個類

public void func() {
    synchronized (SynchronizedExample.class) {
        // ...
    }
}

作用於整個類,也就是說兩個執行緒呼叫同一個類的不同物件上的這種同步語句,也會進行同步。

public class SynchronizedExample {

    public void func2() {
        synchronized (SynchronizedExample.class) {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        }
    }
}
public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func2());
    executorService.execute(() -> e2.func2());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

4. 同步一個靜態方法

public synchronized static void fun() {
    // ...
}

作用於整個類。

ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。

public class LockExample {

    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        } finally {
            lock.unlock(); // 確保釋放鎖,從而避免發生死鎖。
        }
    }
}
public static void main(String[] args) {
    LockExample lockExample = new LockExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> lockExample.func());
    executorService.execute(() -> lockExample.func());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

比較

1. 鎖的實現

synchronized 是 JVM 實現的,而 ReentrantLock 是 JDK 實現的。

2. 效能

新版本 Java 對 synchronized 進行了很多優化,例如自旋鎖等,synchronized 與 ReentrantLock 大致相同。

3. 等待可中斷

當持有鎖的執行緒長期不釋放鎖的時候,正在等待的執行緒可以選擇放棄等待,改為處理其他事情。

ReentrantLock 可中斷,而 synchronized 不行。

4. 公平鎖

公平鎖是指多個執行緒在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖。

synchronized 中的鎖是非公平的,ReentrantLock 預設情況下也是非公平的,但是也可以是公平的。

5. 鎖繫結多個條件

一個 ReentrantLock 可以同時繫結多個 Condition 物件。

使用選擇

除非需要使用 ReentrantLock 的高階功能,否則優先使用 synchronized。這是因為 synchronized 是 JVM 實現的一種鎖機制,JVM 原生地支援它,而 ReentrantLock 不是所有的 JDK 版本都支援。並且使用 synchronized 不用擔心沒有釋放鎖而導致死鎖問題,因為 JVM 會確保鎖的釋放。

執行緒之間的共同作業

當多個執行緒可以一起工作去解決某個問題時,如果某些部分必須在其它部分之前完成,那麼就需要對執行緒進行協調。

join()

線上程中呼叫另一個執行緒的 join() 方法,會將當前執行緒掛起,而不是忙等待,直到目標執行緒結束。

對於以下程式碼,雖然 b 執行緒先啟動,但是因為在 b 執行緒中呼叫了 a 執行緒的 join() 方法,b 執行緒會等待 a 執行緒結束才繼續執行,因此最後能夠保證 a 執行緒的輸出先於 b 執行緒的輸出。

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}
public static void main(String[] args) {
    JoinExample example = new JoinExample();
    example.test();
}
A
B

wait()、notify()、notifyAll()

呼叫 wait() 使得執行緒等待某個條件滿足,執行緒在等待時會被掛起,當其他執行緒的執行使得這個條件滿足時,其它執行緒會呼叫 notify() 或者 notifyAll() 來喚醒掛起的執行緒。

它們都屬於 Object 的一部分,而不屬於 Thread。

只能用在同步方法或者同步控制塊中使用,否則會在執行時丟擲 IllegalMonitorStateException。

使用 wait() 掛起期間,執行緒會釋放鎖。這是因為,如果沒有釋放鎖,那麼其它執行緒就無法進入物件的同步方法或者同步控制塊中,那麼就無法執行 notify() 或者 notifyAll() 來喚醒掛起的執行緒,造成死鎖。

public class WaitNotifyExample {

    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

wait() 和 sleep() 的區別

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態方法;
  • wait() 會釋放鎖,sleep() 不會。

await()、signal()、signalAll()

java.util.concurrent 類庫中提供了 Condition 類來實現執行緒之間的協調,可以在 Condition 上呼叫 await() 方法使執行緒等待,其它執行緒呼叫 signal() 或 signalAll() 方法喚醒等待的執行緒。

相比於 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。

使用 Lock 來獲取一個 Condition 物件。

public class AwaitSignalExample {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

執行緒狀態

一個執行緒只能處於一種狀態,並且這裡的執行緒狀態特指 Java 虛擬機器器的執行緒狀態,不能反映執行緒在特定作業系統下的狀態。

新建(New)

建立後尚未啟動。

可執行(Runable)

正在 Java 虛擬機器器中執行。但是在作業系統層面,它可能處於執行狀態,也可能等待資源排程(例如處理器資源),資源排程完成就進入執行狀態。所以該狀態的可執行是指可以被執行,具體有沒有執行要看底層作業系統的資源排程。

阻塞(Blocked)

請求獲取 monitor lock 從而進入 synchronized 函數或者程式碼塊,但是其它執行緒已經佔用了該 monitor lock,所以出於阻塞狀態。要結束該狀態進入從而 RUNABLE 需要其他執行緒釋放 monitor lock。

無限期等待(Waiting)

等待其它執行緒顯式地喚醒。

阻塞和等待的區別在於,阻塞是被動的,它是在等待獲取 monitor lock。而等待是主動的,通過呼叫 Object.wait() 等方法進入。

進入方法 退出方法
沒有設定 Timeout 引數的 Object.wait() 方法 Object.notify() / Object.notifyAll()
沒有設定 Timeout 引數的 Thread.join() 方法 被呼叫的執行緒執行完畢
LockSupport.park() 方法 LockSupport.unpark(Thread)

限期等待(Timed waiting)

無需等待其它執行緒顯式地喚醒,在一定時間之後會被系統自動喚醒。

進入方法 退出方法
Thread.sleep() 方法 時間結束
設定了 Timeout 引數的 Object.wait() 方法 時間結束 / Object.notify() / Object.notifyAll()
設定了 Timeout 引數的 Thread.join() 方法 時間結束 / 被呼叫的執行緒執行完畢
LockSupport.parkNanos() 方法 LockSupport.unpark(Thread)
LockSupport.parkUntil() 方法 LockSupport.unpark(Thread)

呼叫 Thread.sleep() 方法使執行緒進入限期等待狀態時,常常用「使一個執行緒睡眠」進行描述。呼叫 Object.wait() 方法使執行緒進入限期等待或者無限期等待時,常常用「掛起一個執行緒」進行描述。睡眠和掛起是用來描述行為,而阻塞和等待用來描述狀態。

死亡(Terminated)

可以是執行緒結束任務之後自己結束,或者產生了異常而結束。

執行緒安全

多個執行緒不管以何種方式存取某個類,並且在主調程式碼中不需要進行同步,都能表現正確的行為。

執行緒安全有以下幾種實現方式:

不可變

不可變(Immutable)的物件一定是執行緒安全的,不需要再採取任何的執行緒安全保障措施。只要一個不可變的物件被正確地構建出來,永遠也不會看到它在多個執行緒之中處於不一致的狀態。多執行緒環境下,應當儘量使物件成為不可變,來滿足執行緒安全。

不可變的型別:

  • final 關鍵字修飾的基本資料型別
  • String
  • 列舉型別
  • Number 部分子類,如 Long 和 Double 等數值包裝型別,BigInteger 和 BigDecimal 等巨量資料型別。但同為 Number 的原子類 AtomicInteger 和 AtomicLong 則是可變的。

對於集合型別,可以使用 Collections.unmodifiableXXX() 方法來獲取一個不可變的集合。

public class ImmutableExample {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
        unmodifiableMap.put("a", 1);
    }
}
Exception in thread "main" java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先對原始的集合進行拷貝,需要對集合進行修改的方法都直接丟擲異常。

public V put(K key, V value) {
    throw new UnsupportedOperationException();
}

互斥同步

synchronized 和 ReentrantLock。

非阻塞同步

互斥同步最主要的問題就是執行緒阻塞和喚醒所帶來的效能問題,因此這種同步也稱為阻塞同步。

互斥同步屬於一種悲觀的並行策略,總是認為只要不去做正確的同步措施,那就肯定會出現問題。無論共用資料是否真的會出現競爭,它都要進行加鎖(這裡討論的是概念模型,實際上虛擬機器器會優化掉很大一部分不必要的加鎖)、使用者態核心態轉換、維護鎖計數器和檢查是否有被阻塞的執行緒需要喚醒等操作。

隨著硬體指令集的發展,我們可以使用基於衝突檢測的樂觀並行策略:先進行操作,如果沒有其它執行緒爭用共用資料,那操作就成功了,否則採取補償措施(不斷地重試,直到成功為止)。這種樂觀的並行策略的許多實現都不需要將執行緒阻塞,因此這種同步操作稱為非阻塞同步。

1. CAS

樂觀鎖需要操作和衝突檢測這兩個步驟具備原子性,這裡就不能再使用互斥同步來保證了,只能靠硬體來完成。硬體支援的原子性操作最典型的是:比較並交換(Compare-and-Swap,CAS)。CAS 指令需要有 3 個運算元,分別是記憶體地址 V、舊的預期值 A 和新值 B。當執行操作時,只有當 V 的值等於 A,才將 V 的值更新為 B。

2. AtomicInteger

J.U.C 包裡面的整數原子類 AtomicInteger 的方法呼叫了 Unsafe 類的 CAS 操作。

以下程式碼使用了 AtomicInteger 執行了自增的操作。

private AtomicInteger cnt = new AtomicInteger();

public void add() {
    cnt.incrementAndGet();
}

以下程式碼是 incrementAndGet() 的原始碼,它呼叫了 Unsafe 的 getAndAddInt() 。

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下程式碼是 getAndAddInt() 原始碼,var1 指示物件記憶體地址,var2 指示該欄位相對物件記憶體地址的偏移,var4 指示操作需要加的數值,這裡為 1。通過 getIntVolatile(var1, var2) 得到舊的預期值,通過呼叫 compareAndSwapInt() 來進行 CAS 比較,如果該欄位記憶體地址中的值等於 var5,那麼就更新記憶體地址為 var1+var2 的變數為 var5+var4。

可以看到 getAndAddInt() 在一個迴圈中進行,發生衝突的做法是不斷的進行重試。

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

3. ABA

如果一個變數初次讀取的時候是 A 值,它的值被改成了 B,後來又被改回為 A,那 CAS 操作就會誤認為它從來沒有被改變過。

J.U.C 包提供了一個帶有標記的原子參照類 AtomicStampedReference 來解決這個問題,它可以通過控制變數值的版本來保證 CAS 的正確性。大部分情況下 ABA 問題不會影響程式並行的正確性,如果需要解決 ABA 問題,改用傳統的互斥同步可能會比原子類更高效。

無同步方案

要保證執行緒安全,並不是一定就要進行同步。如果一個方法本來就不涉及共用資料,那它自然就無須任何同步措施去保證正確性。

1. 棧封閉

多個執行緒存取同一個方法的區域性變數時,不會出現執行緒安全問題,因為區域性變數儲存在虛擬機器器棧中,屬於執行緒私有的。

public class StackClosedExample {
    public void add100() {
        int cnt = 0;
        for (int i = 0; i < 100; i++) {
            cnt++;
        }
        System.out.println(cnt);
    }
}
public static void main(String[] args) {
    StackClosedExample example = new StackClosedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> example.add100());
    executorService.execute(() -> example.add100());
    executorService.shutdown();
}
100
100

2. 執行緒本地儲存(Thread Local Storage)

如果一段程式碼中所需要的資料必須與其他程式碼共用,那就看看這些共用資料的程式碼是否能保證在同一個執行緒中執行。如果能保證,我們就可以把共用資料的可見範圍限制在同一個執行緒之內,這樣,無須同步也能保證執行緒之間不出現資料爭用的問題。

符合這種特點的應用並不少見,大部分使用消費佇列的架構模式(如「生產者-消費者」模式)都會將產品的消費過程儘量在一個執行緒中消費完。其中最重要的一個應用範例就是經典 Web 互動模型中的「一個請求對應一個伺服器執行緒」(Thread-per-Request)的處理方式,這種處理方式的廣泛應用使得很多 Web 伺服器端應用都可以使用執行緒本地儲存來解決執行緒安全問題。

可以使用 java.lang.ThreadLocal 類來實現執行緒本地儲存功能。

對於以下程式碼,thread1 中設定 threadLocal 為 1,而 thread2 設定 threadLocal 為 2。過了一段時間之後,thread1 讀取 threadLocal 依然是 1,不受 thread2 的影響。

public class ThreadLocalExample {
    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal.set(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadLocal.get());
            threadLocal.remove();
        });
        Thread thread2 = new Thread(() -> {
            threadLocal.set(2);
            threadLocal.remove();
        });
        thread1.start();
        thread2.start();
    }
}
1

為了理解 ThreadLocal,先看以下程式碼:

public class ThreadLocalExample1 {
    public static void main(String[] args) {
        ThreadLocal threadLocal1 = new ThreadLocal();
        ThreadLocal threadLocal2 = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal1.set(1);
            threadLocal2.set(1);
        });
        Thread thread2 = new Thread(() -> {
            threadLocal1.set(2);
            threadLocal2.set(2);
        });
        thread1.start();
        thread2.start();
    }
}

它所對應的底層結構圖為:

每個 Thread 都有一個 ThreadLocal.ThreadLocalMap 物件。

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

當呼叫一個 ThreadLocal 的 set(T value) 方法時,先得到當前執行緒的 ThreadLocalMap 物件,然後將 ThreadLocal->value 鍵值對插入到該 Map 中。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

get() 方法類似。

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

ThreadLocal 從理論上講並不是用來解決多執行緒並行問題的,因為根本不存在多執行緒競爭。

在一些場景 (尤其是使用執行緒池) 下,由於 ThreadLocal.ThreadLocalMap 的底層資料結構導致 ThreadLocal 有記憶體漏失的情況,應該儘可能在每次使用 ThreadLocal 後手動呼叫 remove(),以避免出現 ThreadLocal 經典的記憶體漏失甚至是造成自身業務混亂的風險。

3. 可重入程式碼(Reentrant Code)

這種程式碼也叫做純程式碼(Pure Code),可以在程式碼執行的任何時刻中斷它,轉而去執行另外一段程式碼(包括遞迴呼叫它本身),而在控制權返回後,原來的程式不會出現任何錯誤。

可重入程式碼有一些共同的特徵,例如不依賴儲存在堆上的資料和公用的系統資源、用到的狀態量都由引數中傳入、不呼叫非可重入的方法等。

執行緒不安全範例

如果多個執行緒對同一個共用資料進行存取而不採取同步操作的話,那麼操作的結果是不一致的。

以下程式碼演示了 1000 個執行緒同時對 cnt 執行自增操作,操作結束之後它的值有可能小於 1000。

public class ThreadUnsafeExample {

    private int cnt = 0;

    public void add() {
        cnt++;
    }

    public int get() {
        return cnt;
    }
}
public static void main(String[] args) throws InterruptedException {
    final int threadSize = 1000;
    ThreadUnsafeExample example = new ThreadUnsafeExample();
    final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < threadSize; i++) {
        executorService.execute(() -> {
            example.add();
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    System.out.println(example.get());
}
997

J.U.C - AQS

java.util.concurrent(J.U.C)大大提高了並行效能,AQS 被認為是 J.U.C 的核心。

CountDownLatch

用來控制一個或者多個執行緒等待多個執行緒。

維護了一個計數器 cnt,每次呼叫 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些因為呼叫 await() 方法而在等待的執行緒就會被喚醒。

public class CountdownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 10;
        CountDownLatch countDownLatch = new CountDownLatch(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("run..");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
}
run..run..run..run..run..run..run..run..run..run..end

CyclicBarrier

用來控制多個執行緒互相等待,只有當多個執行緒都到達時,這些執行緒才會繼續執行。

和 CountdownLatch 相似,都是通過維護計數器來實現的。執行緒執行 await() 方法之後計數器會減 1,並進行等待,直到計數器為 0,所有呼叫 await() 方法而在等待的執行緒才能繼續執行。

CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器通過呼叫 reset() 方法可以迴圈使用,所以它才叫做迴圈屏障。

CyclicBarrier 有兩個建構函式,其中 parties 指示計數器的初始值,barrierAction 在所有執行緒都到達屏障的時候會執行一次。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
public class CyclicBarrierExample {

    public static void main(String[] args) {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("before..");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.print("after..");
            });
        }
        executorService.shutdown();
    }
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

Semaphore

Semaphore 類似於作業系統中的號誌,可以控制對互斥資源的存取執行緒數。

以下程式碼模擬了對某個服務的並行請求,每次只能有 3 個使用者端同時存取,請求總數為 10。

public class SemaphoreExample {

    public static void main(String[] args) {
        final int clientCount = 3;
        final int totalRequestCount = 10;
        Semaphore semaphore = new Semaphore(clientCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalRequestCount; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.print(semaphore.availablePermits() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
2 1 2 2 2 2 2 1 2 2

J.U.C - 其它元件

FutureTask

在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現了 RunnableFuture 介面,該介面繼承自 Runnable 和 Future 介面,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用於非同步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那麼就可以用 FutureTask 來封裝這個任務,主執行緒在完成自己的任務之後再去獲取結果。

public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(10);
                    result += i;
                }
                return result;
            }
        });

        Thread computeThread = new Thread(futureTask);
        computeThread.start();

        Thread otherThread = new Thread(() -> {
            System.out.println("other task is running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        otherThread.start();
        System.out.println(futureTask.get());
    }
}
other task is running...
4950

BlockingQueue

java.util.concurrent.BlockingQueue 介面有以下阻塞佇列的實現:

  • FIFO 佇列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
  • 優先順序佇列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:如果佇列為空 take() 將阻塞,直到佇列中有內容;如果佇列為滿 put() 將阻塞,直到佇列有空閒位置。

使用 BlockingQueue 實現生產者消費者問題

public class ProducerConsumer {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    private static class Producer extends Thread {
        @Override
        public void run() {
            try {
                queue.put("product");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("produce..");
        }
    }

    private static class Consumer extends Thread {

        @Override
        public void run() {
            try {
                String product = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("consume..");
        }
    }
}
public static void main(String[] args) {
    for (int i = 0; i < 2; i++) {
        Producer producer = new Producer();
        producer.start();
    }
    for (int i = 0; i < 5; i++) {
        Consumer consumer = new Consumer();
        consumer.start();
    }
    for (int i = 0; i < 3; i++) {
        Producer producer = new Producer();
        producer.start();
    }
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..

ForkJoin

主要用於平行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務平行計算。

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5;
    private int first;
    private int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // 任務足夠小則直接計算
            for (int i = first; i <= last; i++) {
                result += i;
            }
        } else {
            // 拆分成小任務
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinExample example = new ForkJoinExample(1, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future result = forkJoinPool.submit(example);
    System.out.println(result.get());
}

ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的執行緒池,執行緒數量取決於 CPU 核數。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 實現了工作竊取演演算法來提高 CPU 的利用率。每個執行緒都維護了一個雙端佇列,用來儲存需要執行的任務。工作竊取演演算法允許空閒的執行緒從其它執行緒的雙端佇列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和佇列所屬執行緒發生競爭。例如下圖中,Thread2 從 Thread1 的佇列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。但是如果佇列中只有一個任務時還是會發生競爭。

多執行緒開發良好的實踐

  • 給執行緒起個有意義的名字,這樣可以方便找 Bug。
  • 縮小同步範圍,從而減少鎖爭用。例如對於 synchronized,應該儘量使用同步塊而不是同步方法。
  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 這些同步類簡化了編碼操作,而用 wait() 和 notify() 很難實現複雜控制流;其次,這些同步類是由最好的企業編寫和維護,在後續的 JDK 中還會不斷優化和完善。
  • 使用 BlockingQueue 實現生產者消費者問題。
  • 多用並行集合少用同步集合,例如應該使用 ConcurrentHashMap 而不是 Hashtable。
  • 使用本地變數和不可變類來保證執行緒安全。
  • 使用執行緒池而不是直接建立執行緒,這是因為建立執行緒代價很高,執行緒池可以有效地利用有限的執行緒來啟動任務。

本文發於:https://antoniopeng.com


IT145.com E-mail:sddin#qq.com