Google Code Prettify

顯示具有 多執行緒 標籤的文章。 顯示所有文章
顯示具有 多執行緒 標籤的文章。 顯示所有文章

2015年8月16日 星期日

java.util.concurrent - ExecutorService

Java 1.x 時 Java 可以說只是個寫 applet 的語言,Java 1.2 之後成為寫 web 後端程式最重要的語言,Java 1.4 引入 NIO 讓讀寫更有效率,Java 5 最大的改變應該就是增加 java.util.concurrent 這個 package,對平行處理 (多執行緒) 提供完整的支援,這個 package 的內容一直到 Java 8 都持續增加,使其更加完善。這一篇要介紹的是 ExecutorService,類別圖如下: (如往例,這不是完整的類別圖,只是這篇會說明到的部份。)
ExecutorService 是個介面,繼承了 Executor 介面,Executor 介面只定義了一個稱為 execute() 的 method,實作 Runnable 的類別物件,可以將本身委派給實作 ExecutorService 的類別物件產生另一個執行緒,進行平行運算。
接下來看看範例程式;
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 public class ExecutorDemo {
 7 
 8     public static void main(String[] args) {
 9         ExecutorService executorService = Executors.newSingleThreadExecutor();
10 
11         executorService.execute(new Runnable() {
12             public void run() {
13                 System.out.println("Asynchronous task");
14             }
15         });
16 
17         executorService.shutdown();
18     }
19 }
程式說明:
  • Executors 是一個工廠類別,定義了許多 static 的 method,用來產生與 java.util.cuncurrent 套件中相關的類別,這裡 (line 9) 用來產生一個 ExecutorService 的物件。
  • 在第 11~15 行中,有一個匿名的 Runnable 物件,當然,如果程式很大,就不要用匿名的寫法,這裡因為只是一行 output,用匿名比較簡單。這個物件傳入 executorService 物件中會產生一個新的 thread。
  • executorService 執行完要自行關閉,即第 17 行呼叫 shutdown(),呼叫 shutdown() 後,就不能再傳別的 Runnable 給這個 executorService 物件了,否則會產生 exception。 
  • 第 9 行也可以改寫成 ExecutorService executorService = Executors.newFixedThreadPool(10); 一般來說,會使用 thread pool 的狀況是,程式會有很多 thread,且這些 thread 的執行時間很短,在這種狀況下要系統一直建立新的 thread 顯然會花很多時間,所以使用 thread pool。另外,任何一個系統也不能無限制的建立 thread,否則各個 thread 間頻繁的 context switch 反而會拖垮系統效能,所以會給它一個最大值。
  • ExecutorService 中的 shutdown() 和 shutdownNow() 這兩個 method 有什麼差別? shutdown() 會執行完已委派的 Runnable 物件後才將 ExecutorService 關閉,shutdownNow 則會立刻關閉,尚未被執行的 Runnable 物件則以 List<Runnable> 傳回。




上面的程式我們要來改寫一下,注意看類別圖,ExecutorService 中有兩個 submit() method,一個傳入的參數是  Runnable 物件,另一個傳入的是 Callable 物件,使用 submit 也可以產生一個新的執行緒,先看一下傳入 Runnable 物件的程式。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Future;
 6 
 7 public class ExecutorDemo {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executorService = Executors.newSingleThreadExecutor();
11 
12         Future future = executorService.submit(new Runnable() {
13             public void run() {
14                 System.out.println("Asynchronous task");
15             }
16         });
17 
18         executorService.shutdown();
19     }
20 }
修改的部份是第 12~ 16 行,得到的結果與第一個程式是完全一樣的! 再來看一下傳入 Callable 物件的程式。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.Future;
 8 
 9 public class ExecutorDemo {
10 
11     public static void main(String[] args) throws InterruptedException, ExecutionException {
12         ExecutorService executorService = Executors.newSingleThreadExecutor();
13 
14         Future future = executorService.submit(new Callable(){
15             public Object call() throws Exception {
16                 System.out.println("Asynchronous Callable");
17                 return "Callable Result";
18             }
19         });
20 
21         System.out.println("future.get() = " + future.get());
22 
23         executorService.shutdown();
24     }
25 }
傳入 Runnable 物件和傳入 Callable 物件最大的差別就在於,傳入 Callable 物件可以有傳回值,所以當 21 行呼叫 future.get() 時,可以取得 call() 的傳回值 (line 17),如果是傳入 Runnable 物件,呼叫 get() method 會傳回 null。

2015年8月13日 星期四

java.util.concurrent.locks - 臨界區間的讀寫

java 在推出時,提供 synchronized、notify、wait 的簡單方式,讓多執行緒程式可以控制臨界區間的存取,到了 Java 5 之後加入  java.util.concurrent 這個 package,又提供了一些 interface (介面) 及 class (類別),這些新的 interface、class 比之前的方法更有彈性、更有效率,這裡簡要的說明 java.util.concurrent.locks 下的主要類別與介面。下圖的 class diagram (類別圖) 僅是主要的 interface、class 及其包含的主要 method,要了解全貌,請參考 Java Documentation

  • ReadWriteLock

