| 項目 | 日期 | |
|---|---|---|
| 1 | Producer-Consumer Pattern |
2015/10/17
|
| 2 | Tibco RV - fault tolerance |
2015/07/10
|
| 3 | 剖析固定長度欄位的訊息字串 |
2014/08/09
|
| 4 | 即時更新程式設定 |
2014/06/15
|
| 5 | BCD pack & unpack |
2015/05/10
|
| 6 | 在 WebSphere 8.5.5.x 上遇到的 jar 檔版本和衝突問題 |
2017/03/30
|
| 7 | 台股: 手續費與交易稅的計算 |
2016/09/03
|
| 8 | 台股: 配息之已實現損益及未實現損益的計算 |
2016/09/12
|
| 9 | 移除 VirtualBox 產生的「網路連線」介面卡 |
2021/02/16
|
| 10 | 解決 eclipse 2021-09 + lombok 產生的錯誤 |
2021/09/19
|
| 11 | IPv4 內部 IP |
2021/09/28
|
| 12 | jasypt 加解密 |
2022/07/06
|
Google Code Prettify
2015年11月7日 星期六
Practice
在實務上遇到的一些問題及提出來的解決辦法,分享出來,希望有鄉民可以提供更好的建議。
標籤:
台股,
重構,
design pattern,
JWT,
refactoring,
REST,
TibcoRV,
WebSphere
2015年10月23日 星期五
Tibco RV / JMS / Apache Kafka
| 項目 | 日期 | |
|---|---|---|
| 1 | Tibco RV request/reply 的同步與非同步 |
2015/02/07
|
| 2 | Tibco RV 的 Queue |
2015/05/05
|
| 3 | Tibco RV - fault tolerance |
2015/07/10
|
| 4 | JMS: getting started |
2015/08/30
|
| 5 | JMS: 訊息的標頭、屬性和種類 |
2015/08/30
|
2015年7月10日 星期五
Tibco RV - fault tolerance
有很多系統不允許(在某個特定時段)服務中斷,這類的系統不只會有備援,通常還會有 fault tolerance 機制,當系統掛掉時,備援的系統會自動啟動服務。Tibco RV 也有提供這樣的機制。
在了解程式如何編寫之前,我先對 RV 提供的 fault tolerance 作簡單的說明。
在了解程式如何編寫之前,我先對 RV 提供的 fault tolerance 作簡單的說明。
- group name
RV 的 fault tolerance 以 group name 來區分,也就是同一個 group name 的程式會成為一個群組,相互備援。
- active & inactive
RV 將 fault tolerance 的系統狀態分成三個階段,如下:
- DEACTIVATE: 顧名思義,就是非屬於提供服務的狀態,這時候系統當然就別做什麼事,只要一直傾聽 RV 的訊息,當傾聽到要切換狀態時再進行相關工作。
- PREPARE_TO_ACTIVATE: 當 RV 送來這個狀態,表示原本正提供服務的系統可能出了狀況,備援的系統要開始準備啟動,通常收到這個 event 時,會進行一些系統資源初始化的工作,以便當程式真的要啟動提供服務時,可以儘快啟動,以使服務中斷的時間儘可能的縮短。
- ACTIVATE: 收到這個 event,就真的要接手服務了!
- fault tolerance callback function
上述三個狀態的切換,RV 會透過 onFtAction 這個 callback function 通知備援的程式,所以程式要實作 TibrvFtMemberCallback 這個介面。
- active goal
在我們將測試的例子裡,只會啟動兩個 reciver 程式,一次只會有一個程式提供服務 (接收 generator 傳來的數字並輸出到 console),但是,實務上要有幾支程式處於 ACTIVATE 狀態是可以設定的,這個數字稱為 active goal。
- weight
每支程式建立 fault tolerance 時,在建構子的參數裡會有個 weight 的參數,這個值由 1 到整數的最大值,數字越大優先權越大,所以,如果有三支程式 A、B、C,其權重 (weight) 分別為 50、30、100,當 RV 要從這三支程式裡挑一支啟動來提供服務時,就會挑權重最大的 C。
- heart beats
每個處於 ACTIVATE 狀態的程式,會不停的送出 heart beats (心跳) 訊息給其它同群組的程式,以表示它還活著,還在正常提供服務。
這裡舉的例子會有兩支程式,一支命名為 NumberGenerator,很簡單的每秒送出一個累加的數字,另一支命名為 NumberReceiver,會由 RV 接收 NumberGenerator 送出的數字,並輸出到 console。假設接收數字的這個系統是我們負責的系統,是服務不能中斷的,我們以 RV 提供的 fault tolerance 機制,讓 NumberReceiver 萬一掛掉時,可以立即啟動另一個備援的系統,所以,測試時我們會同時啟動兩個 NumberReceiver 先啟動的會成為 active 的服務,後啟動的為備援,先來看一下程式。
1 package idv.steven.rv.ft; 2 3 import java.io.UnsupportedEncodingException; 4 5 import com.tibco.tibrv.Tibrv; 6 import com.tibco.tibrv.TibrvException; 7 import com.tibco.tibrv.TibrvMsg; 8 import com.tibco.tibrv.TibrvRvdTransport; 9 import com.tibco.tibrv.TibrvTransport; 10 11 public class NumberGenerator { 12 private String service = "7500"; 13 private String network = ";225.1.1.1"; 14 private String daemon = "tcp:7500"; 15 private String subject = "DEMO.FT.NUM"; 16 17 public void run() { 18 try { 19 Tibrv.open(Tibrv.IMPL_NATIVE); 20 TibrvMsg.setStringEncoding("Big5"); 21 TibrvTransport transport = new TibrvRvdTransport(service, network, daemon); 22 23 for(int i=1; i<=1000; i++) { 24 TibrvMsg msg = new TibrvMsg(); 25 msg.setSendSubject(subject); 26 msg.update("number", i); 27 28 transport.send(msg); 29 30 try { 31 Thread.sleep(1000); 32 } catch (InterruptedException e) { 33 } 34 } 35 36 Tibrv.close(); 37 } catch (TibrvException | UnsupportedEncodingException e) { 38 e.printStackTrace(); 39 } 40 } 41 42 public static void main(String[] args) { 43 NumberGenerator gen = new NumberGenerator(); 44 gen.run(); 45 System.out.println("stop"); 46 } 47 }
上面的 NumberGenerator.java 很簡單的每秒送出一個累加的數字,由 1 累加到 1000 為止。對於上面程式有不了解的,可以先參考「Tibco RV request/reply 的同步與非同步」,之後再回頭來看 fault tolerance。接下來要進入主題,看一下 fault tolerance 程式。
1 package idv.steven.rv.ft; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvFtMember; 6 import com.tibco.tibrv.TibrvFtMemberCallback; 7 import com.tibco.tibrv.TibrvListener; 8 import com.tibco.tibrv.TibrvMsg; 9 import com.tibco.tibrv.TibrvMsgCallback; 10 import com.tibco.tibrv.TibrvRvdTransport; 11 12 public class NumberReceiver implements TibrvMsgCallback, TibrvFtMemberCallback, Runnable { 13 private String service = "7500"; 14 private String network = ";225.1.1.1"; 15 private String daemon = "tcp:7500"; 16 private String subject = "DEMO.FT.NUM"; 17 18 private String ftservice = "7504"; 19 private String ftnetwork = ";225.1.10.1"; 20 private String ftdaemon = "tcp:7504"; 21 22 private String ftgroupName = "DEMO.FT.GROUP"; 23 private int ftweight = 50; 24 private int activeGoalNum = 1; 25 private double hbInterval = 1.5; 26 private double prepareInterval = 3; 27 private double activateInterval = 4.8; 28 29 private TibrvRvdTransport transport = null; 30 private TibrvListener listener = null; 31 32 private boolean active = false; 33 34 @Override 35 public void run() { 36 try { 37 Tibrv.open(Tibrv.IMPL_NATIVE); 38 transport = new TibrvRvdTransport(service, network, daemon); 39 TibrvRvdTransport fttransport = new TibrvRvdTransport(ftservice, ftnetwork, ftdaemon); 40 fttransport.setDescription("fault tolerance"); 41 42 new TibrvFtMember(Tibrv.defaultQueue(), // TibrvQueue 43 this, // TibrvFtMemberCallback 44 fttransport, // TibrvTransport 45 ftgroupName, // groupName 46 ftweight, // weight 47 activeGoalNum, // activeGoal 48 hbInterval, // heartbeatInterval 49 prepareInterval, // preparationInterval, 50 // Zero is a special value, 51 // indicating that the member does 52 // not need advance warning to activate 53 activateInterval, // activationInterval 54 null); // closure 55 56 57 while(true) { 58 try { 59 Tibrv.defaultQueue().dispatch(); 60 } 61 catch (TibrvException e) { 62 System.err.println("Exception dispatching default queue:"); 63 System.exit(0); 64 } 65 catch(InterruptedException ie) { 66 System.exit(0); 67 } 68 } 69 } catch (TibrvException e) { 70 e.printStackTrace(); 71 } 72 } 73 74 void enableListener() { 75 try { 76 // Subscribe to subject 77 listener = new TibrvListener(Tibrv.defaultQueue(), 78 this, 79 transport, 80 subject, 81 null); 82 System.out.println("Start Listening on: " + subject); 83 } 84 catch (TibrvException e) { 85 System.err.println("Failed to create subject listener:"); 86 System.exit(0); 87 } 88 } 89 90 void disableListener() { 91 listener.destroy(); 92 System.out.println("Destroy Listener on Subject: " + subject); 93 } 94 95 @Override 96 public void onFtAction(TibrvFtMember member, String ftgroupName, int action) { 97 if (action == TibrvFtMember.PREPARE_TO_ACTIVATE) { 98 System.out.println("TibrvFtMember.PREPARE_TO_ACTIVATE invoked..."); 99 System.out.println("*** PREPARE TO ACTIVATE: " + ftgroupName); 100 } 101 else if (action == TibrvFtMember.ACTIVATE) { 102 System.out.println("TibrvFtMember.ACTIVATE invoked..."); 103 System.out.println("*** ACTIVATE: " + ftgroupName); 104 enableListener(); 105 active = true; 106 } 107 else if (action == TibrvFtMember.DEACTIVATE) { 108 System.out.println("TibrvFtMember.DEACTIVATE invoked..."); 109 System.out.println("*** DEACTIVATE: " + ftgroupName); 110 disableListener(); 111 active = false; 112 } 113 } 114 115 @Override 116 public void onMsg(TibrvListener listener, TibrvMsg msg) { 117 if (subject.equals(listener.getSubject())) { 118 try { 119 int num = msg.getAsInt("number", 0); 120 System.out.println("number: " + num); 121 } catch (TibrvException e) { 122 e.printStackTrace(); 123 } 124 } 125 } 126 127 public static void main(String[] args) throws InterruptedException { 128 NumberReceiver rcv = new NumberReceiver(); 129 Thread tRcv = new Thread(rcv); 130 tRcv.start(); 131 tRcv.join(); 132 System.out.println("stop"); 133 } 134 }
上面的程式與 fault tolerance 有關的是第 39~54 行及第 96~113 行,現在說明 NumberReceiver.java 程式,如下:
- Line 12: 所有參與 fault tolerance 的程式都要實作 TibrvFtMemberCallback。
- Line 39: 建立一個 fault tolerance 的通道。
- Line 42: 使這支程式成為群組中的一個成員 (member),這裡面有好幾個和時間有關的參數 - hbInterval、prepareInterval、activeInterval,單位是秒,指的是 heart beat 的時間、進入 PREPARE_TO_ACTIVATE 的時間及切換到 ACTIVATE 的時間,在這裡設定為 1.5、3、4.8,表示每三秒會有一次的心跳訊息,當 3 秒鐘都沒有收到訊息即進入 PREPARE_TO_ACTIVATE 狀態,當 4.8 秒沒有收到心跳訊息,就切換到 ACTIVATE 狀態。
- Line 74~90: 這兩個 method (enableListener、disableListener) 是用來啟動服務及停止服務。
- Line 96~113: onFtAction 是 fault tolerance 的 callback function,這裡可以接收到狀態改變的 event。
- Line 116~125: 收到 NumberGenerator 送來的訊息,將它輸出到 console。
現在進行測試 …
- 首先啟動 NumberGenerator …
- 開啟兩個「命令提示字元」的視窗,並同時啟動 NumberReceiver,可以看到其中一個開始提供服務,如下圖:
- 在左邊原本正在提供服務的程式按 Ctrl-C 以中斷服務,可以看到右邊的程式接手了服務。
- 再次啟動左邊的程式,左邊的程式將處於 DEACTIVATE 的狀態。
- 在右邊視窗中按 Ctrl-C 中斷右邊的程式,可以看到左邊的程式又接手服務。
【實戰】
上面的例子程式的狀態都是由 RV 偵測後透過 call back method 通知,也就是說 RV 是主動,程式是被動,但是某些應用裡,程式也可能必須改變狀態,例如正處於 ACTIVATE 狀態的程式發現自己其實有一些問題,並沒有正常運作,希望由同群組的別支程式接手服務,是否有辦法自己變更狀態?
TibrvFtMember 的 method 只有一個 setter method - setWeight,用來變更權重,這對於正處於 ACTIVATE 狀態的程式來說,並沒辦法將自己的運作權力交出,解決的辦法就是呼叫 destroy 退出群組,這樣 RV 就會再由群組中找出一個 DEACTIVATE 的 member,通知他提升到 ACTIVATE。
【日劇 - I'm Home】
以木村拓哉演出的日劇來說,這齣戲的收視率當然偏低,但其實這部日劇還不錯看,以家路久(木村拓哉 飾)的失憶後追尋記憶,逐漸帶出許多隱藏的秘密 …
山口麻友在劇中演出木村拓哉沒有血緣的女兒,她的眼睛相當特別,像月亮一樣彎彎的 ...
2015年5月5日 星期二
Tibco RV 的 Queue
回顧「Tibco RV request/reply 的同步與非同步」一文裡面的 Server.java,它使用的是 Tibco RV 的 default queue 進行訊息的分派 (line 36、39),這麼做的最大好處是簡單,但是,基於以下理由,一般的應用程式不會使用 default queue,而是會每個應用程式自行建立自己的 queue,理由如下:
這裡將 Server.java 的訊息分派那一段程式改寫,如下:
【日劇 - 影子寫手】
同一個故事,每個人看到的面向多不盡相同,這部影集在川原由樹 (水川麻美 飾) 揭露出自己長期當名作家遠野理紗 (中谷美紀 飾) 的槍手後,除了真相被新聞媒體聯手封殺外,她的寫作之路也被各大出版社聯手斬斷了! 最後更在司法上慘敗,面臨鉅額賠償! 這是很正常的,社會的各既得利益者會為保護自己的權益及權力,聯手且不擇手段的打擊任何對手。
台灣這幾年喊的震天響的「轉型正義」,真的要成功,一定會讓目前的許多既得利益階級喪失非常多的特權,這些特權對他們說都是天經地義,原本就屬於他們自己的,就像皇后生下的長子就該立為太子,就該繼承皇位那麼的自然。所以當改革真的開始啟動後,這些既得利益階級(反動勢力?)的反撲是必然的,也絕對是會讓台灣陷入一段時間的動盪、不安。
【每日一字】
class warfare - fighting or disagreement between social classes, usually with the lower classes trying to take power and money away from the upper classes (階級鬥爭)
* Some people think that class warfare is unavoidable and that as the upper classes continue to get richer, the lower classes will star a revolution.
(資料來源: ESL Podcast 573)
- 每個 queue 的訊息是以 FIFO 的方式消化掉,如果所有應用程式共用一個,所有的訊息會混雜在一起,會造成塞車。
- 不同的 queue 是以 round-robin 的方式消化,default queue 的優先順序是 1,也就是最低的優先順序!
- 使用自己建立的 queue,可以設定一些參數,以自己希望的方式消化訊息。
這裡將 Server.java 的訊息分派那一段程式改寫,如下:
TibrvQueue queue = new TibrvQueue();
queue.setPriority(10);
queue.setName("MyQueue");
queue.setLimitPolicy(TibrvQueue.DISCARD_LAST, 100, 1);
new TibrvListener(queue, this, transport, subject, null);
while (!eventReceived) {
eventReceived = queue.timedDispatch(server_timeout);
if (eventReceived) {
System.out.println("receive a message");
}
else {
System.out.println("timeout");
}
}
從上面的程式,可以看到我們為 queue 設定了比較高的優先順序,也為 queue 取名,以方便 debug,最重要的是,還可以用 setLimitPolicy 設定一些訊息的處理原則,它的三個參數說明如下:
- 第一個參數: 當 queue 的訊息滿了,又有新的訊息傳過來,來不及處理時,queue 該如何處理?
- 第二個參數: 這個 queue 最多可以放幾個訊息。
- 第三個參數: 當要放棄一些訊息時,一次要放棄幾個?
【日劇 - 影子寫手】
同一個故事,每個人看到的面向多不盡相同,這部影集在川原由樹 (水川麻美 飾) 揭露出自己長期當名作家遠野理紗 (中谷美紀 飾) 的槍手後,除了真相被新聞媒體聯手封殺外,她的寫作之路也被各大出版社聯手斬斷了! 最後更在司法上慘敗,面臨鉅額賠償! 這是很正常的,社會的各既得利益者會為保護自己的權益及權力,聯手且不擇手段的打擊任何對手。
台灣這幾年喊的震天響的「轉型正義」,真的要成功,一定會讓目前的許多既得利益階級喪失非常多的特權,這些特權對他們說都是天經地義,原本就屬於他們自己的,就像皇后生下的長子就該立為太子,就該繼承皇位那麼的自然。所以當改革真的開始啟動後,這些既得利益階級(反動勢力?)的反撲是必然的,也絕對是會讓台灣陷入一段時間的動盪、不安。
【每日一字】
class warfare - fighting or disagreement between social classes, usually with the lower classes trying to take power and money away from the upper classes (階級鬥爭)
* Some people think that class warfare is unavoidable and that as the upper classes continue to get richer, the lower classes will star a revolution.
(資料來源: ESL Podcast 573)
2015年2月7日 星期六
Tibco RV request/reply 的同步與非同步
Tibco RV 有提供 request/reply 模式,也有提供 publish/subscribe 模式,這兩種模式的用途分別是,request/reply 用在一對一的狀況下,而 publish/subscribe 則是用在一對多。雖然 request/reply 是用在一對一,Tibco RV 仍提供了同步與非同步兩種模式,在說明同步和非同步之前,先看一下 server 端的程式,如下:
1 package idv.steven.rv; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvListener; 6 import com.tibco.tibrv.TibrvMsg; 7 import com.tibco.tibrv.TibrvMsgCallback; 8 import com.tibco.tibrv.TibrvMsgField; 9 import com.tibco.tibrv.TibrvRvdTransport; 10 import com.tibco.tibrv.TibrvTransport; 11 12 public class Server implements TibrvMsgCallback { 13 private TibrvTransport transport = null; 14 15 private String service = null; 16 private String network = null; 17 private String daemon = null; 18 private String subject = null; 19 20 private double server_timeout = 60; 21 22 public void run(String[] args) { 23 boolean eventReceived = false; 24 25 int i = loadParameters(args); 26 if (i > args.length-1) { 27 usage(); 28 return; 29 } 30 31 try { 32 Tibrv.open(Tibrv.IMPL_NATIVE); 33 34 transport = new TibrvRvdTransport(service,network,daemon); 35 subject = args[args.length-1]; 36 new TibrvListener(Tibrv.defaultQueue(), this, transport, subject, null); 37 38 while (!eventReceived) { 39 eventReceived = Tibrv.defaultQueue().timedDispatch(server_timeout); 40 if (eventReceived) { 41 System.out.println("receive a message"); 42 } 43 else { 44 System.out.println("timeout"); 45 } 46 } 47 } catch (TibrvException | InterruptedException e) { 48 e.printStackTrace(); 49 } 50 finally { 51 try { 52 Tibrv.close(); 53 } catch (TibrvException e) { 54 e.printStackTrace(); 55 } 56 } 57 } 58 59 @Override 60 public void onMsg(TibrvListener listener, TibrvMsg msg) { 61 String replySubject = msg.getReplySubject(); 62 if (replySubject == null) { 63 System.out.println("no reply subject,discard client's request"); 64 return; 65 }
System.out.println("TibrvMsg's reply subject: " + replySubject); 66 67 try { 68 TibrvMsgField field = msg.getField("sendData"); 69 String sendData = (String) field.data; 70 System.out.println("sendData: " + sendData); 71 72 TibrvMsg replyMsg = new TibrvMsg(); 73 replyMsg.setSendSubject(replySubject); 74 replyMsg.update("replyData", "Nice to meet you."); 75 transport.send(replyMsg); 76 } catch (TibrvException e) { 77 e.printStackTrace(); 78 } 79 } 80 81 int loadParameters(String[] args) 82 { 83 int i=0; 84 while(i < args.length-1 && args[i].startsWith("-")) 85 { 86 if (args[i].equals("-service")) 87 { 88 service = args[i+1]; 89 i += 2; 90 } 91 else 92 if (args[i].equals("-network")) 93 { 94 network = args[i+1]; 95 i += 2; 96 } 97 else 98 if (args[i].equals("-daemon")) 99 { 100 daemon = args[i+1]; 101 i += 2; 102 } 103 else 104 usage(); 105 } 106 return i; 107 } 108 109 void usage() 110 { 111 System.err.println("Usage: java idv.steven.rv.Server [-service service] [-network network]"); 112 System.err.println(" [-daemon daemon] <subject>"); 113 System.exit(-1); 114 } 115 116 public static void main(String[] args) { 117 new Server().run(args); 118 } 119 }
要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.Server -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV
這指令指出,UDP 的廣播網址是 225.1.1.1,client 端要連線到 daemon 的話,使用 TCP 7500 port,傾聽的 subject 是 TEST.RV,所以,client 端送出的訊息的 subject 也要是 TEST.RV。關於 service、network、daemon 的詳細說明可以參考 Oracle 官網上的說明 http://docs.oracle.com/cd/E21455_01/common/tutorials/connector_rendezvous_daemon.html。
server 的程式說明如下:
- Line 12: 這個類別實作了 TibcoMsgCallback 介面,當收到訊息時,RV 會呼叫 onMsg method。
- Line 39: 訊息分派機制,如果沒有寫這一行,RV 的訊息沒辦法分派,上面是呼叫 timedDispatch(timeout),也就是每隔 timeout 的秒數,就會離開這個 method,另一個用法是使用 dispatch(),這個方法就不會 timeout,會一直停在那一行直到有訊息時進行分派。
- Line 73: 將 client 指定的 reply subject 設定給 send subject,這樣 client 端才能收到訊息。
- 同步
接下來,我們先看一下當 client 端選擇同步的模式時,程式要怎麼寫。
1 package idv.steven.rv; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvMsg; 6 import com.tibco.tibrv.TibrvRvdTransport; 7 import com.tibco.tibrv.TibrvTransport; 8 9 public class SyncClient { 10 private String service = null; 11 private String network = null; 12 private String daemon = null; 13 14 private TibrvTransport transport = null; 15 private double timeout = 5; //second 16 17 public void run(String[] args) { 18 int i = loadParameters(args); 19 if (i > args.length-2) { 20 usage(); 21 return; 22 } 23 24 try { 25 Tibrv.open(Tibrv.IMPL_NATIVE); 26 27 transport = new TibrvRvdTransport(service,network,daemon); 28 29 String subject = args[args.length-2]; 30 String sendData = args[args.length-1]; 31 32 TibrvMsg msg = new TibrvMsg(); 33 msg.setSendSubject(subject); 34 msg.update("sendData", sendData); 35 36 TibrvMsg replyMsg = null; 37 replyMsg = transport.sendRequest(msg, timeout); 38 39 if (replyMsg == null) 40 System.out.println("request time-out"); 41 else 42 System.out.println("Receive reply msg:" + replyMsg); 43 } catch (TibrvException e) { 44 e.printStackTrace(); 45 } 46 finally { 47 try { 48 Tibrv.close(); 49 } catch (TibrvException e) { 50 e.printStackTrace(); 51 } 52 } 53 } 54 55 int loadParameters(String[] args) 56 { 57 int i=0; 58 while(i < args.length-1 && args[i].startsWith("-")) 59 { 60 if (args[i].equals("-service")) 61 { 62 service = args[i+1]; 63 i += 2; 64 } 65 else 66 if (args[i].equals("-network")) 67 { 68 network = args[i+1]; 69 i += 2; 70 } 71 else 72 if (args[i].equals("-daemon")) 73 { 74 daemon = args[i+1]; 75 i += 2; 76 } 77 else 78 usage(); 79 } 80 return i; 81 } 82 83 void usage() 84 { 85 System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]"); 86 System.err.println(" [-daemon daemon] <subject> <messages>"); 87 System.exit(-1); 88 } 89 90 public static void main(String[] args) { 91 new SyncClient().run(args); 92 } 93 94 }
要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.SyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
這裡只是簡單的傳送一個 Hello 訊息給 server,server 收到後會透過 System.out.println 印出來,接下說說明一下上面的程式:
- Line 37: 送訊息給 server 時,使用 sendRquest,並等待 server 回覆訊息,等待的時間是 timeout 指定的秒數。
- Line 48: 程式結束前,記得關閉 Tibco 回收資源,否則程式無法正常結束。
請注意看 server 的紅色部份 (line 73、75),這裡不管 client 送過來的訊息是以 request/reply 方式傳送,或是以 publish/subscribe 方式傳送,都用同樣的方式回覆,但是,在 Tibco 的官方文件裡,這種方式只用在 publish/subscribe,而 request/reply 不需要 setReplySubject,send 則要改為 sendReply。這個範例程式雖然可以收到並回覆訊息,但是否會有意料之外的事發生? 我無法確定!
另外,注意看 server 收到 client 傳來的訊息時,client msg 的 reply subject 是由 TibcoRV 產生的,會是類似 _INBOX.C0A80064.5548C4953B2.1 這樣以 _INBOX 開頭的字串,_INBOX 是 TibcoRV 的保留字,將這個 reply subject 設到要回傳的訊息的 send subject,就可以將訊息傳回原來的 client 程式,但是,如上面所說,這是非正規的寫法。
- 非同步
非同步的 client 端寫法很類似 server 端,程式如下:
1 package idv.steven.rv; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvListener; 6 import com.tibco.tibrv.TibrvMsg; 7 import com.tibco.tibrv.TibrvMsgCallback; 8 import com.tibco.tibrv.TibrvMsgField; 9 import com.tibco.tibrv.TibrvRvdTransport; 10 import com.tibco.tibrv.TibrvTransport; 11 12 public class AsyncClient implements TibrvMsgCallback { 13 private String service = null; 14 private String network = null; 15 private String daemon = null; 16 17 private TibrvTransport transport = null; 18 private boolean running = true; 19 20 public void run(String[] args) { 21 int i = loadParameters(args); 22 if (i > args.length-2) { 23 usage(); 24 return; 25 } 26 27 try { 28 Tibrv.open(Tibrv.IMPL_NATIVE); 29 30 transport = new TibrvRvdTransport(service,network,daemon); 31 32 String subject = args[args.length-2]; 33 String sendData = args[args.length-1]; 34 String replySubject = transport.createInbox(); 35 new TibrvListener(Tibrv.defaultQueue(), this, transport, replySubject, null); 36 37 TibrvMsg msg = new TibrvMsg(); 38 msg.setSendSubject(subject); 39 msg.setReplySubject(replySubject); 40 msg.update("sendData", sendData); 41 transport.send(msg); 42 43 while (running) { 44 Tibrv.defaultQueue().dispatch(); 45 } 46 } catch (TibrvException | InterruptedException e) { 47 e.printStackTrace(); 48 } 49 finally { 50 try { 51 Tibrv.close(); 52 } catch (TibrvException e) { 53 e.printStackTrace(); 54 } 55 } 56 } 57 58 @Override 59 public void onMsg(TibrvListener listener, TibrvMsg msg) { 60 System.out.println("subject: " + listener.getSubject()); 61 62 try { 63 TibrvMsgField field; 64 field = msg.getField("replyData"); 65 String replyData = (String) field.data; 66 System.out.println("replyData: " + replyData); 67 } catch (TibrvException e) { 68 e.printStackTrace(); 69 } 70 71 running = false; 72 } 73 74 int loadParameters(String[] args) 75 { 76 int i=0; 77 while(i < args.length-1 && args[i].startsWith("-")) 78 { 79 if (args[i].equals("-service")) 80 { 81 service = args[i+1]; 82 i += 2; 83 } 84 else 85 if (args[i].equals("-network")) 86 { 87 network = args[i+1]; 88 i += 2; 89 } 90 else 91 if (args[i].equals("-daemon")) 92 { 93 daemon = args[i+1]; 94 i += 2; 95 } 96 else 97 usage(); 98 } 99 return i; 100 } 101 102 void usage() 103 { 104 System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]"); 105 System.err.println(" [-daemon daemon] <subject> <messages>"); 106 System.exit(-1); 107 } 108 109 public static void main(String[] args) { 110 new AsyncClient().run(args); 111 } 112 }
要執行的話,在命令列打入如下指令:
java idv.steven.rv.AsyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
程式說明如下:
- Line 12: 實作 TibrvMsgCallback,當收到訊息時 RV 會呼叫 onMsg。
- Line 34: 產生一個 InBox,這樣的話,當送出訊息時,RV 會讓 client 和 server 直接連線,也就是說,Line 35 看起來 client 端是透過傾聽 inbox 這個 subject 來接收訊息,似乎別的程式如果知道這個 inbox 字串的話,也可以傾聽相同的 subject 得到訊息內容,事實上是沒辦法的! 採用 InBox 的模式,RV 會將 server 的訊息直接送給 client 端,而不是用 UDP 群播的方式。
- Line 41: 送出訊息使用的是 send,和同步的方式不同!
- Line 44: 因為是非同步,需要等待 RV 將訊息回送,也要有訊息的分派機制。
訂閱:
意見 (Atom)





