| 分類 | 項目 | 日期 | |
|---|---|---|---|
| 1 |
Java
| java.util.concurrent.locks - 臨界區間的讀寫 |
2015/08/13
|
| 2 |
Java
| java.util.concurrent - Future & Callable |
2015/08/15
|
| 3 |
Java
| java.util.concurrent - ExecutorService |
2015/08/16
|
| 4 |
C
| fork Function |
2015/06/06
|
| 5 |
Design Pattern
| Producer-Consumer Pattern |
2015/10/17
|
Google Code Prettify
2015年10月21日 星期三
concurrency programming
2015年10月17日 星期六
Producer-Consumer Pattern
實務上,曾經遇過如上的例子,說明如下:
- 資料源每天在規定的時間(約數小時)快速的產生資料,寫到檔案裡,在該時段裡,約會產生近兩億筆的資料,每筆資料中會有不重複的序號。
- 有兩支程式會由檔案讀出資料,第一支是讀出資料後,寫入資料庫,第二支程式則是寫到 MQSeries,不管是寫到資料庫或 MQSeries,當然是為了下游的其它系統的需要。(方框中的兩支程式)
這裡要說明的是方框中的那兩支程式,這兩支程式的架構肯定會很像,因為都是讀檔後輸出,差別只在於輸出的地方不同。實務上看到的程式架構是,主程式只有一個類別,在該類別中讀取資料後輸出,再讀取下一筆,再輸出,也就是循序的,如下圖:
從 I/O 的效率看,讀檔案的速度遠遠大於寫入資料庫的速度,就算讀與寫沒有分開成兩個執行緒,影響應該是很有限。回到第一個圖,這整套系統有兩支這樣的程式,另一支是寫到 MQSeries,所以,又有一支如圖二架構的程式,差別只在輸出的那部份是寫到 MQSeries。這樣的寫法有兩個明顯的問題:
- 程式碼重複。
- 輸入與輸出的部份相耦合,可重用性(reuse)不高。
要改善這個架構,可以套用 Producer-Consumer Pattern (生產者-消費者 範式),這個 pattern 在多執行緒程式中是很常見的,以第二張圖的架構來說, 輸入的部份會成為 producer,輸出的部份成為 consumer,中間加上一個佇列為中介,即可以將這兩部份解耦,如下圖:
套用 pattern 後的程式,輸入與輸出的程式,要面對的都是佇列,而不需要在意另一端是如何實作,實際上資料源是什麼? 或實際要輸出的儲存體是什麼? 也就是說,輸出的部份要寫兩個類別,一個是輸出到資料庫的,一個是輸出到 MQSeries 的,其它部份都不需要改變,只要依需要,換用適當的輸出類別。這個寫法有以下優點:
- 輸入、輸出解耦,可重用性高。
- 解耦後,各自的邏輯變簡單,程式的可讀性提高。
- 輸入、輸出分開在不同執行緒,效率提高。
現在來看一下程式怎麼寫?
1 package idv.steven.patterns; 2 3 import java.nio.ByteBuffer; 4 import java.util.Vector; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.locks.Condition; 7 import java.util.concurrent.locks.ReentrantLock; 8 9 public class Queue { 10 private Vector<ByteBuffer> pool = new Vector<ByteBuffer>(); 11 private Vector<ByteBuffer> queue = new Vector<ByteBuffer>(); 12 13 private ReentrantLock lock = new ReentrantLock(); 14 private Condition waitQueue = lock.newCondition(); 15 16 private int size = 0; 17 private int capacity = 0; 18 19 public Queue(int size, int capacity) { 20 this.size = size; 21 this.capacity = capacity; 22 23 for(int i=0; i<size; i++) { 24 pool.add(ByteBuffer.allocate(capacity)); 25 } 26 } 27 28 public ByteBuffer alloc() { 29 ByteBuffer buf = null; 30 31 try { 32 buf = pool.remove(0); 33 } 34 catch (ArrayIndexOutOfBoundsException e) { 35 36 } 37 38 return buf; 39 } 40 41 /** 42 * 取得的 ByteBuffer 使用完一定要記得呼叫此 method,否則資源會越來越少。 43 * @param buf 44 */ 45 public void release(ByteBuffer buf) { 46 if (buf.capacity() != capacity) { 47 buf = ByteBuffer.allocate(capacity); 48 } 49 else { 50 buf.clear(); 51 } 52 53 pool.add(buf); 54 } 55 56 public ByteBuffer take() { 57 ByteBuffer buf = null; 58 59 try { 60 buf = queue.remove(0); 61 } 62 catch (ArrayIndexOutOfBoundsException e) { 63 64 } 65 66 if (buf == null) { 67 try { 68 lock.lock(); 69 if (waitQueue.await(10, TimeUnit.SECONDS)) { 70 buf = queue.remove(0); 71 } 72 } catch (InterruptedException | ArrayIndexOutOfBoundsException e) { 73 } 74 finally { 75 lock.unlock(); 76 } 77 } 78 79 return buf; 80 } 81 82 public void put(ByteBuffer buf) { 83 try { 84 lock.lock(); 85 queue.add(buf); 86 waitQueue.signal(); 87 } 88 finally { 89 lock.unlock(); 90 } 91 } 92 }
這個佇列程式在初始化時先產生足夠的 ByteBuffer 備用,這樣效率會比較好,因為這個系統會產生大量的資料,一秒鐘就有超過百筆,如果每次都用 ByteBuffer.allocate(...) 產生,會造成效率上的瓶頸。
在 take() method 中,如果第一次從佇列中取資料,結果佇列沒資料回傳 null,就等 10 秒再取一次。在寫 multi-thread 程式時,盡量不要用 sleep 來等其它 thread,這樣很沒有效率,應該用 await,當其它 thread 完成正等待的事情後 (這裡就是等 producer 產生資料放入佇列後),該 thread 呼叫 signal 就可以繼續往下執行,程式第 86 行就是當有資料放入佇列後,會呼叫 signal。
1 package idv.steven.patterns; 2 3 import java.nio.ByteBuffer; 4 5 public class Consumer implements Runnable { 6 private String name = ""; 7 private Queue queue = null; 8 9 public Consumer(String name, Queue queue) { 10 this.name = name; 11 this.queue = queue; 12 } 13 14 @Override 15 public void run() { 16 ByteBuffer buf = null; 17 18 int i = 1; 19 while ((buf = queue.take()) != null) { 20 byte[] dst = new byte[buf.limit()]; 21 buf.get(dst, 0, buf.limit()); 22 String s = new String(dst); 23 System.out.println(name + "(" + (i++) + "):" + s); 24 queue.release(buf); 25 } 26 } 27 }
Consumer (消費者) 程式收到資料將它輸出到 console,真正的程式當然要輸出到資料庫或 MQSeries,那會複雜的多,這裡為了說明簡化了。
1 package idv.steven.patterns; 2 3 import java.nio.ByteBuffer; 4 import java.util.Random; 5 6 public class Producer implements Runnable { 7 private Queue queue = null; 8 9 @Override 10 public void run() { 11 queue = new Queue(100, 1024); 12 13 Thread tC1 = createConsumer("c1"); 14 Thread tC2 = createConsumer("c2"); 15 Thread tC3 = createConsumer("c3"); 16 17 Random rand = new Random(); 18 19 for(int i=0; i<10; i++) { 20 ByteBuffer buf = queue.alloc(); 21 if (buf != null) { 22 buf.put(String.format("%04d", rand.nextInt(10000)).getBytes()); 23 buf.flip(); 24 queue.put(buf); 25 } 26 } 27 28 try { 29 tC1.join(); 30 tC2.join(); 31 tC3.join(); 32 } catch (InterruptedException e) { 33 } 34 } 35 36 private Thread createConsumer(String name) { 37 Consumer c = new Consumer(name, queue); 38 Thread tConsumer = new Thread(c); 39 tConsumer.start(); 40 41 return tConsumer; 42 } 43 44 public static void main(String[] args) { 45 Producer producer = new Producer(); 46 Thread tProducer = new Thread(producer); 47 tProducer.start(); 48 } 49 }
這個程式是一個 Producer (生產者) 有三個 Consumer (消費者),實際上的應用會依需要而定,所以,Producer-Consumer pattern 會有很多的變形。這裡為了說明方便,由 producer 用亂數產生十個資料,放入佇列中,由 consumer 取出並輸出到 console,佇列中的資料會由那一個 consumer 取得,會依系統的排程決定。輸出的結果如下,當然每次執行會有點差異。
c1(1):2368
c3(1):0728
c2(1):5240
c3(2):3292
c1(2):7614
c3(3):7558
c2(2):8970
c3(4):2345
c1(3):1866
c2(3):5144
佇列也可以更複雜,不同的資料還會有優先權,或是其它各項規則,也可能佇列改成堆壘,這都可視需要而變更。
2015年8月13日 星期四
java.util.concurrent.locks - 臨界區間的讀寫
java 在推出時,提供 synchronized、notify、wait 的簡單方式,讓多執行緒程式可以控制臨界區間的存取,到了 Java 5 之後加入 java.util.concurrent 這個 package,又提供了一些 interface (介面) 及 class (類別),這些新的 interface、class 比之前的方法更有彈性、更有效率,這裡簡要的說明 java.util.concurrent.locks 下的主要類別與介面。下圖的 class diagram (類別圖) 僅是主要的 interface、class 及其包含的主要 method,要了解全貌,請參考 Java Documentation。
- ReadWriteLock
ReadWriteLock lock = new ReentrantReadWriteLock();//... lock.writeLock().lock(); //取得寫入鎖定 try { ... } finally { lock.writeLock().unlock(); //解除寫入鎖定 }
上面的程式稱不上是一個範例,主要說明使用 ReadWriteLock 的優缺點及注意事項:
- 跟 synchronized 比起來,解除了只能針一個 method 或一個 block 設定臨界區間的限制,但是要注意一定要記得 unlock,否則會防礙到其他執行緒進入臨界區間,所以上面的程式,將 unlock() 放在 finally 以確保其最後一定會被執行。
- 請回頭看一下類別圖中的 ReadWriteLock,它提供的是有 read lock 和 write lock,這可以改善 synchronized 的效率,synchronized 只要有一個 thread 搶到了 lock,就得等它執行完臨界區間內所有程式,並離開後,其它等著進入臨界區間的 thread 才能進入。現在分出了 read lock 及 write lock,當取得 write lock 的 thread 進入了臨界區間,其它 thread 也只能等待,但是,如果是取得 read lock 的 thread 進入臨界區間,是可以有多個 read lock thread 同時進入,這就改善了效率,又不會讓資料不一致。
- Lock
ReentrantLock lock = new ReentrantLock(); try { if (lock.tryLock()) { //臨界區間 - do something } } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } }
interface Lock 只有一個實作類別 ReetrantLock,這裡要特別注意 tryLock() 這個 method,它相當有趣,因為當遇到臨界區間時,使用這個 method,如果取得 lock 就傳 true,沒有取得就傳回 false,程式可以依傳回值決定做什麼事,以免有 thread 呆呆的一直等著進臨界區間而浪費時間。
要記得,unlock 前也要先用 isHeldByCurrentThread() 這個 method 判斷一下目前是否有取得 lock,有的話才 unlock。
- Condition
前面提到的 lock 是可用來取代 synchronized 的新解法,Condition 的 signal、await 則是與 notify、wait 相當。使用的方法可以參考官網 Condition 的範例如下:
1 class BoundedBuffer { 2 final Lock lock = new ReentrantLock(); 3 final Condition notFull = lock.newCondition(); 4 final Condition notEmpty = lock.newCondition(); 5 6 final Object[] items = new Object[100]; 7 int putptr, takeptr, count; 8 9 public void put(Object x) throws InterruptedException { 10 lock.lock(); 11 try { 12 while (count == items.length) 13 notFull.await(); 14 items[putptr] = x; 15 if (++putptr == items.length) putptr = 0; 16 ++count; 17 notEmpty.signal(); 18 } finally { 19 lock.unlock(); 20 } 21 } 22 23 public Object take() throws InterruptedException { 24 lock.lock(); 25 try { 26 while (count == 0) 27 notEmpty.await(); 28 Object x = items[takeptr]; 29 if (++takeptr == items.length) takeptr = 0; 30 --count; 31 notFull.signal(); 32 return x; 33 } finally { 34 lock.unlock(); 35 } 36 } 37 }
這是一個很簡單的"生產者"、"消費者"的例子,當陣列中沒有任何物件時,消費者 (take) 就需等待 (line 27),當陣列滿了時,生產者 (put) 也需等待 (line 13); 如果有新的物件產生生產者會通知消費者 (line 17),如果陣列還有空間,消費者也會通知生產者 (line 31)。
特別注意一下 2~4 行,Condition 是由 lock 的 newCondition() 產生,還有,就如 notify、wait 必需位於 synchronized 區間內一樣,Condition 的 await、signal 也要位於 lock、unlock 之間。
- StampedLock
前面提到的 ReadWriteLock 可允許多個 readLock 的執行緒同時進入臨界區間,但只允許一個 writeLock 的執行緒進入臨界區間,且當有 writeLock 的執行緒位於臨界區間內,即不允許其它執行緒取得 readLock、writeLock,這會有個問題,當程式有很多讀取的執行緒,只有很少的寫入執行緒,臨界區間大部份時間被取得 readLock 執行緒佔據,寫入的執行緒會很難取得 writeLock 而長期處於等待狀態。為了解決這個問題,Java 8 提供了 StampedLock 這個新類別。
1 public class BankAccountStampedLock { 2 private final StampedLock sl = new StampedLock(); 3 private long balance; 4 5 public BankAccountStampedLock(long balance) { 6 this.balance = balance; 7 } 8 9 public void deposit(long amount) { 10 long stamp = sl.writeLock(); 11 try { 12 balance += amount; 13 } finally { 14 sl.unlockWrite(stamp); 15 } 16 } 17 18 public void withdraw(long amount) { 19 long stamp = sl.writeLock(); 20 try { 21 balance -= amount; 22 } finally { 23 sl.unlockWrite(stamp); 24 } 25 } 26 27 public long getBalance() { 28 long stamp = sl.readLock(); 29 try { 30 return balance; 31 } finally { 32 sl.unlockRead(stamp); 33 } 34 } 35 36 public long getBalanceOptimisticRead() { 37 long stamp = sl.tryOptimisticRead(); 38 long balance = this.balance; 39 if (!sl.validate(stamp)) { 40 stamp = sl.readLock(); 41 try { 42 balance = this.balance; 43 } finally { 44 sl.unlockRead(stamp); 45 } 46 } 47 return balance; 48 } 49 }
這是一個存、提款的範例程式,來源是 javaspecialists 網站。這個程式和完全只使用 ReadWriteLock 最大差別在於 getBalanceOptimisticRead(),第 37 行先呼叫 tryOptimisticRead 取得一個樂觀讀取鎖定,第 38 行取得存款餘額,第 39 行判斷看看是否在取得樂觀讀取鎖定後,有臨界區間有取得寫入鎖定的執行緒進入,沒有的話,直接回傳存款餘額 (line 47),萬一有的話,balance 的值有可能已經被改變,所以就要謹慎的取得讀取鎖定 (line 40),再取得真正最新的值後再解鎖 (line 44)。
標籤:
平行程式,
多執行緒,
臨界區間,
concurrent,
Java,
lock,
multi-thread,
StampedLock,
thread
訂閱:
意見 (Atom)



