Google Code Prettify

顯示具有 TibcoRV 標籤的文章。 顯示所有文章
顯示具有 TibcoRV 標籤的文章。 顯示所有文章

2015年7月10日 星期五

Tibco RV - fault tolerance

有很多系統不允許(在某個特定時段)服務中斷,這類的系統不只會有備援,通常還會有 fault tolerance 機制,當系統掛掉時,備援的系統會自動啟動服務。Tibco RV 也有提供這樣的機制。

在了解程式如何編寫之前,我先對 RV 提供的 fault tolerance 作簡單的說明。
  • group name
RV 的 fault tolerance 以 group name 來區分,也就是同一個 group  name 的程式會成為一個群組,相互備援。
  • active & inactive
RV 將 fault tolerance 的系統狀態分成三個階段,如下:
    1. DEACTIVATE: 顧名思義,就是非屬於提供服務的狀態,這時候系統當然就別做什麼事,只要一直傾聽 RV 的訊息,當傾聽到要切換狀態時再進行相關工作。
    2. PREPARE_TO_ACTIVATE: 當 RV 送來這個狀態,表示原本正提供服務的系統可能出了狀況,備援的系統要開始準備啟動,通常收到這個 event 時,會進行一些系統資源初始化的工作,以便當程式真的要啟動提供服務時,可以儘快啟動,以使服務中斷的時間儘可能的縮短。
    3. ACTIVATE: 收到這個 event,就真的要接手服務了!
  • fault tolerance callback function
上述三個狀態的切換,RV 會透過 onFtAction 這個 callback function 通知備援的程式,所以程式要實作 TibrvFtMemberCallback 這個介面。
  • active goal
在我們將測試的例子裡,只會啟動兩個 reciver 程式,一次只會有一個程式提供服務 (接收 generator 傳來的數字並輸出到 console),但是,實務上要有幾支程式處於 ACTIVATE 狀態是可以設定的,這個數字稱為 active goal。
  • weight
每支程式建立 fault tolerance 時,在建構子的參數裡會有個 weight 的參數,這個值由 1 到整數的最大值,數字越大優先權越大,所以,如果有三支程式 A、B、C,其權重 (weight) 分別為 50、30、100,當 RV 要從這三支程式裡挑一支啟動來提供服務時,就會挑權重最大的 C。
  • heart beats
每個處於 ACTIVATE 狀態的程式,會不停的送出 heart beats (心跳) 訊息給其它同群組的程式,以表示它還活著,還在正常提供服務。
這裡舉的例子會有兩支程式,一支命名為 NumberGenerator,很簡單的每秒送出一個累加的數字,另一支命名為 NumberReceiver,會由 RV 接收 NumberGenerator 送出的數字,並輸出到 console。假設接收數字的這個系統是我們負責的系統,是服務不能中斷的,我們以 RV 提供的 fault tolerance 機制,讓 NumberReceiver 萬一掛掉時,可以立即啟動另一個備援的系統,所以,測試時我們會同時啟動兩個 NumberReceiver 先啟動的會成為 active 的服務,後啟動的為備援,先來看一下程式。 
 1 package idv.steven.rv.ft;
 2 
 3 import java.io.UnsupportedEncodingException;
 4 
 5 import com.tibco.tibrv.Tibrv;
 6 import com.tibco.tibrv.TibrvException;
 7 import com.tibco.tibrv.TibrvMsg;
 8 import com.tibco.tibrv.TibrvRvdTransport;
 9 import com.tibco.tibrv.TibrvTransport;
