Google Code Prettify

顯示具有 multi-thread 標籤的文章。 顯示所有文章
顯示具有 multi-thread 標籤的文章。 顯示所有文章

2015年10月17日 星期六

Producer-Consumer Pattern



實務上,曾經遇過如上的例子,說明如下:

  1. 資料源每天在規定的時間(約數小時)快速的產生資料,寫到檔案裡,在該時段裡,約會產生近兩億筆的資料,每筆資料中會有不重複的序號。
  2. 有兩支程式會由檔案讀出資料,第一支是讀出資料後,寫入資料庫,第二支程式則是寫到 MQSeries,不管是寫到資料庫或 MQSeries,當然是為了下游的其它系統的需要。(方框中的兩支程式)
這裡要說明的是方框中的那兩支程式,這兩支程式的架構肯定會很像,因為都是讀檔後輸出,差別只在於輸出的地方不同。實務上看到的程式架構是,主程式只有一個類別,在該類別中讀取資料後輸出,再讀取下一筆,再輸出,也就是循序的,如下圖:


從 I/O 的效率看,讀檔案的速度遠遠大於寫入資料庫的速度,就算讀與寫沒有分開成兩個執行緒,影響應該是很有限。回到第一個圖,這整套系統有兩支這樣的程式,另一支是寫到 MQSeries,所以,又有一支如圖二架構的程式,差別只在輸出的那部份是寫到 MQSeries。這樣的寫法有兩個明顯的問題:
  1. 程式碼重複。
  2. 輸入與輸出的部份相耦合,可重用性(reuse)不高。
要改善這個架構,可以套用 Producer-Consumer Pattern (生產者-消費者 範式),這個 pattern 在多執行緒程式中是很常見的,以第二張圖的架構來說, 輸入的部份會成為 producer,輸出的部份成為 consumer,中間加上一個佇列為中介,即可以將這兩部份解耦,如下圖:


套用 pattern 後的程式,輸入與輸出的程式,要面對的都是佇列,而不需要在意另一端是如何實作,實際上資料源是什麼? 或實際要輸出的儲存體是什麼? 也就是說,輸出的部份要寫兩個類別,一個是輸出到資料庫的,一個是輸出到 MQSeries 的,其它部份都不需要改變,只要依需要,換用適當的輸出類別。這個寫法有以下優點:
  1. 輸入、輸出解耦,可重用性高。
  2. 解耦後,各自的邏輯變簡單,程式的可讀性提高。
  3. 輸入、輸出分開在不同執行緒,效率提高。




現在來看一下程式怎麼寫?

 1 package idv.steven.patterns;
 2 
 3 import java.nio.ByteBuffer;
 4 import java.util.Vector;
 5 import java.util.concurrent.TimeUnit;
 6 import java.util.concurrent.locks.Condition;
 7 import java.util.concurrent.locks.ReentrantLock;
 8 
 9 public class Queue {
10     private Vector<ByteBuffer> pool = new Vector<ByteBuffer>();
11     private Vector<ByteBuffer> queue = new Vector<ByteBuffer>();
12     
13     private ReentrantLock lock = new ReentrantLock();
14     private Condition waitQueue = lock.newCondition();
15     
16     private int size = 0;
17     private int capacity = 0;
18     
19     public Queue(int size, int capacity) {
20         this.size = size;
21         this.capacity = capacity;
22         
23         for(int i=0; i<size; i++) {
24             pool.add(ByteBuffer.allocate(capacity));
25         }
26     }
27     
28     public ByteBuffer alloc() {
29         ByteBuffer buf = null;
30         
31         try {
32             buf = pool.remove(0);
33         }
34         catch (ArrayIndexOutOfBoundsException e) {
35             
36         }
37         
38         return buf;
39     }
40     
41     /**
42      * 取得的 ByteBuffer 使用完一定要記得呼叫此 method,否則資源會越來越少。
43      * @param buf
44      */
45     public void release(ByteBuffer buf) {
46         if (buf.capacity() != capacity) {
47             buf = ByteBuffer.allocate(capacity);
48         }
49         else {
50             buf.clear();
51         }
52         
53         pool.add(buf);
54     }
55     
56     public ByteBuffer take() {
57         ByteBuffer buf = null;
58         
59         try {
60             buf = queue.remove(0);
61         }
62         catch (ArrayIndexOutOfBoundsException e) {
63             
64         }
65         
66         if (buf == null) {
67             try {
68                 lock.lock();
69                 if (waitQueue.await(10, TimeUnit.SECONDS)) {
70                     buf = queue.remove(0);
71                 }
72             } catch (InterruptedException | ArrayIndexOutOfBoundsException e) {
73             }
74             finally {
75                 lock.unlock();
76             }
77         }
78         
79         return buf;
80     }
81     
82     public void put(ByteBuffer buf) {
83         try {
84             lock.lock();
85             queue.add(buf);
86             waitQueue.signal();
87         }
88         finally {
89             lock.unlock();
90         }
91     }
92 }

