Google Code Prettify

顯示具有 平行處理 標籤的文章。 顯示所有文章
顯示具有 平行處理 標籤的文章。 顯示所有文章

2015年8月16日 星期日

java.util.concurrent - ExecutorService

Java 1.x 時 Java 可以說只是個寫 applet 的語言,Java 1.2 之後成為寫 web 後端程式最重要的語言,Java 1.4 引入 NIO 讓讀寫更有效率,Java 5 最大的改變應該就是增加 java.util.concurrent 這個 package,對平行處理 (多執行緒) 提供完整的支援,這個 package 的內容一直到 Java 8 都持續增加,使其更加完善。這一篇要介紹的是 ExecutorService,類別圖如下: (如往例,這不是完整的類別圖,只是這篇會說明到的部份。)
ExecutorService 是個介面,繼承了 Executor 介面,Executor 介面只定義了一個稱為 execute() 的 method,實作 Runnable 的類別物件,可以將本身委派給實作 ExecutorService 的類別物件產生另一個執行緒,進行平行運算。
接下來看看範例程式;
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 public class ExecutorDemo {
 7 
 8     public static void main(String[] args) {
 9         ExecutorService executorService = Executors.newSingleThreadExecutor();
10 
11         executorService.execute(new Runnable() {
12             public void run() {
13                 System.out.println("Asynchronous task");
14             }
15         });
16 
17         executorService.shutdown();
18     }
19 }
程式說明:
  • Executors 是一個工廠類別,定義了許多 static 的 method,用來產生與 java.util.cuncurrent 套件中相關的類別,這裡 (line 9) 用來產生一個 ExecutorService 的物件。
  • 在第 11~15 行中,有一個匿名的 Runnable 物件,當然,如果程式很大,就不要用匿名的寫法,這裡因為只是一行 output,用匿名比較簡單。這個物件傳入 executorService 物件中會產生一個新的 thread。
  • executorService 執行完要自行關閉,即第 17 行呼叫 shutdown(),呼叫 shutdown() 後,就不能再傳別的 Runnable 給這個 executorService 物件了,否則會產生 exception。 
  • 第 9 行也可以改寫成 ExecutorService executorService = Executors.newFixedThreadPool(10); 一般來說,會使用 thread pool 的狀況是,程式會有很多 thread,且這些 thread 的執行時間很短,在這種狀況下要系統一直建立新的 thread 顯然會花很多時間,所以使用 thread pool。另外,任何一個系統也不能無限制的建立 thread,否則各個 thread 間頻繁的 context switch 反而會拖垮系統效能,所以會給它一個最大值。
  • ExecutorService 中的 shutdown() 和 shutdownNow() 這兩個 method 有什麼差別? shutdown() 會執行完已委派的 Runnable 物件後才將 ExecutorService 關閉,shutdownNow 則會立刻關閉,尚未被執行的 Runnable 物件則以 List<Runnable> 傳回。




上面的程式我們要來改寫一下,注意看類別圖,ExecutorService 中有兩個 submit() method,一個傳入的參數是  Runnable 物件,另一個傳入的是 Callable 物件,使用 submit 也可以產生一個新的執行緒,先看一下傳入 Runnable 物件的程式。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Future;
 6 
 7 public class ExecutorDemo {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executorService = Executors.newSingleThreadExecutor();
11 
12         Future future = executorService.submit(new Runnable() {
13             public void run() {
14                 System.out.println("Asynchronous task");
15             }
16         });
17 
18         executorService.shutdown();
19     }
20 }
修改的部份是第 12~ 16 行,得到的結果與第一個程式是完全一樣的! 再來看一下傳入 Callable 物件的程式。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.Future;
 8 
 9 public class ExecutorDemo {
10 
11     public static void main(String[] args) throws InterruptedException, ExecutionException {
12         ExecutorService executorService = Executors.newSingleThreadExecutor();
13 
14         Future future = executorService.submit(new Callable(){
15             public Object call() throws Exception {
16                 System.out.println("Asynchronous Callable");
17                 return "Callable Result";
18             }
19         });
20 
21         System.out.println("future.get() = " + future.get());
22 
23         executorService.shutdown();
24     }
25 }
傳入 Runnable 物件和傳入 Callable 物件最大的差別就在於,傳入 Callable 物件可以有傳回值,所以當 21 行呼叫 future.get() 時,可以取得 call() 的傳回值 (line 17),如果是傳入 Runnable 物件,呼叫 get() method 會傳回 null。

2015年8月15日 星期六

java.util.concurrent - Future & Callable