10 
11 public class NumberGenerator {
12     private String service = "7500";
13     private String network = ";225.1.1.1";
14     private String daemon = "tcp:7500";
15     private String subject = "DEMO.FT.NUM";
16     
17     public void run() {
18         try {
19             Tibrv.open(Tibrv.IMPL_NATIVE);
20             TibrvMsg.setStringEncoding("Big5");
21             TibrvTransport transport = new TibrvRvdTransport(service, network, daemon);
22             
23             for(int i=1; i<=1000; i++) {
24                 TibrvMsg msg = new TibrvMsg();
25                 msg.setSendSubject(subject);
26                 msg.update("number", i);
27                 
28                 transport.send(msg);
29                 
30                 try {
31                     Thread.sleep(1000);
32                 } catch (InterruptedException e) {
33                 }
34             }
35             
36             Tibrv.close();
37         } catch (TibrvException | UnsupportedEncodingException e) {
38             e.printStackTrace();
39         }
40     }
41 
42     public static void main(String[] args) {
43         NumberGenerator gen = new NumberGenerator();
44         gen.run();
45         System.out.println("stop");
46     }
47 }
上面的 NumberGenerator.java 很簡單的每秒送出一個累加的數字,由 1 累加到 1000 為止。對於上面程式有不了解的,可以先參考「Tibco RV request/reply 的同步與非同步」,之後再回頭來看 fault tolerance。接下來要進入主題,看一下 fault tolerance 程式。
  1 package idv.steven.rv.ft;
  2 
  3 import com.tibco.tibrv.Tibrv;
  4 import com.tibco.tibrv.TibrvException;
  5 import com.tibco.tibrv.TibrvFtMember;
  6 import com.tibco.tibrv.TibrvFtMemberCallback;
  7 import com.tibco.tibrv.TibrvListener;
  8 import com.tibco.tibrv.TibrvMsg;
  9 import com.tibco.tibrv.TibrvMsgCallback;
 10 import com.tibco.tibrv.TibrvRvdTransport;
 11 
 12 public class NumberReceiver implements TibrvMsgCallback, TibrvFtMemberCallback, Runnable {
 13     private String service = "7500";
 14     private String network = ";225.1.1.1";
 15     private String daemon = "tcp:7500";
 16     private String subject = "DEMO.FT.NUM";
 17     
 18     private String ftservice = "7504";
 19     private String ftnetwork = ";225.1.10.1";
 20     private String ftdaemon = "tcp:7504";
 21     
 22     private String ftgroupName = "DEMO.FT.GROUP";
 23     private int     ftweight = 50;
 24     private int     activeGoalNum = 1;
 25     private double  hbInterval = 1.5;
 26     private double  prepareInterval = 3; 
 27     private double  activateInterval = 4.8;
 28     
 29     private TibrvRvdTransport transport = null;
 30     private TibrvListener listener = null;
 31     
 32     private boolean active = false;
 33     
 34     @Override
 35     public void run() {
 36         try {
 37             Tibrv.open(Tibrv.IMPL_NATIVE);
 38             transport = new TibrvRvdTransport(service, network, daemon);
 39             TibrvRvdTransport fttransport = new TibrvRvdTransport(ftservice, ftnetwork, ftdaemon);
 40             fttransport.setDescription("fault tolerance");
 41             
 42             new TibrvFtMember(Tibrv.defaultQueue(), // TibrvQueue 
 43                       this,                 // TibrvFtMemberCallback
 44                       fttransport,          // TibrvTransport    
 45                       ftgroupName,          // groupName
 46                       ftweight,             // weight
 47                       activeGoalNum,        // activeGoal
 48                       hbInterval,           // heartbeatInterval
 49                       prepareInterval,      // preparationInterval,
 50                                             // Zero is a special value,
 51                                             // indicating that the member does 
 52                                             // not need advance warning to activate 
 53                       activateInterval,     // activationInterval
 54                       null);                // closure
 55             
 56             
 57             while(true) {
 58                 try {
 59                       Tibrv.defaultQueue().dispatch();
 60                 }
 61                 catch (TibrvException e) {
 62                    System.err.println("Exception dispatching default queue:");
 63                    System.exit(0);
 64                 }
 65                 catch(InterruptedException ie) {
 66                    System.exit(0);
 67                 }
 68             }
 69         } catch (TibrvException e) {
 70             e.printStackTrace();
 71         }
 72     }
 73     
 74     void enableListener() {
 75         try {
 76           // Subscribe to subject        
 77             listener = new TibrvListener(Tibrv.defaultQueue(),
 78                                          this, 
 79                                          transport, 
 80                                          subject, 
 81                                          null);
 82             System.out.println("Start Listening on: " + subject);    
 83         }
 84         catch (TibrvException e) {
 85            System.err.println("Failed to create subject listener:");
 86            System.exit(0);
 87         }
 88     }
 89     
 90     void disableListener() {
 91        listener.destroy();
 92        System.out.println("Destroy Listener on Subject: " + subject);    
 93     }
 94 
 95     @Override
 96     public void onFtAction(TibrvFtMember member, String ftgroupName, int action) {
 97         if (action == TibrvFtMember.PREPARE_TO_ACTIVATE) {
 98             System.out.println("TibrvFtMember.PREPARE_TO_ACTIVATE invoked...");
 99             System.out.println("*** PREPARE TO ACTIVATE: " + ftgroupName);
100         }
101         else if (action == TibrvFtMember.ACTIVATE) {
102             System.out.println("TibrvFtMember.ACTIVATE invoked...");
103             System.out.println("*** ACTIVATE: " + ftgroupName);
104             enableListener();
105             active = true;
106         }
107         else if (action == TibrvFtMember.DEACTIVATE) {
108             System.out.println("TibrvFtMember.DEACTIVATE invoked...");
109             System.out.println("*** DEACTIVATE: " + ftgroupName);
110             disableListener();
111             active = false;
112         }
113     }
114 
115     @Override
116     public void onMsg(TibrvListener listener, TibrvMsg msg) {
117         if (subject.equals(listener.getSubject())) {
118             try {
119                 int num = msg.getAsInt("number", 0);
120                 System.out.println("number: " + num);
121             } catch (TibrvException e) {
122                 e.printStackTrace();
123             }
124         }
125     }
126 
127     public static void main(String[] args) throws InterruptedException {
128         NumberReceiver rcv = new NumberReceiver();
129         Thread tRcv = new Thread(rcv);
130         tRcv.start();
131         tRcv.join();
132         System.out.println("stop");
133     }
134 }