這個佇列程式在初始化時先產生足夠的 ByteBuffer 備用,這樣效率會比較好,因為這個系統會產生大量的資料,一秒鐘就有超過百筆,如果每次都用 ByteBuffer.allocate(...) 產生,會造成效率上的瓶頸。

在 take() method 中,如果第一次從佇列中取資料,結果佇列沒資料回傳 null,就等 10 秒再取一次。在寫 multi-thread 程式時,盡量不要用 sleep 來等其它 thread,這樣很沒有效率,應該用 await,當其它 thread 完成正等待的事情後 (這裡就是等 producer 產生資料放入佇列後),該 thread 呼叫 signal 就可以繼續往下執行,程式第 86 行就是當有資料放入佇列後,會呼叫 signal。

 1 package idv.steven.patterns;
 2 
 3 import java.nio.ByteBuffer;
 4 
 5 public class Consumer implements Runnable {
 6     private String name = "";
 7     private Queue queue = null;
 8     
 9     public Consumer(String name, Queue queue) {
10         this.name = name;
11         this.queue = queue;
12     }
13 
14     @Override
15     public void run() {
16         ByteBuffer buf = null;
17         
18         int i = 1;
19         while ((buf = queue.take()) != null) {
20             byte[] dst = new byte[buf.limit()];
21             buf.get(dst, 0, buf.limit());
22             String s = new String(dst);
23             System.out.println(name + "(" + (i++) + "):" + s);
24             queue.release(buf);
25         }
26     }
27 }

Consumer (消費者) 程式收到資料將它輸出到 console,真正的程式當然要輸出到資料庫或 MQSeries,那會複雜的多,這裡為了說明簡化了。

1 package idv.steven.patterns;
 2 
 3 import java.nio.ByteBuffer;
 4 import java.util.Random;
 5 
 6 public class Producer implements Runnable {
 7     private Queue queue = null;
 8     
 9     @Override
10     public void run() {
11         queue = new Queue(100, 1024);
12         
13         Thread tC1 = createConsumer("c1");
14         Thread tC2 = createConsumer("c2");
15         Thread tC3 = createConsumer("c3");
16         
17         Random rand = new Random();
18         
19         for(int i=0; i<10; i++) {
20             ByteBuffer buf = queue.alloc();
21             if (buf != null) {
22                 buf.put(String.format("%04d", rand.nextInt(10000)).getBytes());
23                 buf.flip();
24                 queue.put(buf);
25             }
26         }
27         
28         try {
29             tC1.join();
30             tC2.join();
31             tC3.join();
32         } catch (InterruptedException e) {
33         }
34     }
35     
36     private Thread createConsumer(String name) {
37         Consumer c = new Consumer(name, queue);
38         Thread tConsumer = new Thread(c);
39         tConsumer.start();
40         
41         return tConsumer;
42     }
43 
44     public static void main(String[] args) {
45         Producer producer = new Producer();
46         Thread tProducer = new Thread(producer);
47         tProducer.start();
48     }
49 }

