Google Code Prettify

顯示具有 thread 標籤的文章。 顯示所有文章
顯示具有 thread 標籤的文章。 顯示所有文章

2015年8月15日 星期六

java.util.concurrent - Future & Callable

在 Java 5 之前,寫多執行緒程式,唯一的辦法就是使用 Thread 類別及 Runnable 介面,Java 5 之後新增加 java.util.concurrent 這個 package,提供了豐富的類別、介面,在這之後 Java 的多執行緒程式設計才真的趨於完整,這篇先介紹 Callable 和 Future,類別圖如下。

在看程式之前,先對上圖說明如下:
  • Callable 介面只定義了一個 call() method,實作這個介面的好處是當執行緒執行結束後,可以傳回值,且值的型別可以由我們自定。 
  • FutureTask 並非 Future 唯一實作的類別,之後還會介紹別的實作 Future 介面的類別,在這篇我先介紹 FutureTask。
接下來看範例程式,這個程式會算出費式數列,傳入的參數是指出程式要列出幾個值,例如傳入 10,則列出 0 1 1 2 3 5 8 13 21 34 共 10 個值。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.Callable;
 6 import java.util.concurrent.ExecutionException;
 7 import java.util.concurrent.FutureTask;
 8 
 9 public class FibonacciDemo implements Callable<List<Long>> {
10     private long number;
11     
12     public FibonacciDemo(long number) {
13         this.number = number;
14     }
15 
16     @Override
17     public List<Long> call() throws Exception {
18         List<Long> fib = new ArrayList<Long>();
19         fib.add(0L);
20         fib.add(1L);
21         for(int i=2; i<number; i++) {
22             Long f = fib.get(fib.size()-1) + fib.get(fib.size()-2);
23             fib.add(f);
24         }
25         
26         return fib;
27     }
28 
29     public static void main(String[] args) {
30         Callable<List<Long>> fibonacci = new FibonacciDemo(10);
31         FutureTask<List<Long>> fibonacciTask = new FutureTask<List<Long>>(fibonacci);
32         
33         Thread t = new Thread(fibonacciTask);
34         t.start();
35         
36         try {
37             t.sleep(10);
38             
39             boolean canceled = fibonacciTask.cancel(false);
40             System.out.println("canceled = " + canceled);
41             
42             if (!fibonacciTask.isCancelled()) {
43                 List<Long> fib = fibonacciTask.get();
44                 for(Long f:fib) {
45                     System.out.print(f + " ");
46                 }
47             }
48             
49 //            if (fibonacciTask.isDone()) {
50 //                List<Long> fib = fibonacciTask.get();
51 //                for(Long f:fib) {
52 //                    System.out.print(f + " ");
53 //                }
54 //            }
55 //            else {
56 //                System.out.println("unfinished");
57 //            }
58         }
59         catch (InterruptedException | ExecutionException  e) {
60             e.printStackTrace();
61         }
62     }
63 }




程式說明如下:

  • FutureTask 有兩個建構式,一個接受實作 Callable 的類別,另一個接受實作 Runnable 的類別,這個程式實作了 Callable (line 9),並指定傳回的值為 List<Long>,也就是所有計算所得的數字。大多數的人會選擇實作 Callable,因為實作 Runnable 的話,還要傳入一個變數,用來儲存傳回值。
  • FutureTask 類別是一個實作 Runnable 介面的類別,要建立執行緒,仍需透過 Thread 類別,所以可以看到第 31 行傳入我們實作 Callable 的類別物件給 FutureTask 後,為了建立一個執行緒,第 33 行再將 FutureTask 類別的物件傳給 Thread,然後在第 34 行啟動一個新的執行緒。
  • 第 17~27 行實作 call() method,計算費式數列後,傳回給主程式。
  • 第 39~47 行及第 49~57 行是不同兩個版本,執行出來的結果是一樣的。
  • 要如何取得 call() method 傳回的值呢? 使用 FutureTask 的 get() method ! 就算是實作 Runnable 介面,也是透過 get() 取得傳回值。(line 43、50)
  • 先說明第一個版本,第 39 行是什麼意思呢? cancel() method 是試著強制中斷 FutureTask 的執行緒! 傳入的參數 true 表示,不管這個執行緒處於什麼狀態,都將它中斷結束,如果傳入的是 false,則是當執行緒已經進入 call() method 且還沒執行完離開 call() method 則不要中斷,否則都中斷。至於傳回值即是 true 表示程式沒有執行完就被中斷,false 則是有執行完。這也是為什麼會有第 37 行睡了 10 毫秒的原因,如果主程式不略微停頓一下,讓 FutureTask 的那個執行緒計算一下費式數列,第 39 行有可能傳回 true,也就是程式被中斷了!
  • 第 42 行判斷是否有被中斷,如果沒有,就是有順利的計算完傳回值,所以第 43 行呼叫 get() 取得傳回值,接下來的 for 迴圈當然就是印出結果。
  • 再來說明第二個版本,同樣的要保留第 37 行,讓費式數列有時間被計算,第 49 行和第 42 行剛好相反,它會判斷執行緒是否有被執行完? 有的話就傳回 true,沒有就傳回 false。傳回 true 的話,第 50 行取得計算結果然後印出來。