上面的程式與 fault tolerance 有關的是第 39~54 行及第 96~113 行,現在說明 NumberReceiver.java 程式,如下:
  • Line 12: 所有參與 fault tolerance 的程式都要實作 TibrvFtMemberCallback。
  • Line 39: 建立一個 fault tolerance 的通道。
  • Line 42: 使這支程式成為群組中的一個成員 (member),這裡面有好幾個和時間有關的參數 - hbInterval、prepareInterval、activeInterval,單位是秒,指的是 heart beat 的時間、進入 PREPARE_TO_ACTIVATE 的時間及切換到 ACTIVATE 的時間,在這裡設定為 1.5、3、4.8,表示每三秒會有一次的心跳訊息,當 3 秒鐘都沒有收到訊息即進入 PREPARE_TO_ACTIVATE 狀態,當 4.8 秒沒有收到心跳訊息,就切換到 ACTIVATE 狀態。
  • Line 74~90: 這兩個 method (enableListener、disableListener) 是用來啟動服務及停止服務。
  • Line 96~113: onFtAction 是 fault tolerance 的 callback function,這裡可以接收到狀態改變的 event。
  • Line 116~125: 收到 NumberGenerator 送來的訊息,將它輸出到 console。
現在進行測試 …
  • 首先啟動 NumberGenerator …
  • 開啟兩個「命令提示字元」的視窗,並同時啟動 NumberReceiver,可以看到其中一個開始提供服務,如下圖:
  • 在左邊原本正在提供服務的程式按 Ctrl-C 以中斷服務,可以看到右邊的程式接手了服務。
  • 再次啟動左邊的程式,左邊的程式將處於 DEACTIVATE 的狀態。
  • 在右邊視窗中按 Ctrl-C 中斷右邊的程式,可以看到左邊的程式又接手服務。

【實戰】
上面的例子程式的狀態都是由 RV 偵測後透過 call back method 通知,也就是說 RV 是主動,程式是被動,但是某些應用裡,程式也可能必須改變狀態,例如正處於 ACTIVATE 狀態的程式發現自己其實有一些問題,並沒有正常運作,希望由同群組的別支程式接手服務,是否有辦法自己變更狀態?

TibrvFtMember 的 method 只有一個 setter method - setWeight,用來變更權重,這對於正處於 ACTIVATE 狀態的程式來說,並沒辦法將自己的運作權力交出,解決的辦法就是呼叫 destroy 退出群組,這樣 RV 就會再由群組中找出一個 DEACTIVATE 的 member,通知他提升到 ACTIVATE。