在 Java 5 之前,寫多執行緒程式,唯一的辦法就是使用 Thread 類別及 Runnable 介面,Java 5 之後新增加 java.util.concurrent 這個 package,提供了豐富的類別、介面,在這之後 Java 的多執行緒程式設計才真的趨於完整,這篇先介紹 Callable 和 Future,類別圖如下。

在看程式之前,先對上圖說明如下:
  • Callable 介面只定義了一個 call() method,實作這個介面的好處是當執行緒執行結束後,可以傳回值,且值的型別可以由我們自定。 
  • FutureTask 並非 Future 唯一實作的類別,之後還會介紹別的實作 Future 介面的類別,在這篇我先介紹 FutureTask。
接下來看範例程式,這個程式會算出費式數列,傳入的參數是指出程式要列出幾個值,例如傳入 10,則列出 0 1 1 2 3 5 8 13 21 34 共 10 個值。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.Callable;
 6 import java.util.concurrent.ExecutionException;
 7 import java.util.concurrent.FutureTask;
 8 
 9 public class FibonacciDemo implements Callable<List<Long>> {
10     private long number;
11     
12     public FibonacciDemo(long number) {
13         this.number = number;
14     }
15 
16     @Override
17     public List<Long> call() throws Exception {
18         List<Long> fib = new ArrayList<Long>();
19         fib.add(0L);
20         fib.add(1L);
21         for(int i=2; i<number; i++) {
22             Long f = fib.get(fib.size()-1) + fib.get(fib.size()-2);
23             fib.add(f);
24         }
25         
26         return fib;
27     }
28 
29     public static void main(String[] args) {
30         Callable<List<Long>> fibonacci = new FibonacciDemo(10);
31         FutureTask<List<Long>> fibonacciTask = new FutureTask<List<Long>>(fibonacci);
32         
33         Thread t = new Thread(fibonacciTask);
34         t.start();
35         
36         try {
37             t.sleep(10);
38             
39             boolean canceled = fibonacciTask.cancel(false);
40             System.out.println("canceled = " + canceled);
41             
42             if (!fibonacciTask.isCancelled()) {
43                 List<Long> fib = fibonacciTask.get();
44                 for(Long f:fib) {
45                     System.out.print(f + " ");
46                 }
47             }
48             
49 //            if (fibonacciTask.isDone()) {
50 //                List<Long> fib = fibonacciTask.get();
51 //                for(Long f:fib) {
52 //                    System.out.print(f + " ");
53 //                }
54 //            }
55 //            else {
56 //                System.out.println("unfinished");
57 //            }
58         }
59         catch (InterruptedException | ExecutionException  e) {
60             e.printStackTrace();
61         }
62     }
63 }




程式說明如下:

  • FutureTask 有兩個建構式,一個接受實作 Callable 的類別,另一個接受實作 Runnable 的類別,這個程式實作了 Callable (line 9),並指定傳回的值為 List<Long>,也就是所有計算所得的數字。大多數的人會選擇實作 Callable,因為實作 Runnable 的話,還要傳入一個變數,用來儲存傳回值。
  • FutureTask 類別是一個實作 Runnable 介面的類別,要建立執行緒,仍需透過 Thread 類別,所以可以看到第 31 行傳入我們實作 Callable 的類別物件給 FutureTask 後,為了建立一個執行緒,第 33 行再將 FutureTask 類別的物件傳給 Thread,然後在第 34 行啟動一個新的執行緒。
  • 第 17~27 行實作 call() method,計算費式數列後,傳回給主程式。
  • 第 39~47 行及第 49~57 行是不同兩個版本,執行出來的結果是一樣的。
  • 要如何取得 call() method 傳回的值呢? 使用 FutureTask 的 get() method ! 就算是實作 Runnable 介面,也是透過 get() 取得傳回值。(line 43、50)
  • 先說明第一個版本,第 39 行是什麼意思呢? cancel() method 是試著強制中斷 FutureTask 的執行緒! 傳入的參數 true 表示,不管這個執行緒處於什麼狀態,都將它中斷結束,如果傳入的是 false,則是當執行緒已經進入 call() method 且還沒執行完離開 call() method 則不要中斷,否則都中斷。至於傳回值即是 true 表示程式沒有執行完就被中斷,false 則是有執行完。這也是為什麼會有第 37 行睡了 10 毫秒的原因,如果主程式不略微停頓一下,讓 FutureTask 的那個執行緒計算一下費式數列,第 39 行有可能傳回 true,也就是程式被中斷了!
  • 第 42 行判斷是否有被中斷,如果沒有,就是有順利的計算完傳回值,所以第 43 行呼叫 get() 取得傳回值,接下來的 for 迴圈當然就是印出結果。
  • 再來說明第二個版本,同樣的要保留第 37 行,讓費式數列有時間被計算,第 49 行和第 42 行剛好相反,它會判斷執行緒是否有被執行完? 有的話就傳回 true,沒有就傳回 false。傳回 true 的話,第 50 行取得計算結果然後印出來。