ReadWriteLock lock = new ReentrantReadWriteLock();
//...
lock.writeLock().lock(); //取得寫入鎖定
try {
    ...
}
finally {
    lock.writeLock().unlock(); //解除寫入鎖定
}
上面的程式稱不上是一個範例,主要說明使用 ReadWriteLock 的優缺點及注意事項:
  1. 跟 synchronized 比起來,解除了只能針一個 method 或一個 block 設定臨界區間的限制,但是要注意一定要記得 unlock,否則會防礙到其他執行緒進入臨界區間,所以上面的程式,將 unlock() 放在 finally 以確保其最後一定會被執行。
  2. 請回頭看一下類別圖中的 ReadWriteLock,它提供的是有 read lock 和 write lock,這可以改善 synchronized 的效率,synchronized 只要有一個 thread 搶到了 lock,就得等它執行完臨界區間內所有程式,並離開後,其它等著進入臨界區間的 thread 才能進入。現在分出了 read lock 及 write lock,當取得 write lock 的 thread 進入了臨界區間,其它 thread 也只能等待,但是,如果是取得 read lock 的 thread 進入臨界區間,是可以有多個 read lock thread 同時進入,這就改善了效率,又不會讓資料不一致。 
  • Lock
ReentrantLock lock = new ReentrantLock();

try {
    if (lock.tryLock()) {
        //臨界區間 - do something
    }
}
finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock();
    }
}
interface Lock 只有一個實作類別 ReetrantLock,這裡要特別注意 tryLock() 這個 method,它相當有趣,因為當遇到臨界區間時,使用這個 method,如果取得 lock 就傳 true,沒有取得就傳回 false,程式可以依傳回值決定做什麼事,以免有 thread 呆呆的一直等著進臨界區間而浪費時間。
要記得,unlock 前也要先用 isHeldByCurrentThread() 這個 method 判斷一下目前是否有取得 lock,有的話才 unlock。
  • Condition
 前面提到的 lock 是可用來取代 synchronized 的新解法,Condition 的 signal、await 則是與 notify、wait 相當。使用的方法可以參考官網 Condition 的範例如下: 
 1  class BoundedBuffer {
 2    final Lock lock = new ReentrantLock();
 3    final Condition notFull  = lock.newCondition(); 
 4    final Condition notEmpty = lock.newCondition(); 
 5 
 6    final Object[] items = new Object[100];
 7    int putptr, takeptr, count;
 8 
 9    public void put(Object x) throws InterruptedException {
10      lock.lock();
11      try {
12        while (count == items.length)
13          notFull.await();
14        items[putptr] = x;
15        if (++putptr == items.length) putptr = 0;
16        ++count;
17        notEmpty.signal();
18      } finally {
19        lock.unlock();
20      }
21    }
22 
23    public Object take() throws InterruptedException {
24      lock.lock();
25      try {
26        while (count == 0)
27          notEmpty.await();
28        Object x = items[takeptr];
29        if (++takeptr == items.length) takeptr = 0;
30        --count;
31        notFull.signal();
32        return x;
33      } finally {
34        lock.unlock();
35      }
36    }
37  }
這是一個很簡單的"生產者"、"消費者"的例子,當陣列中沒有任何物件時,消費者 (take) 就需等待 (line 27),當陣列滿了時,生產者 (put) 也需等待 (line 13); 如果有新的物件產生生產者會通知消費者 (line 17),如果陣列還有空間,消費者也會通知生產者 (line 31)。
特別注意一下 2~4 行,Condition 是由 lock 的 newCondition() 產生,還有,就如 notify、wait 必需位於 synchronized 區間內一樣,Condition 的 await、signal 也要位於 lock、unlock 之間。




  • StampedLock
前面提到的 ReadWriteLock 可允許多個 readLock 的執行緒同時進入臨界區間,但只允許一個 writeLock 的執行緒進入臨界區間,且當有 writeLock 的執行緒位於臨界區間內,即不允許其它執行緒取得 readLock、writeLock,這會有個問題,當程式有很多讀取的執行緒,只有很少的寫入執行緒,臨界區間大部份時間被取得 readLock  執行緒佔據,寫入的執行緒會很難取得 writeLock 而長期處於等待狀態。為了解決這個問題,Java 8 提供了 StampedLock 這個新類別。
 1 public class BankAccountStampedLock {
 2   private final StampedLock sl = new StampedLock();
 3   private long balance;
 4 
 5   public BankAccountStampedLock(long balance) {
 6     this.balance = balance;
 7   }
 8 
 9   public void deposit(long amount) {
10     long stamp = sl.writeLock();
11     try {
12       balance += amount;
13     } finally {
14       sl.unlockWrite(stamp);
15     }
16   }
17 
18   public void withdraw(long amount) {
19     long stamp = sl.writeLock();
20     try {
21       balance -= amount;
22     } finally {
23       sl.unlockWrite(stamp);
24     }
25   }
26 
27   public long getBalance() {
28     long stamp = sl.readLock();
29     try {
30       return balance;
31     } finally {
32       sl.unlockRead(stamp);
33     }
34   }
35 
36   public long getBalanceOptimisticRead() {
37     long stamp = sl.tryOptimisticRead();
38     long balance = this.balance;
39     if (!sl.validate(stamp)) {
40       stamp = sl.readLock();
41       try {
42         balance = this.balance;
43       } finally {
44         sl.unlockRead(stamp);
45       }
46     }
47     return balance;
48   }
49 }
這是一個存、提款的範例程式,來源是 javaspecialists 網站。這個程式和完全只使用 ReadWriteLock 最大差別在於 getBalanceOptimisticRead(),第 37 行先呼叫 tryOptimisticRead 取得一個樂觀讀取鎖定,第 38 行取得存款餘額,第 39 行判斷看看是否在取得樂觀讀取鎖定後,有臨界區間有取得寫入鎖定的執行緒進入,沒有的話,直接回傳存款餘額 (line 47),萬一有的話,balance 的值有可能已經被改變,所以就要謹慎的取得讀取鎖定 (line 40),再取得真正最新的值後再解鎖 (line 44)。