【日劇 - I'm Home】
以木村拓哉演出的日劇來說,這齣戲的收視率當然偏低,但其實這部日劇還不錯看,以家路久(木村拓哉 飾)的失憶後追尋記憶,逐漸帶出許多隱藏的秘密 … 
山口麻友在劇中演出木村拓哉沒有血緣的女兒,她的眼睛相當特別,像月亮一樣彎彎的 ...

2015年5月5日 星期二

Tibco RV 的 Queue

回顧「Tibco RV request/reply 的同步與非同步」一文裡面的 Server.java,它使用的是 Tibco RV 的 default queue 進行訊息的分派 (line 36、39),這麼做的最大好處是簡單,但是,基於以下理由,一般的應用程式不會使用 default queue,而是會每個應用程式自行建立自己的 queue,理由如下:

  1. 每個 queue 的訊息是以 FIFO 的方式消化掉,如果所有應用程式共用一個,所有的訊息會混雜在一起,會造成塞車。
  2. 不同的 queue 是以 round-robin 的方式消化,default queue 的優先順序是 1,也就是最低的優先順序!
  3. 使用自己建立的 queue,可以設定一些參數,以自己希望的方式消化訊息。

這裡將 Server.java 的訊息分派那一段程式改寫,如下:
             TibrvQueue queue = new TibrvQueue();
             queue.setPriority(10);
             queue.setName("MyQueue");
             queue.setLimitPolicy(TibrvQueue.DISCARD_LAST, 100, 1);
           
             new TibrvListener(queue, this, transport, subject, null);
           
             while (!eventReceived) {
                 eventReceived = queue.timedDispatch(server_timeout);
                 if (eventReceived) {
                     System.out.println("receive a message");
                 }
                 else {
                     System.out.println("timeout");
                 }
             }
從上面的程式,可以看到我們為 queue 設定了比較高的優先順序,也為 queue 取名,以方便 debug,最重要的是,還可以用 setLimitPolicy 設定一些訊息的處理原則,它的三個參數說明如下:
  1. 第一個參數: 當 queue 的訊息滿了,又有新的訊息傳過來,來不及處理時,queue 該如何處理?
  2. 第二個參數: 這個 queue 最多可以放幾個訊息。
  3. 第三個參數: 當要放棄一些訊息時,一次要放棄幾個?




【日劇 - 影子寫手】
同一個故事,每個人看到的面向多不盡相同,這部影集在川原由樹 (水川麻美 飾) 揭露出自己長期當名作家遠野理紗 (中谷美紀 飾) 的槍手後,除了真相被新聞媒體聯手封殺外,她的寫作之路也被各大出版社聯手斬斷了! 最後更在司法上慘敗,面臨鉅額賠償! 這是很正常的,社會的各既得利益者會為保護自己的權益及權力,聯手且不擇手段的打擊任何對手。

台灣這幾年喊的震天響的「轉型正義」,真的要成功,一定會讓目前的許多既得利益階級喪失非常多的特權,這些特權對他們說都是天經地義,原本就屬於他們自己的,就像皇后生下的長子就該立為太子,就該繼承皇位那麼的自然。所以當改革真的開始啟動後,這些既得利益階級(反動勢力?)的反撲是必然的,也絕對是會讓台灣陷入一段時間的動盪、不安。

【每日一字】
class warfare - fighting or disagreement between social classes, usually with the lower classes trying to take power and money away from the upper classes (階級鬥爭)
* Some people think that class warfare is unavoidable and that as the upper classes continue to get richer, the lower classes will star a revolution.
(資料來源: ESL Podcast 573)

2015年2月7日 星期六

Tibco RV request/reply 的同步與非同步

