2015年10月17日 星期六

Producer-Consumer Pattern



實務上,曾經遇過如上的例子,說明如下:

  1. 資料源每天在規定的時間(約數小時)快速的產生資料,寫到檔案裡,在該時段裡,約會產生近兩億筆的資料,每筆資料中會有不重複的序號。
  2. 有兩支程式會由檔案讀出資料,第一支是讀出資料後,寫入資料庫,第二支程式則是寫到 MQSeries,不管是寫到資料庫或 MQSeries,當然是為了下游的其它系統的需要。(方框中的兩支程式)
這裡要說明的是方框中的那兩支程式,這兩支程式的架構肯定會很像,因為都是讀檔後輸出,差別只在於輸出的地方不同。實務上看到的程式架構是,主程式只有一個類別,在該類別中讀取資料後輸出,再讀取下一筆,再輸出,也就是循序的,如下圖:


從 I/O 的效率看,讀檔案的速度遠遠大於寫入資料庫的速度,就算讀與寫沒有分開成兩個執行緒,影響應該是很有限。回到第一個圖,這整套系統有兩支這樣的程式,另一支是寫到 MQSeries,所以,又有一支如圖二架構的程式,差別只在輸出的那部份是寫到 MQSeries。這樣的寫法有兩個明顯的問題:
  1. 程式碼重複。
  2. 輸入與輸出的部份相耦合,可重用性(reuse)不高。
要改善這個架構,可以套用 Producer-Consumer Pattern (生產者-消費者 範式),這個 pattern 在多執行緒程式中是很常見的,以第二張圖的架構來說, 輸入的部份會成為 producer,輸出的部份成為 consumer,中間加上一個佇列為中介,即可以將這兩部份解耦,如下圖:


套用 pattern 後的程式,輸入與輸出的程式,要面對的都是佇列,而不需要在意另一端是如何實作,實際上資料源是什麼? 或實際要輸出的儲存體是什麼? 也就是說,輸出的部份要寫兩個類別,一個是輸出到資料庫的,一個是輸出到 MQSeries 的,其它部份都不需要改變,只要依需要,換用適當的輸出類別。這個寫法有以下優點:
  1. 輸入、輸出解耦,可重用性高。
  2. 解耦後,各自的邏輯變簡單,程式的可讀性提高。
  3. 輸入、輸出分開在不同執行緒,效率提高。




現在來看一下程式怎麼寫?

 1 package idv.steven.patterns;
 2 
 3 import java.nio.ByteBuffer;
 4 import java.util.Vector;
 5 import java.util.concurrent.TimeUnit;
 6 import java.util.concurrent.locks.Condition;
 7 import java.util.concurrent.locks.ReentrantLock;
 8 
 9 public class Queue {
10     private Vector<ByteBuffer> pool = new Vector<ByteBuffer>();
11     private Vector<ByteBuffer> queue = new Vector<ByteBuffer>();
12     
13     private ReentrantLock lock = new ReentrantLock();
14     private Condition waitQueue = lock.newCondition();
15     
16     private int size = 0;
17     private int capacity = 0;
18     
19     public Queue(int size, int capacity) {
20         this.size = size;
21         this.capacity = capacity;
22         
23         for(int i=0; i<size; i++) {
24             pool.add(ByteBuffer.allocate(capacity));
25         }
26     }
27     
28     public ByteBuffer alloc() {
29         ByteBuffer buf = null;
30         
31         try {
32             buf = pool.remove(0);
33         }
34         catch (ArrayIndexOutOfBoundsException e) {
35             
36         }
37         
38         return buf;
39     }
40     
41     /**
42      * 取得的 ByteBuffer 使用完一定要記得呼叫此 method,否則資源會越來越少。
43      * @param buf
44      */
45     public void release(ByteBuffer buf) {
46         if (buf.capacity() != capacity) {
47             buf = ByteBuffer.allocate(capacity);
48         }
49         else {
50             buf.clear();
51         }
52         
53         pool.add(buf);
54     }
55     
56     public ByteBuffer take() {
57         ByteBuffer buf = null;
58         
59         try {
60             buf = queue.remove(0);
61         }
62         catch (ArrayIndexOutOfBoundsException e) {
63             
64         }
65         
66         if (buf == null) {
67             try {
68                 lock.lock();
69                 if (waitQueue.await(10, TimeUnit.SECONDS)) {
70                     buf = queue.remove(0);
71                 }
72             } catch (InterruptedException | ArrayIndexOutOfBoundsException e) {
73             }
74             finally {
75                 lock.unlock();
76             }
77         }
78         
79         return buf;
80     }
81     
82     public void put(ByteBuffer buf) {
83         try {
84             lock.lock();
85             queue.add(buf);
86             waitQueue.signal();
87         }
88         finally {
89             lock.unlock();
90         }
91     }
92 }

