實務上,曾經遇過如上的例子,說明如下:
- 資料源每天在規定的時間(約數小時)快速的產生資料,寫到檔案裡,在該時段裡,約會產生近兩億筆的資料,每筆資料中會有不重複的序號。
- 有兩支程式會由檔案讀出資料,第一支是讀出資料後,寫入資料庫,第二支程式則是寫到 MQSeries,不管是寫到資料庫或 MQSeries,當然是為了下游的其它系統的需要。(方框中的兩支程式)
這裡要說明的是方框中的那兩支程式,這兩支程式的架構肯定會很像,因為都是讀檔後輸出,差別只在於輸出的地方不同。實務上看到的程式架構是,主程式只有一個類別,在該類別中讀取資料後輸出,再讀取下一筆,再輸出,也就是循序的,如下圖:
從 I/O 的效率看,讀檔案的速度遠遠大於寫入資料庫的速度,就算讀與寫沒有分開成兩個執行緒,影響應該是很有限。回到第一個圖,這整套系統有兩支這樣的程式,另一支是寫到 MQSeries,所以,又有一支如圖二架構的程式,差別只在輸出的那部份是寫到 MQSeries。這樣的寫法有兩個明顯的問題:
- 程式碼重複。
- 輸入與輸出的部份相耦合,可重用性(reuse)不高。
要改善這個架構,可以套用 Producer-Consumer Pattern (生產者-消費者 範式),這個 pattern 在多執行緒程式中是很常見的,以第二張圖的架構來說, 輸入的部份會成為 producer,輸出的部份成為 consumer,中間加上一個佇列為中介,即可以將這兩部份解耦,如下圖:
套用 pattern 後的程式,輸入與輸出的程式,要面對的都是佇列,而不需要在意另一端是如何實作,實際上資料源是什麼? 或實際要輸出的儲存體是什麼? 也就是說,輸出的部份要寫兩個類別,一個是輸出到資料庫的,一個是輸出到 MQSeries 的,其它部份都不需要改變,只要依需要,換用適當的輸出類別。這個寫法有以下優點:
- 輸入、輸出解耦,可重用性高。
- 解耦後,各自的邏輯變簡單,程式的可讀性提高。
- 輸入、輸出分開在不同執行緒,效率提高。
現在來看一下程式怎麼寫?
1 package idv.steven.patterns; 2 3 import java.nio.ByteBuffer; 4 import java.util.Vector; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.locks.Condition; 7 import java.util.concurrent.locks.ReentrantLock; 8 9 public class Queue { 10 private Vector<ByteBuffer> pool = new Vector<ByteBuffer>(); 11 private Vector<ByteBuffer> queue = new Vector<ByteBuffer>(); 12 13 private ReentrantLock lock = new ReentrantLock(); 14 private Condition waitQueue = lock.newCondition(); 15 16 private int size = 0; 17 private int capacity = 0; 18 19 public Queue(int size, int capacity) { 20 this.size = size; 21 this.capacity = capacity; 22 23 for(int i=0; i<size; i++) { 24 pool.add(ByteBuffer.allocate(capacity)); 25 } 26 } 27 28 public ByteBuffer alloc() { 29 ByteBuffer buf = null; 30 31 try { 32 buf = pool.remove(0); 33 } 34 catch (ArrayIndexOutOfBoundsException e) { 35 36 } 37 38 return buf; 39 } 40 41 /** 42 * 取得的 ByteBuffer 使用完一定要記得呼叫此 method,否則資源會越來越少。 43 * @param buf 44 */ 45 public void release(ByteBuffer buf) { 46 if (buf.capacity() != capacity) { 47 buf = ByteBuffer.allocate(capacity); 48 } 49 else { 50 buf.clear(); 51 } 52 53 pool.add(buf); 54 } 55 56 public ByteBuffer take() { 57 ByteBuffer buf = null; 58 59 try { 60 buf = queue.remove(0); 61 } 62 catch (ArrayIndexOutOfBoundsException e) { 63 64 } 65 66 if (buf == null) { 67 try { 68 lock.lock(); 69 if (waitQueue.await(10, TimeUnit.SECONDS)) { 70 buf = queue.remove(0); 71 } 72 } catch (InterruptedException | ArrayIndexOutOfBoundsException e) { 73 } 74 finally { 75 lock.unlock(); 76 } 77 } 78 79 return buf; 80 } 81 82 public void put(ByteBuffer buf) { 83 try { 84 lock.lock(); 85 queue.add(buf); 86 waitQueue.signal(); 87 } 88 finally { 89 lock.unlock(); 90 } 91 } 92 }
這個佇列程式在初始化時先產生足夠的 ByteBuffer 備用,這樣效率會比較好,因為這個系統會產生大量的資料,一秒鐘就有超過百筆,如果每次都用 ByteBuffer.allocate(...) 產生,會造成效率上的瓶頸。
在 take() method 中,如果第一次從佇列中取資料,結果佇列沒資料回傳 null,就等 10 秒再取一次。在寫 multi-thread 程式時,盡量不要用 sleep 來等其它 thread,這樣很沒有效率,應該用 await,當其它 thread 完成正等待的事情後 (這裡就是等 producer 產生資料放入佇列後),該 thread 呼叫 signal 就可以繼續往下執行,程式第 86 行就是當有資料放入佇列後,會呼叫 signal。
1 package idv.steven.patterns; 2 3 import java.nio.ByteBuffer; 4 5 public class Consumer implements Runnable { 6 private String name = ""; 7 private Queue queue = null; 8 9 public Consumer(String name, Queue queue) { 10 this.name = name; 11 this.queue = queue; 12 } 13 14 @Override 15 public void run() { 16 ByteBuffer buf = null; 17 18 int i = 1; 19 while ((buf = queue.take()) != null) { 20 byte[] dst = new byte[buf.limit()]; 21 buf.get(dst, 0, buf.limit()); 22 String s = new String(dst); 23 System.out.println(name + "(" + (i++) + "):" + s); 24 queue.release(buf); 25 } 26 } 27 }
Consumer (消費者) 程式收到資料將它輸出到 console,真正的程式當然要輸出到資料庫或 MQSeries,那會複雜的多,這裡為了說明簡化了。
1 package idv.steven.patterns; 2 3 import java.nio.ByteBuffer; 4 import java.util.Random; 5 6 public class Producer implements Runnable { 7 private Queue queue = null; 8 9 @Override 10 public void run() { 11 queue = new Queue(100, 1024); 12 13 Thread tC1 = createConsumer("c1"); 14 Thread tC2 = createConsumer("c2"); 15 Thread tC3 = createConsumer("c3"); 16 17 Random rand = new Random(); 18 19 for(int i=0; i<10; i++) { 20 ByteBuffer buf = queue.alloc(); 21 if (buf != null) { 22 buf.put(String.format("%04d", rand.nextInt(10000)).getBytes()); 23 buf.flip(); 24 queue.put(buf); 25 } 26 } 27 28 try { 29 tC1.join(); 30 tC2.join(); 31 tC3.join(); 32 } catch (InterruptedException e) { 33 } 34 } 35 36 private Thread createConsumer(String name) { 37 Consumer c = new Consumer(name, queue); 38 Thread tConsumer = new Thread(c); 39 tConsumer.start(); 40 41 return tConsumer; 42 } 43 44 public static void main(String[] args) { 45 Producer producer = new Producer(); 46 Thread tProducer = new Thread(producer); 47 tProducer.start(); 48 } 49 }
這個程式是一個 Producer (生產者) 有三個 Consumer (消費者),實際上的應用會依需要而定,所以,Producer-Consumer pattern 會有很多的變形。這裡為了說明方便,由 producer 用亂數產生十個資料,放入佇列中,由 consumer 取出並輸出到 console,佇列中的資料會由那一個 consumer 取得,會依系統的排程決定。輸出的結果如下,當然每次執行會有點差異。
c1(1):2368
c3(1):0728
c2(1):5240
c3(2):3292
c1(2):7614
c3(3):7558
c2(2):8970
c3(4):2345
c1(3):1866
c2(3):5144
佇列也可以更複雜,不同的資料還會有優先權,或是其它各項規則,也可能佇列改成堆壘,這都可視需要而變更。
沒有留言:
張貼留言