Tibco RV 有提供 request/reply 模式,也有提供 publish/subscribe 模式,這兩種模式的用途分別是,request/reply 用在一對一的狀況下,而 publish/subscribe 則是用在一對多。雖然 request/reply 是用在一對一,Tibco RV 仍提供了同步與非同步兩種模式,在說明同步和非同步之前,先看一下 server 端的程式,如下:
  1 package idv.steven.rv;
  2 
  3 import com.tibco.tibrv.Tibrv;
  4 import com.tibco.tibrv.TibrvException;
  5 import com.tibco.tibrv.TibrvListener;
  6 import com.tibco.tibrv.TibrvMsg;
  7 import com.tibco.tibrv.TibrvMsgCallback;
  8 import com.tibco.tibrv.TibrvMsgField;
  9 import com.tibco.tibrv.TibrvRvdTransport;
 10 import com.tibco.tibrv.TibrvTransport;
 11 
 12 public class Server implements TibrvMsgCallback {
 13     private TibrvTransport transport = null;
 14     
 15     private String service = null;
 16     private String network = null;
 17     private String daemon  = null;
 18     private String subject = null;
 19     
 20     private double server_timeout = 60;
 21        
 22     public void run(String[] args) {
 23         boolean eventReceived = false;
 24         
 25         int i = loadParameters(args);
 26         if (i > args.length-1) {
 27             usage();
 28             return;
 29         }
 30         
 31         try {
 32             Tibrv.open(Tibrv.IMPL_NATIVE);
 33             
 34             transport = new TibrvRvdTransport(service,network,daemon);
 35             subject = args[args.length-1];
 36             new TibrvListener(Tibrv.defaultQueue(), this, transport, subject, null);
 37             
 38             while (!eventReceived) {
 39                 eventReceived = Tibrv.defaultQueue().timedDispatch(server_timeout);
 40                 if (eventReceived) {
 41                     System.out.println("receive a message");
 42                 }
 43                 else {
 44                     System.out.println("timeout");
 45                 }
 46             }
 47         } catch (TibrvException | InterruptedException e) {
 48             e.printStackTrace();
 49         }
 50         finally {
 51             try {
 52                 Tibrv.close();
 53             } catch (TibrvException e) {
 54                 e.printStackTrace();
 55             }
 56         }
 57     }
 58     
 59     @Override
 60     public void onMsg(TibrvListener listener, TibrvMsg msg) {
 61         String replySubject = msg.getReplySubject();
 62         if (replySubject == null) {
 63             System.out.println("no reply subject,discard client's request");
 64             return;
 65         }
            System.out.println("TibrvMsg's reply subject: " + replySubject);
 66         
 67         try {
 68             TibrvMsgField field = msg.getField("sendData");
 69             String sendData = (String) field.data;
 70             System.out.println("sendData: " + sendData);
 71 
 72             TibrvMsg replyMsg = new TibrvMsg();
 73             replyMsg.setSendSubject(replySubject);
 74             replyMsg.update("replyData", "Nice to meet you.");
 75             transport.send(replyMsg);
 76         } catch (TibrvException e) {
 77             e.printStackTrace();
 78         }
 79     }
 80     
 81     int loadParameters(String[] args)
 82     {
 83         int i=0;
 84         while(i < args.length-1 && args[i].startsWith("-"))
 85         {
 86             if (args[i].equals("-service"))
 87             {
 88                 service = args[i+1];
 89                 i += 2;
 90             }
 91             else
 92             if (args[i].equals("-network"))
 93             {
 94                 network = args[i+1];
 95                 i += 2;
 96             }
 97             else
 98             if (args[i].equals("-daemon"))
 99             {
100                 daemon = args[i+1];
101                 i += 2;
102             }
103             else
104                 usage();
105         }
106         return i;
107     }
108     
109     void usage()
110     {
111         System.err.println("Usage: java idv.steven.rv.Server [-service service] [-network network]");
112         System.err.println("            [-daemon daemon] <subject>");
113         System.exit(-1);
114     }
115 
116     public static void main(String[] args) {
117         new Server().run(args);
118     }
119 }
要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.Server -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV
這指令指出,UDP 的廣播網址是 225.1.1.1,client 端要連線到 daemon 的話,使用 TCP 7500 port,傾聽的 subject 是 TEST.RV,所以,client 端送出的訊息的 subject 也要是 TEST.RV。關於 service、network、daemon 的詳細說明可以參考 Oracle 官網上的說明 http://docs.oracle.com/cd/E21455_01/common/tutorials/connector_rendezvous_daemon.html
server 的程式說明如下:
  1. Line 12: 這個類別實作了 TibcoMsgCallback 介面,當收到訊息時,RV 會呼叫 onMsg method。
  2. Line 39: 訊息分派機制,如果沒有寫這一行,RV 的訊息沒辦法分派,上面是呼叫 timedDispatch(timeout),也就是每隔 timeout 的秒數,就會離開這個 method,另一個用法是使用 dispatch(),這個方法就不會 timeout,會一直停在那一行直到有訊息時進行分派。
  3. Line 73: 將 client 指定的 reply subject 設定給 send subject,這樣 client 端才能收到訊息。


  • 同步