Callable 這種會傳回值的架構,在有一個計算很耗時,還蠻好用的,可以先產生一個 Thread 去計算這個值,然後做點別的事,之後再回來取得計算結果。當然,即使用只用 Thread、Runnable 也可以做到同樣的功能,只是會比較麻煩,現在 Java 直接提供相關的 API。

2015年8月13日 星期四

java.util.concurrent.locks - 臨界區間的讀寫

java 在推出時,提供 synchronized、notify、wait 的簡單方式,讓多執行緒程式可以控制臨界區間的存取,到了 Java 5 之後加入  java.util.concurrent 這個 package,又提供了一些 interface (介面) 及 class (類別),這些新的 interface、class 比之前的方法更有彈性、更有效率,這裡簡要的說明 java.util.concurrent.locks 下的主要類別與介面。下圖的 class diagram (類別圖) 僅是主要的 interface、class 及其包含的主要 method,要了解全貌,請參考 Java Documentation

  • ReadWriteLock

ReadWriteLock lock = new ReentrantReadWriteLock();
//...
lock.writeLock().lock(); //取得寫入鎖定
try {
    ...
}
finally {
    lock.writeLock().unlock(); //解除寫入鎖定
}
上面的程式稱不上是一個範例,主要說明使用 ReadWriteLock 的優缺點及注意事項:
  1. 跟 synchronized 比起來,解除了只能針一個 method 或一個 block 設定臨界區間的限制,但是要注意一定要記得 unlock,否則會防礙到其他執行緒進入臨界區間,所以上面的程式,將 unlock() 放在 finally 以確保其最後一定會被執行。
  2. 請回頭看一下類別圖中的 ReadWriteLock,它提供的是有 read lock 和 write lock,這可以改善 synchronized 的效率,synchronized 只要有一個 thread 搶到了 lock,就得等它執行完臨界區間內所有程式,並離開後,其它等著進入臨界區間的 thread 才能進入。現在分出了 read lock 及 write lock,當取得 write lock 的 thread 進入了臨界區間,其它 thread 也只能等待,但是,如果是取得 read lock 的 thread 進入臨界區間,是可以有多個 read lock thread 同時進入,這就改善了效率,又不會讓資料不一致。 
  • Lock
ReentrantLock lock = new ReentrantLock();