這個程式是一個 Producer (生產者) 有三個 Consumer (消費者),實際上的應用會依需要而定,所以,Producer-Consumer pattern 會有很多的變形。這裡為了說明方便,由 producer 用亂數產生十個資料,放入佇列中,由 consumer 取出並輸出到 console,佇列中的資料會由那一個 consumer 取得,會依系統的排程決定。輸出的結果如下,當然每次執行會有點差異。
c1(1):2368
c3(1):0728
c2(1):5240
c3(2):3292
c1(2):7614
c3(3):7558
c2(2):8970
c3(4):2345
c1(3):1866
c2(3):5144
佇列也可以更複雜,不同的資料還會有優先權,或是其它各項規則,也可能佇列改成堆壘,這都可視需要而變更。

2015年8月13日 星期四

java.util.concurrent.locks - 臨界區間的讀寫

java 在推出時,提供 synchronized、notify、wait 的簡單方式,讓多執行緒程式可以控制臨界區間的存取,到了 Java 5 之後加入  java.util.concurrent 這個 package,又提供了一些 interface (介面) 及 class (類別),這些新的 interface、class 比之前的方法更有彈性、更有效率,這裡簡要的說明 java.util.concurrent.locks 下的主要類別與介面。下圖的 class diagram (類別圖) 僅是主要的 interface、class 及其包含的主要 method,要了解全貌,請參考 Java Documentation

  • ReadWriteLock

ReadWriteLock lock = new ReentrantReadWriteLock();
//...
lock.writeLock().lock(); //取得寫入鎖定
try {
    ...
}
finally {
    lock.writeLock().unlock(); //解除寫入鎖定
}
上面的程式稱不上是一個範例,主要說明使用 ReadWriteLock 的優缺點及注意事項:
  1. 跟 synchronized 比起來,解除了只能針一個 method 或一個 block 設定臨界區間的限制,但是要注意一定要記得 unlock,否則會防礙到其他執行緒進入臨界區間,所以上面的程式,將 unlock() 放在 finally 以確保其最後一定會被執行。
  2. 請回頭看一下類別圖中的 ReadWriteLock,它提供的是有 read lock 和 write lock,這可以改善 synchronized 的效率,synchronized 只要有一個 thread 搶到了 lock,就得等它執行完臨界區間內所有程式,並離開後,其它等著進入臨界區間的 thread 才能進入。現在分出了 read lock 及 write lock,當取得 write lock 的 thread 進入了臨界區間,其它 thread 也只能等待,但是,如果是取得 read lock 的 thread 進入臨界區間,是可以有多個 read lock thread 同時進入,這就改善了效率,又不會讓資料不一致。 
  • Lock
ReentrantLock lock = new ReentrantLock();