接下來,我們先看一下當 client 端選擇同步的模式時,程式要怎麼寫。
 1 package idv.steven.rv;
 2 
 3 import com.tibco.tibrv.Tibrv;
 4 import com.tibco.tibrv.TibrvException;
 5 import com.tibco.tibrv.TibrvMsg;
 6 import com.tibco.tibrv.TibrvRvdTransport;
 7 import com.tibco.tibrv.TibrvTransport;
 8 
 9 public class SyncClient {
10     private String service = null;
11     private String network = null;
12     private String daemon  = null;
13     
14     private TibrvTransport transport = null;
15     private double timeout = 5; //second
16     
17     public void run(String[] args) {
18         int i = loadParameters(args);
19         if (i > args.length-2) {
20             usage();
21             return;
22         }
23         
24         try {
25             Tibrv.open(Tibrv.IMPL_NATIVE);
26             
27             transport = new TibrvRvdTransport(service,network,daemon);
28             
29             String subject = args[args.length-2];
30             String sendData = args[args.length-1];
31             
32             TibrvMsg msg = new TibrvMsg();
33             msg.setSendSubject(subject);
34             msg.update("sendData", sendData);
35             
36             TibrvMsg replyMsg = null;
37             replyMsg = transport.sendRequest(msg, timeout);
38             
39             if (replyMsg == null)
40                 System.out.println("request time-out");
41             else 
42                 System.out.println("Receive reply msg:" + replyMsg);
43         } catch (TibrvException e) {
44             e.printStackTrace();
45         }
46         finally {
47             try {
48                 Tibrv.close();
49             } catch (TibrvException e) {
50                 e.printStackTrace();
51             }
52         }
53     }
54     
55     int loadParameters(String[] args)
56     {
57         int i=0;
58         while(i < args.length-1 && args[i].startsWith("-"))
59         {
60             if (args[i].equals("-service"))
61             {
62                 service = args[i+1];
63                 i += 2;
64             }
65             else
66             if (args[i].equals("-network"))
67             {
68                 network = args[i+1];
69                 i += 2;
70             }
71             else
72             if (args[i].equals("-daemon"))
73             {
74                 daemon = args[i+1];
75                 i += 2;
76             }
77             else
78                 usage();
79         }
80         return i;
81     }
82     
83     void usage()
84     {
85         System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]");
86         System.err.println("            [-daemon daemon] <subject> <messages>");
87         System.exit(-1);
88     }
89 
90     public static void main(String[] args) {
91         new SyncClient().run(args);
92     }
93 
94 }
要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.SyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
這裡只是簡單的傳送一個 Hello 訊息給 server,server 收到後會透過 System.out.println 印出來,接下說說明一下上面的程式:
  1. Line 37: 送訊息給 server 時,使用 sendRquest,並等待 server 回覆訊息,等待的時間是 timeout 指定的秒數。
  2. Line 48: 程式結束前,記得關閉 Tibco 回收資源,否則程式無法正常結束。

請注意看 server 的紅色部份 (line 73、75),這裡不管 client 送過來的訊息是以 request/reply 方式傳送,或是以 publish/subscribe 方式傳送,都用同樣的方式回覆,但是,在 Tibco 的官方文件裡,這種方式只用在 publish/subscribe,而 request/reply 不需要 setReplySubject,send 則要改為 sendReply。這個範例程式雖然可以收到並回覆訊息,但是否會有意料之外的事發生? 我無法確定!

另外,注意看 server 收到 client 傳來的訊息時,client msg 的 reply subject 是由 TibcoRV 產生的,會是類似 _INBOX.C0A80064.5548C4953B2.1 這樣以 _INBOX 開頭的字串,_INBOX 是 TibcoRV 的保留字,將這個 reply subject 設到要回傳的訊息的 send subject,就可以將訊息傳回原來的 client 程式,但是,如上面所說,這是非正規的寫法。
  • 非同步