try {
    if (lock.tryLock()) {
        //臨界區間 - do something
    }
}
finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock();
    }
}
interface Lock 只有一個實作類別 ReetrantLock,這裡要特別注意 tryLock() 這個 method,它相當有趣,因為當遇到臨界區間時,使用這個 method,如果取得 lock 就傳 true,沒有取得就傳回 false,程式可以依傳回值決定做什麼事,以免有 thread 呆呆的一直等著進臨界區間而浪費時間。
要記得,unlock 前也要先用 isHeldByCurrentThread() 這個 method 判斷一下目前是否有取得 lock,有的話才 unlock。
  • Condition
 前面提到的 lock 是可用來取代 synchronized 的新解法,Condition 的 signal、await 則是與 notify、wait 相當。使用的方法可以參考官網 Condition 的範例如下: 
 1  class BoundedBuffer {
 2    final Lock lock = new ReentrantLock();
 3    final Condition notFull  = lock.newCondition(); 
 4    final Condition notEmpty = lock.newCondition(); 
 5 
 6    final Object[] items = new Object[100];
 7    int putptr, takeptr, count;
 8 
 9    public void put(Object x) throws InterruptedException {
10      lock.lock();
11      try {
12        while (count == items.length)
13          notFull.await();
14        items[putptr] = x;
15        if (++putptr == items.length) putptr = 0;
16        ++count;
17        notEmpty.signal();
18      } finally {
19        lock.unlock();
20      }
21    }
22 
23    public Object take() throws InterruptedException {
24      lock.lock();
25      try {
26        while (count == 0)
27          notEmpty.await();
28        Object x = items[takeptr];
29        if (++takeptr == items.length) takeptr = 0;
30        --count;
31        notFull.signal();
32        return x;
33      } finally {
34        lock.unlock();
35      }
36    }
37  }
這是一個很簡單的"生產者"、"消費者"的例子,當陣列中沒有任何物件時,消費者 (take) 就需等待 (line 27),當陣列滿了時,生產者 (put) 也需等待 (line 13); 如果有新的物件產生生產者會通知消費者 (line 17),如果陣列還有空間,消費者也會通知生產者 (line 31)。
特別注意一下 2~4 行,Condition 是由 lock 的 newCondition() 產生,還有,就如 notify、wait 必需位於 synchronized 區間內一樣,Condition 的 await、signal 也要位於 lock、unlock 之間。




  • StampedLock
前面提到的 ReadWriteLock 可允許多個 readLock 的執行緒同時進入臨界區間,但只允許一個 writeLock 的執行緒進入臨界區間,且當有 writeLock 的執行緒位於臨界區間內,即不允許其它執行緒取得 readLock、writeLock,這會有個問題,當程式有很多讀取的執行緒,只有很少的寫入執行緒,臨界區間大部份時間被取得 readLock  執行緒佔據,寫入的執行緒會很難取得 writeLock 而長期處於等待狀態。為了解決這個問題,Java 8 提供了 StampedLock 這個新類別。
 1 public class BankAccountStampedLock {
 2   private final StampedLock sl = new StampedLock();
 3   private long balance;
 4 
 5   public BankAccountStampedLock(long balance) {
 6     this.balance = balance;
 7   }
 8 
 9   public void deposit(long amount) {
10     long stamp = sl.writeLock();
11     try {
12       balance += amount;
13     } finally {
14       sl.unlockWrite(stamp);
15     }
16   }
17 
18   public void withdraw(long amount) {
19     long stamp = sl.writeLock();
20     try {
21       balance -= amount;
22     } finally {
23       sl.unlockWrite(stamp);
24     }
25   }
26 
27   public long getBalance() {
28     long stamp = sl.readLock();
29     try {
30       return balance;
31     } finally {
32       sl.unlockRead(stamp);
33     }
34   }
35 
36   public long getBalanceOptimisticRead() {
37     long stamp = sl.tryOptimisticRead();
38     long balance = this.balance;
39     if (!sl.validate(stamp)) {
40       stamp = sl.readLock();
41       try {
42         balance = this.balance;
43       } finally {
44         sl.unlockRead(stamp);
45       }
46     }
47     return balance;
48   }
49 }
這是一個存、提款的範例程式,來源是 javaspecialists 網站。這個程式和完全只使用 ReadWriteLock 最大差別在於 getBalanceOptimisticRead(),第 37 行先呼叫 tryOptimisticRead 取得一個樂觀讀取鎖定,第 38 行取得存款餘額,第 39 行判斷看看是否在取得樂觀讀取鎖定後,有臨界區間有取得寫入鎖定的執行緒進入,沒有的話,直接回傳存款餘額 (line 47),萬一有的話,balance 的值有可能已經被改變,所以就要謹慎的取得讀取鎖定 (line 40),再取得真正最新的值後再解鎖 (line 44)。