這個佇列程式在初始化時先產生足夠的 ByteBuffer 備用,這樣效率會比較好,因為這個系統會產生大量的資料,一秒鐘就有超過百筆,如果每次都用 ByteBuffer.allocate(...) 產生,會造成效率上的瓶頸。

在 take() method 中,如果第一次從佇列中取資料,結果佇列沒資料回傳 null,就等 10 秒再取一次。在寫 multi-thread 程式時,盡量不要用 sleep 來等其它 thread,這樣很沒有效率,應該用 await,當其它 thread 完成正等待的事情後 (這裡就是等 producer 產生資料放入佇列後),該 thread 呼叫 signal 就可以繼續往下執行,程式第 86 行就是當有資料放入佇列後,會呼叫 signal。

 1 package idv.steven.patterns;
 2 
 3 import java.nio.ByteBuffer;
 4 
 5 public class Consumer implements Runnable {
 6     private String name = "";
 7     private Queue queue = null;
 8     
 9     public Consumer(String name, Queue queue) {
10         this.name = name;
11         this.queue = queue;
12     }
13 
14     @Override
15     public void run() {
16         ByteBuffer buf = null;
17         
18         int i = 1;
19         while ((buf = queue.take()) != null) {
20             byte[] dst = new byte[buf.limit()];
21             buf.get(dst, 0, buf.limit());
22             String s = new String(dst);
23             System.out.println(name + "(" + (i++) + "):" + s);
24             queue.release(buf);
25         }
26     }
27 }

Consumer (消費者) 程式收到資料將它輸出到 console,真正的程式當然要輸出到資料庫或 MQSeries,那會複雜的多,這裡為了說明簡化了。

1 package idv.steven.patterns;
 2 
 3 import java.nio.ByteBuffer;
 4 import java.util.Random;
 5 
 6 public class Producer implements Runnable {
 7     private Queue queue = null;
 8     
 9     @Override
10     public void run() {
11         queue = new Queue(100, 1024);
12         
13         Thread tC1 = createConsumer("c1");
14         Thread tC2 = createConsumer("c2");
15         Thread tC3 = createConsumer("c3");
16         
17         Random rand = new Random();
18         
19         for(int i=0; i<10; i++) {
20             ByteBuffer buf = queue.alloc();
21             if (buf != null) {
22                 buf.put(String.format("%04d", rand.nextInt(10000)).getBytes());
23                 buf.flip();
24                 queue.put(buf);
25             }
26         }
27         
28         try {
29             tC1.join();
30             tC2.join();
31             tC3.join();
32         } catch (InterruptedException e) {
33         }
34     }
35     
36     private Thread createConsumer(String name) {
37         Consumer c = new Consumer(name, queue);
38         Thread tConsumer = new Thread(c);
39         tConsumer.start();
40         
41         return tConsumer;
42     }
43 
44     public static void main(String[] args) {
45         Producer producer = new Producer();
46         Thread tProducer = new Thread(producer);
47         tProducer.start();
48     }
49 }

這個程式是一個 Producer (生產者) 有三個 Consumer (消費者),實際上的應用會依需要而定,所以,Producer-Consumer pattern 會有很多的變形。這裡為了說明方便,由 producer 用亂數產生十個資料,放入佇列中,由 consumer 取出並輸出到 console,佇列中的資料會由那一個 consumer 取得,會依系統的排程決定。輸出的結果如下,當然每次執行會有點差異。
c1(1):2368
c3(1):0728
c2(1):5240
c3(2):3292
c1(2):7614
c3(3):7558
c2(2):8970
c3(4):2345
c1(3):1866
c2(3):5144
佇列也可以更複雜,不同的資料還會有優先權,或是其它各項規則,也可能佇列改成堆壘,這都可視需要而變更。

沒有留言:

張貼留言