非同步的 client 端寫法很類似 server 端,程式如下:
  1 package idv.steven.rv;
  2 
  3 import com.tibco.tibrv.Tibrv;
  4 import com.tibco.tibrv.TibrvException;
  5 import com.tibco.tibrv.TibrvListener;
  6 import com.tibco.tibrv.TibrvMsg;
  7 import com.tibco.tibrv.TibrvMsgCallback;
  8 import com.tibco.tibrv.TibrvMsgField;
  9 import com.tibco.tibrv.TibrvRvdTransport;
 10 import com.tibco.tibrv.TibrvTransport;
 11 
 12 public class AsyncClient implements TibrvMsgCallback {
 13     private String service = null;
 14     private String network = null;
 15     private String daemon  = null;
 16     
 17     private TibrvTransport transport = null;
 18     private boolean running = true;
 19     
 20     public void run(String[] args) {
 21         int i = loadParameters(args);
 22         if (i > args.length-2) {
 23             usage();
 24             return;
 25         }
 26         
 27         try {
 28             Tibrv.open(Tibrv.IMPL_NATIVE);
 29             
 30             transport = new TibrvRvdTransport(service,network,daemon);
 31             
 32             String subject = args[args.length-2];
 33             String sendData = args[args.length-1];
 34             String replySubject = transport.createInbox();
 35             new TibrvListener(Tibrv.defaultQueue(), this, transport, replySubject, null);
 36             
 37             TibrvMsg msg = new TibrvMsg();
 38             msg.setSendSubject(subject);
 39             msg.setReplySubject(replySubject);
 40             msg.update("sendData", sendData);
 41             transport.send(msg);
 42             
 43             while (running) {
 44                 Tibrv.defaultQueue().dispatch();
 45             }
 46         } catch (TibrvException | InterruptedException e) {
 47             e.printStackTrace();
 48         }
 49         finally {
 50             try {
 51                 Tibrv.close();
 52             } catch (TibrvException e) {
 53                 e.printStackTrace();
 54             }
 55         }
 56     }
 57     
 58     @Override
 59     public void onMsg(TibrvListener listener, TibrvMsg msg) {
 60         System.out.println("subject: " + listener.getSubject());
 61         
 62         try {
 63             TibrvMsgField field;
 64             field = msg.getField("replyData");
 65             String replyData = (String) field.data;
 66             System.out.println("replyData: " + replyData);
 67         } catch (TibrvException e) {
 68             e.printStackTrace();
 69         }
 70         
 71         running = false;
 72     }
 73     
 74     int loadParameters(String[] args)
 75     {
 76         int i=0;
 77         while(i < args.length-1 && args[i].startsWith("-"))
 78         {
 79             if (args[i].equals("-service"))
 80             {
 81                 service = args[i+1];
 82                 i += 2;
 83             }
 84             else
 85             if (args[i].equals("-network"))
 86             {
 87                 network = args[i+1];
 88                 i += 2;
 89             }
 90             else
 91             if (args[i].equals("-daemon"))
 92             {
 93                 daemon = args[i+1];
 94                 i += 2;
 95             }
 96             else
 97                 usage();
 98         }
 99         return i;
100     }
101     
102     void usage()
103     {
104         System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]");
105         System.err.println("            [-daemon daemon] <subject> <messages>");
106         System.exit(-1);
107     }
108 
109     public static void main(String[] args) {
110         new AsyncClient().run(args);
111     }
112 }
要執行的話,在命令列打入如下指令:
java idv.steven.rv.AsyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
程式說明如下:
  1. Line 12: 實作 TibrvMsgCallback,當收到訊息時 RV 會呼叫 onMsg。
  2. Line 34: 產生一個 InBox,這樣的話,當送出訊息時,RV 會讓 client 和 server 直接連線,也就是說,Line 35 看起來 client 端是透過傾聽 inbox 這個 subject 來接收訊息,似乎別的程式如果知道這個 inbox 字串的話,也可以傾聽相同的 subject 得到訊息內容,事實上是沒辦法的! 採用 InBox 的模式,RV 會將 server 的訊息直接送給 client 端,而不是用 UDP 群播的方式。
  3. Line 41: 送出訊息使用的是 send,和同步的方式不同!
  4. Line 44: 因為是非同步,需要等待 RV 將訊息回送,也要有訊息的分派機制。