try {
    if (lock.tryLock()) {
        //臨界區間 - do something
    }
}
finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock();
    }
}
interface Lock 只有一個實作類別 ReetrantLock,這裡要特別注意 tryLock() 這個 method,它相當有趣,因為當遇到臨界區間時,使用這個 method,如果取得 lock 就傳 true,沒有取得就傳回 false,程式可以依傳回值決定做什麼事,以免有 thread 呆呆的一直等著進臨界區間而浪費時間。
要記得,unlock 前也要先用 isHeldByCurrentThread() 這個 method 判斷一下目前是否有取得 lock,有的話才 unlock。
  • Condition
 前面提到的 lock 是可用來取代 synchronized 的新解法,Condition 的 signal、await 則是與 notify、wait 相當。使用的方法可以參考官網 Condition 的範例如下: 
 1  class BoundedBuffer {
 2    final Lock lock = new ReentrantLock();
 3    final Condition notFull  = lock.newCondition(); 
 4    final Condition notEmpty = lock.newCondition(); 
 5 
 6    final Object[] items = new Object[100];
 7    int putptr, takeptr, count;
 8 
 9    public void put(Object x) throws InterruptedException {
10      lock.lock();
11      try {
12        while (count == items.length)
13          notFull.await();
14        items[putptr] = x;
15        if (++putptr == items.length) putptr = 0;
16        ++count;
17        notEmpty.signal();
18      } finally {
19        lock.unlock();
20      }
21    }
22 
23    public Object take() throws InterruptedException {
24      lock.lock();
25      try {
26        while (count == 0)
27          notEmpty.await();
28        Object x = items[takeptr];
29        if (++takeptr == items.length) takeptr = 0;
30        --count;
31        notFull.signal();
32        return x;
33      } finally {
34        lock.unlock();
35      }
36    }
37  }
這是一個很簡單的"生產者"、"消費者"的例子,當陣列中沒有任何物件時,消費者 (take) 就需等待 (line 27),當陣列滿了時,生產者 (put) 也需等待 (line 13); 如果有新的物件產生生產者會通知消費者 (line 17),如果陣列還有空間,消費者也會通知生產者 (line 31)。
特別注意一下 2~4 行,Condition 是由 lock 的 newCondition() 產生,還有,就如 notify、wait 必需位於 synchronized 區間內一樣,Condition 的 await、signal 也要位於 lock、unlock 之間。




  • StampedLock
前面提到的 ReadWriteLock 可允許多個 readLock 的執行緒同時進入臨界區間,但只允許一個 writeLock 的執行緒進入臨界區間,且當有 writeLock 的執行緒位於臨界區間內,即不允許其它執行緒取得 readLock、writeLock,這會有個問題,當程式有很多讀取的執行緒,只有很少的寫入執行緒,臨界區間大部份時間被取得 readLock  執行緒佔據,寫入的執行緒會很難取得 writeLock 而長期處於等待狀態。為了解決這個問題,Java 8 提供了 StampedLock 這個新類別。
 1 public class BankAccountStampedLock {
 2   private final StampedLock sl = new StampedLock();
 3   private long balance;
 4 
 5   public BankAccountStampedLock(long balance) {
 6     this.balance = balance;
 7   }
 8 
 9   public void deposit(long amount) {
10     long stamp = sl.writeLock();
11     try {
12       balance += amount;
13     } finally {
14       sl.unlockWrite(stamp);
15     }
16   }
17 
18   public void withdraw(long amount) {
19     long stamp = sl.writeLock();
20     try {
21       balance -= amount;
22     } finally {
23       sl.unlockWrite(stamp);
24     }
25   }
26 
27   public long getBalance() {
28     long stamp = sl.readLock();
29     try {
30       return balance;
31     } finally {
32       sl.unlockRead(stamp);
33     }
34   }
35 
36   public long getBalanceOptimisticRead() {
37     long stamp = sl.tryOptimisticRead();
38     long balance = this.balance;
39     if (!sl.validate(stamp)) {
40       stamp = sl.readLock();
41       try {
42         balance = this.balance;
43       } finally {
44         sl.unlockRead(stamp);
45       }
46     }
47     return balance;
48   }
49 }
這是一個存、提款的範例程式,來源是 javaspecialists 網站。這個程式和完全只使用 ReadWriteLock 最大差別在於 getBalanceOptimisticRead(),第 37 行先呼叫 tryOptimisticRead 取得一個樂觀讀取鎖定,第 38 行取得存款餘額,第 39 行判斷看看是否在取得樂觀讀取鎖定後,有臨界區間有取得寫入鎖定的執行緒進入,沒有的話,直接回傳存款餘額 (line 47),萬一有的話,balance 的值有可能已經被改變,所以就要謹慎的取得讀取鎖定 (line 40),再取得真正最新的值後再解鎖 (line 44)。