Tibco RV 有提供 request/reply 模式,也有提供 publish/subscribe 模式,這兩種模式的用途分別是,request/reply 用在一對一的狀況下,而 publish/subscribe 則是用在一對多。雖然 request/reply 是用在一對一,Tibco RV 仍提供了同步與非同步兩種模式,在說明同步和非同步之前,先看一下 server 端的程式,如下:
1 package idv.steven.rv; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvListener; 6 import com.tibco.tibrv.TibrvMsg; 7 import com.tibco.tibrv.TibrvMsgCallback; 8 import com.tibco.tibrv.TibrvMsgField; 9 import com.tibco.tibrv.TibrvRvdTransport; 10 import com.tibco.tibrv.TibrvTransport; 11 12 public class Server implements TibrvMsgCallback { 13 private TibrvTransport transport = null; 14 15 private String service = null; 16 private String network = null; 17 private String daemon = null; 18 private String subject = null; 19 20 private double server_timeout = 60; 21 22 public void run(String[] args) { 23 boolean eventReceived = false; 24 25 int i = loadParameters(args); 26 if (i > args.length-1) { 27 usage(); 28 return; 29 } 30 31 try { 32 Tibrv.open(Tibrv.IMPL_NATIVE); 33 34 transport = new TibrvRvdTransport(service,network,daemon); 35 subject = args[args.length-1]; 36 new TibrvListener(Tibrv.defaultQueue(), this, transport, subject, null); 37 38 while (!eventReceived) { 39 eventReceived = Tibrv.defaultQueue().timedDispatch(server_timeout); 40 if (eventReceived) { 41 System.out.println("receive a message"); 42 } 43 else { 44 System.out.println("timeout"); 45 } 46 } 47 } catch (TibrvException | InterruptedException e) { 48 e.printStackTrace(); 49 } 50 finally { 51 try { 52 Tibrv.close(); 53 } catch (TibrvException e) { 54 e.printStackTrace(); 55 } 56 } 57 } 58 59 @Override 60 public void onMsg(TibrvListener listener, TibrvMsg msg) { 61 String replySubject = msg.getReplySubject(); 62 if (replySubject == null) { 63 System.out.println("no reply subject,discard client's request"); 64 return; 65 }
System.out.println("TibrvMsg's reply subject: " + replySubject); 66 67 try { 68 TibrvMsgField field = msg.getField("sendData"); 69 String sendData = (String) field.data; 70 System.out.println("sendData: " + sendData); 71 72 TibrvMsg replyMsg = new TibrvMsg(); 73 replyMsg.setSendSubject(replySubject); 74 replyMsg.update("replyData", "Nice to meet you."); 75 transport.send(replyMsg); 76 } catch (TibrvException e) { 77 e.printStackTrace(); 78 } 79 } 80 81 int loadParameters(String[] args) 82 { 83 int i=0; 84 while(i < args.length-1 && args[i].startsWith("-")) 85 { 86 if (args[i].equals("-service")) 87 { 88 service = args[i+1]; 89 i += 2; 90 } 91 else 92 if (args[i].equals("-network")) 93 { 94 network = args[i+1]; 95 i += 2; 96 } 97 else 98 if (args[i].equals("-daemon")) 99 { 100 daemon = args[i+1]; 101 i += 2; 102 } 103 else 104 usage(); 105 } 106 return i; 107 } 108 109 void usage() 110 { 111 System.err.println("Usage: java idv.steven.rv.Server [-service service] [-network network]"); 112 System.err.println(" [-daemon daemon] <subject>"); 113 System.exit(-1); 114 } 115 116 public static void main(String[] args) { 117 new Server().run(args); 118 } 119 }
要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.Server -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV
這指令指出,UDP 的廣播網址是 225.1.1.1,client 端要連線到 daemon 的話,使用 TCP 7500 port,傾聽的 subject 是 TEST.RV,所以,client 端送出的訊息的 subject 也要是 TEST.RV。關於 service、network、daemon 的詳細說明可以參考 Oracle 官網上的說明 http://docs.oracle.com/cd/E21455_01/common/tutorials/connector_rendezvous_daemon.html。
server 的程式說明如下:
- Line 12: 這個類別實作了 TibcoMsgCallback 介面,當收到訊息時,RV 會呼叫 onMsg method。
- Line 39: 訊息分派機制,如果沒有寫這一行,RV 的訊息沒辦法分派,上面是呼叫 timedDispatch(timeout),也就是每隔 timeout 的秒數,就會離開這個 method,另一個用法是使用 dispatch(),這個方法就不會 timeout,會一直停在那一行直到有訊息時進行分派。
- Line 73: 將 client 指定的 reply subject 設定給 send subject,這樣 client 端才能收到訊息。
- 同步
接下來,我們先看一下當 client 端選擇同步的模式時,程式要怎麼寫。
1 package idv.steven.rv; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvMsg; 6 import com.tibco.tibrv.TibrvRvdTransport; 7 import com.tibco.tibrv.TibrvTransport; 8 9 public class SyncClient { 10 private String service = null; 11 private String network = null; 12 private String daemon = null; 13 14 private TibrvTransport transport = null; 15 private double timeout = 5; //second 16 17 public void run(String[] args) { 18 int i = loadParameters(args); 19 if (i > args.length-2) { 20 usage(); 21 return; 22 } 23 24 try { 25 Tibrv.open(Tibrv.IMPL_NATIVE); 26 27 transport = new TibrvRvdTransport(service,network,daemon); 28 29 String subject = args[args.length-2]; 30 String sendData = args[args.length-1]; 31 32 TibrvMsg msg = new TibrvMsg(); 33 msg.setSendSubject(subject); 34 msg.update("sendData", sendData); 35 36 TibrvMsg replyMsg = null; 37 replyMsg = transport.sendRequest(msg, timeout); 38 39 if (replyMsg == null) 40 System.out.println("request time-out"); 41 else 42 System.out.println("Receive reply msg:" + replyMsg); 43 } catch (TibrvException e) { 44 e.printStackTrace(); 45 } 46 finally { 47 try { 48 Tibrv.close(); 49 } catch (TibrvException e) { 50 e.printStackTrace(); 51 } 52 } 53 } 54 55 int loadParameters(String[] args) 56 { 57 int i=0; 58 while(i < args.length-1 && args[i].startsWith("-")) 59 { 60 if (args[i].equals("-service")) 61 { 62 service = args[i+1]; 63 i += 2; 64 } 65 else 66 if (args[i].equals("-network")) 67 { 68 network = args[i+1]; 69 i += 2; 70 } 71 else 72 if (args[i].equals("-daemon")) 73 { 74 daemon = args[i+1]; 75 i += 2; 76 } 77 else 78 usage(); 79 } 80 return i; 81 } 82 83 void usage() 84 { 85 System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]"); 86 System.err.println(" [-daemon daemon] <subject> <messages>"); 87 System.exit(-1); 88 } 89 90 public static void main(String[] args) { 91 new SyncClient().run(args); 92 } 93 94 }
要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.SyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
這裡只是簡單的傳送一個 Hello 訊息給 server,server 收到後會透過 System.out.println 印出來,接下說說明一下上面的程式:
- Line 37: 送訊息給 server 時,使用 sendRquest,並等待 server 回覆訊息,等待的時間是 timeout 指定的秒數。
- Line 48: 程式結束前,記得關閉 Tibco 回收資源,否則程式無法正常結束。
請注意看 server 的紅色部份 (line 73、75),這裡不管 client 送過來的訊息是以 request/reply 方式傳送,或是以 publish/subscribe 方式傳送,都用同樣的方式回覆,但是,在 Tibco 的官方文件裡,這種方式只用在 publish/subscribe,而 request/reply 不需要 setReplySubject,send 則要改為 sendReply。這個範例程式雖然可以收到並回覆訊息,但是否會有意料之外的事發生? 我無法確定!
另外,注意看 server 收到 client 傳來的訊息時,client msg 的 reply subject 是由 TibcoRV 產生的,會是類似 _INBOX.C0A80064.5548C4953B2.1 這樣以 _INBOX 開頭的字串,_INBOX 是 TibcoRV 的保留字,將這個 reply subject 設到要回傳的訊息的 send subject,就可以將訊息傳回原來的 client 程式,但是,如上面所說,這是非正規的寫法。
- 非同步
非同步的 client 端寫法很類似 server 端,程式如下:
1 package idv.steven.rv; 2 3 import com.tibco.tibrv.Tibrv; 4 import com.tibco.tibrv.TibrvException; 5 import com.tibco.tibrv.TibrvListener; 6 import com.tibco.tibrv.TibrvMsg; 7 import com.tibco.tibrv.TibrvMsgCallback; 8 import com.tibco.tibrv.TibrvMsgField; 9 import com.tibco.tibrv.TibrvRvdTransport; 10 import com.tibco.tibrv.TibrvTransport; 11 12 public class AsyncClient implements TibrvMsgCallback { 13 private String service = null; 14 private String network = null; 15 private String daemon = null; 16 17 private TibrvTransport transport = null; 18 private boolean running = true; 19 20 public void run(String[] args) { 21 int i = loadParameters(args); 22 if (i > args.length-2) { 23 usage(); 24 return; 25 } 26 27 try { 28 Tibrv.open(Tibrv.IMPL_NATIVE); 29 30 transport = new TibrvRvdTransport(service,network,daemon); 31 32 String subject = args[args.length-2]; 33 String sendData = args[args.length-1]; 34 String replySubject = transport.createInbox(); 35 new TibrvListener(Tibrv.defaultQueue(), this, transport, replySubject, null); 36 37 TibrvMsg msg = new TibrvMsg(); 38 msg.setSendSubject(subject); 39 msg.setReplySubject(replySubject); 40 msg.update("sendData", sendData); 41 transport.send(msg); 42 43 while (running) { 44 Tibrv.defaultQueue().dispatch(); 45 } 46 } catch (TibrvException | InterruptedException e) { 47 e.printStackTrace(); 48 } 49 finally { 50 try { 51 Tibrv.close(); 52 } catch (TibrvException e) { 53 e.printStackTrace(); 54 } 55 } 56 } 57 58 @Override 59 public void onMsg(TibrvListener listener, TibrvMsg msg) { 60 System.out.println("subject: " + listener.getSubject()); 61 62 try { 63 TibrvMsgField field; 64 field = msg.getField("replyData"); 65 String replyData = (String) field.data; 66 System.out.println("replyData: " + replyData); 67 } catch (TibrvException e) { 68 e.printStackTrace(); 69 } 70 71 running = false; 72 } 73 74 int loadParameters(String[] args) 75 { 76 int i=0; 77 while(i < args.length-1 && args[i].startsWith("-")) 78 { 79 if (args[i].equals("-service")) 80 { 81 service = args[i+1]; 82 i += 2; 83 } 84 else 85 if (args[i].equals("-network")) 86 { 87 network = args[i+1]; 88 i += 2; 89 } 90 else 91 if (args[i].equals("-daemon")) 92 { 93 daemon = args[i+1]; 94 i += 2; 95 } 96 else 97 usage(); 98 } 99 return i; 100 } 101 102 void usage() 103 { 104 System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]"); 105 System.err.println(" [-daemon daemon] <subject> <messages>"); 106 System.exit(-1); 107 } 108 109 public static void main(String[] args) { 110 new AsyncClient().run(args); 111 } 112 }
要執行的話,在命令列打入如下指令:
java idv.steven.rv.AsyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
程式說明如下:
- Line 12: 實作 TibrvMsgCallback,當收到訊息時 RV 會呼叫 onMsg。
- Line 34: 產生一個 InBox,這樣的話,當送出訊息時,RV 會讓 client 和 server 直接連線,也就是說,Line 35 看起來 client 端是透過傾聽 inbox 這個 subject 來接收訊息,似乎別的程式如果知道這個 inbox 字串的話,也可以傾聽相同的 subject 得到訊息內容,事實上是沒辦法的! 採用 InBox 的模式,RV 會將 server 的訊息直接送給 client 端,而不是用 UDP 群播的方式。
- Line 41: 送出訊息使用的是 send,和同步的方式不同!
- Line 44: 因為是非同步,需要等待 RV 將訊息回送,也要有訊息的分派機制。
沒有留言:
張貼留言