Callable 這種會傳回值的架構,在有一個計算很耗時,還蠻好用的,可以先產生一個 Thread 去計算這個值,然後做點別的事,之後再回來取得計算結果。當然,即使用只用 Thread、Runnable 也可以做到同樣的功能,只是會比較麻煩,現在 Java 直接提供相關的 API。

2015年6月6日 星期六

fork Function


在「Advanced.Programming.in.the.UNIX.Environment, 3rd.Edition」一書中的8.3節 (p. 230),有個小程式,如下,是用來說明 UNIX 環境中,使用 fork 產生子行程 (child process),要注意的一些事,先看一下程式:

 1 #include <stdio.h>
 2 #include <unistd.h>
 3 
 4 int globvar = 6;
 5 char buf[] = "a write to stdout\n";
 6 
 7 int main(void) {
 8     int var;
 9     pid_t pid;
10 
11     var = 88;
12     if (write(STDOUT_FILENO, buf, sizeof(buf)-1) != sizeof(buf)-1)
13         printf("write error");
14     printf("before fork\n");
15 
16     if ((pid = fork()) < 0) {
17         printf("fork error");
18     }
19     else if (pid == 0) {
20         globvar++;
21         var++;
22     }
23     else {
24         sleep(2);
25     }
26 
27     printf("pid = %ld, getpid = %ld, glob = %d, var = %d\n", (long) pid, (long) getpid(), globvar, var);
28     _exit(0);
29 }

上面的程式很簡單的利用 fork 產生一個子行程,等待 2 秒後,印出一段訊息,顯示 fork 傳回值、行程的 pid (process id)、全域變數及區域變數,執行結果如下: (我的程式命名為 forEx01.c)
[steven@CentOS7 Debug]$ ./forkEx01
a write to stdout
before fork
pid = 0, getpid = 9214, glob = 7, var = 89
pid = 9214, getpid = 9213, glob = 6, var = 88
[steven@CentOS7 Debug]$ 
根據上述的執行結果,說明如下:
  1. fork 函數會產生一個子行程,子行程會執行父行程 (parent process) fork 之後的指令。
  2. fork 函數在父行程會傳回子行程的 pid,在子行程則傳回 0,子行程如果要得到父行程的 pid,可以透過 getppid 函數得到值。
  3. 上面程式讓父行程睡 2 秒後再印出結果,所以第一個結果是子程式印出的,第二個結果是父行程印出的,通常確實是會得到如上的結果 (除了 pid、getpid 的值會不同之外),但是,實際上有時會是父行程先印出,因為那個行程先執行,由 OS 決定,這裡父行程睡 2 秒只是增加子行程先執行的機率。
  4. 上述程式的全域變數 (global variable) 和區域變數 (local variable) 於子行程中都被加 1,但是都沒有影響到父行程的值,這表示不管是全域變數或區域變數,父行程、子行程都不共用!
  5. 子行程的輸出接在父行程 fork 前已輸出的內容之後,父行程後面的輸出又接在子行程的輸出之後,完全不會互相覆蓋,因為父行程和子行程會共享在 fork 之前已開啟的所有檔案描述符 (file descriptions),也就是說兩個行程會共享這些檔案的檔案指標! 標準輸出是在父行程一開始執行時就被開啟的,所以會被兩個行程共享。
除了上述程式所展現的父行程、子行程關係外,另外針對行程的一些基本觀念整理如下:
  1. UNIX 系統在系統啟動後,會啟動許多 process 來處理一些系統面的事,其中一個 pid 為 1 的 process 稱為 init process,是負責啟動和關閉系統。
  2. 程式以 fork 啟動一個或數個子行程,當子行程結束後,父行程要負責將其佔用的資源都回收後,才可真正將其結束,在子行程已經執行完,但是父行程尚未將其資源回收前,這個子行程就稱為 zombie (僵屍)。
  3. 如果父行程比子行程更早結束,子行程並不會被迫結束,而是會交由 init process 管理,也就是說,子行程的 ppid 會被改為 1,之後子行程結束,其資源會由 init process 進行回收。