Tibco RV 有提供 request/reply 模式,也有提供 publish/subscribe 模式,這兩種模式的用途分別是,request/reply 用在一對一的狀況下,而 publish/subscribe 則是用在一對多。雖然 request/reply 是用在一對一,Tibco RV 仍提供了同步與非同步兩種模式,在說明同步和非同步之前,先看一下 server 端的程式,如下:
1 package idv.steven.rv;
2
3 import com.tibco.tibrv.Tibrv;
4 import com.tibco.tibrv.TibrvException;
5 import com.tibco.tibrv.TibrvListener;
6 import com.tibco.tibrv.TibrvMsg;
7 import com.tibco.tibrv.TibrvMsgCallback;
8 import com.tibco.tibrv.TibrvMsgField;
9 import com.tibco.tibrv.TibrvRvdTransport;
10 import com.tibco.tibrv.TibrvTransport;
11
12 public class Server implements TibrvMsgCallback {
13 private TibrvTransport transport = null;
14
15 private String service = null;
16 private String network = null;
17 private String daemon = null;
18 private String subject = null;
19
20 private double server_timeout = 60;
21
22 public void run(String[] args) {
23 boolean eventReceived = false;
24
25 int i = loadParameters(args);
26 if (i > args.length-1) {
27 usage();
28 return;
29 }
30
31 try {
32 Tibrv.open(Tibrv.IMPL_NATIVE);
33
34 transport = new TibrvRvdTransport(service,network,daemon);
35 subject = args[args.length-1];
36 new TibrvListener(Tibrv.defaultQueue(), this, transport, subject, null);
37
38 while (!eventReceived) {
39 eventReceived = Tibrv.defaultQueue().timedDispatch(server_timeout);
40 if (eventReceived) {
41 System.out.println("receive a message");
42 }
43 else {
44 System.out.println("timeout");
45 }
46 }
47 } catch (TibrvException | InterruptedException e) {
48 e.printStackTrace();
49 }
50 finally {
51 try {
52 Tibrv.close();
53 } catch (TibrvException e) {
54 e.printStackTrace();
55 }
56 }
57 }
58
59 @Override
60 public void onMsg(TibrvListener listener, TibrvMsg msg) {
61 String replySubject = msg.getReplySubject();
62 if (replySubject == null) {
63 System.out.println("no reply subject,discard client's request");
64 return;
65 } System.out.println("TibrvMsg's reply subject: " + replySubject);
66
67 try {
68 TibrvMsgField field = msg.getField("sendData");
69 String sendData = (String) field.data;
70 System.out.println("sendData: " + sendData);
71
72 TibrvMsg replyMsg = new TibrvMsg();
73 replyMsg.setSendSubject(replySubject);
74 replyMsg.update("replyData", "Nice to meet you.");
75 transport.send(replyMsg);
76 } catch (TibrvException e) {
77 e.printStackTrace();
78 }
79 }
80
81 int loadParameters(String[] args)
82 {
83 int i=0;
84 while(i < args.length-1 && args[i].startsWith("-"))
85 {
86 if (args[i].equals("-service"))
87 {
88 service = args[i+1];
89 i += 2;
90 }
91 else
92 if (args[i].equals("-network"))
93 {
94 network = args[i+1];
95 i += 2;
96 }
97 else
98 if (args[i].equals("-daemon"))
99 {
100 daemon = args[i+1];
101 i += 2;
102 }
103 else
104 usage();
105 }
106 return i;
107 }
108
109 void usage()
110 {
111 System.err.println("Usage: java idv.steven.rv.Server [-service service] [-network network]");
112 System.err.println(" [-daemon daemon] <subject>");
113 System.exit(-1);
114 }
115
116 public static void main(String[] args) {
117 new Server().run(args);
118 }
119 }要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.Server -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV
server 的程式說明如下:
- Line 12: 這個類別實作了 TibcoMsgCallback 介面,當收到訊息時,RV 會呼叫 onMsg method。
- Line 39: 訊息分派機制,如果沒有寫這一行,RV 的訊息沒辦法分派,上面是呼叫 timedDispatch(timeout),也就是每隔 timeout 的秒數,就會離開這個 method,另一個用法是使用 dispatch(),這個方法就不會 timeout,會一直停在那一行直到有訊息時進行分派。
- Line 73: 將 client 指定的 reply subject 設定給 send subject,這樣 client 端才能收到訊息。
接下來,我們先看一下當 client 端選擇同步的模式時,程式要怎麼寫。
1 package idv.steven.rv;
2
3 import com.tibco.tibrv.Tibrv;
4 import com.tibco.tibrv.TibrvException;
5 import com.tibco.tibrv.TibrvMsg;
6 import com.tibco.tibrv.TibrvRvdTransport;
7 import com.tibco.tibrv.TibrvTransport;
8
9 public class SyncClient {
10 private String service = null;
11 private String network = null;
12 private String daemon = null;
13
14 private TibrvTransport transport = null;
15 private double timeout = 5; //second
16
17 public void run(String[] args) {
18 int i = loadParameters(args);
19 if (i > args.length-2) {
20 usage();
21 return;
22 }
23
24 try {
25 Tibrv.open(Tibrv.IMPL_NATIVE);
26
27 transport = new TibrvRvdTransport(service,network,daemon);
28
29 String subject = args[args.length-2];
30 String sendData = args[args.length-1];
31
32 TibrvMsg msg = new TibrvMsg();
33 msg.setSendSubject(subject);
34 msg.update("sendData", sendData);
35
36 TibrvMsg replyMsg = null;
37 replyMsg = transport.sendRequest(msg, timeout);
38
39 if (replyMsg == null)
40 System.out.println("request time-out");
41 else
42 System.out.println("Receive reply msg:" + replyMsg);
43 } catch (TibrvException e) {
44 e.printStackTrace();
45 }
46 finally {
47 try {
48 Tibrv.close();
49 } catch (TibrvException e) {
50 e.printStackTrace();
51 }
52 }
53 }
54
55 int loadParameters(String[] args)
56 {
57 int i=0;
58 while(i < args.length-1 && args[i].startsWith("-"))
59 {
60 if (args[i].equals("-service"))
61 {
62 service = args[i+1];
63 i += 2;
64 }
65 else
66 if (args[i].equals("-network"))
67 {
68 network = args[i+1];
69 i += 2;
70 }
71 else
72 if (args[i].equals("-daemon"))
73 {
74 daemon = args[i+1];
75 i += 2;
76 }
77 else
78 usage();
79 }
80 return i;
81 }
82
83 void usage()
84 {
85 System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]");
86 System.err.println(" [-daemon daemon] <subject> <messages>");
87 System.exit(-1);
88 }
89
90 public static void main(String[] args) {
91 new SyncClient().run(args);
92 }
93
94 }要執行上面的程式,可於命令列下如下指令:
java idv.steven.rv.SyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
這裡只是簡單的傳送一個 Hello 訊息給 server,server 收到後會透過 System.out.println 印出來,接下說說明一下上面的程式:
- Line 37: 送訊息給 server 時,使用 sendRquest,並等待 server 回覆訊息,等待的時間是 timeout 指定的秒數。
- Line 48: 程式結束前,記得關閉 Tibco 回收資源,否則程式無法正常結束。
請注意看 server 的紅色部份 (line 73、75),這裡不管 client 送過來的訊息是以 request/reply 方式傳送,或是以 publish/subscribe 方式傳送,都用同樣的方式回覆,但是,在 Tibco 的官方文件裡,這種方式只用在 publish/subscribe,而 request/reply 不需要 setReplySubject,send 則要改為 sendReply。這個範例程式雖然可以收到並回覆訊息,但是否會有意料之外的事發生? 我無法確定!
另外,注意看 server 收到 client 傳來的訊息時,client msg 的 reply subject 是由 TibcoRV 產生的,會是類似 _INBOX.C0A80064.5548C4953B2.1 這樣以 _INBOX 開頭的字串,_INBOX 是 TibcoRV 的保留字,將這個 reply subject 設到要回傳的訊息的 send subject,就可以將訊息傳回原來的 client 程式,但是,如上面所說,這是非正規的寫法。
非同步的 client 端寫法很類似 server 端,程式如下:
1 package idv.steven.rv;
2
3 import com.tibco.tibrv.Tibrv;
4 import com.tibco.tibrv.TibrvException;
5 import com.tibco.tibrv.TibrvListener;
6 import com.tibco.tibrv.TibrvMsg;
7 import com.tibco.tibrv.TibrvMsgCallback;
8 import com.tibco.tibrv.TibrvMsgField;
9 import com.tibco.tibrv.TibrvRvdTransport;
10 import com.tibco.tibrv.TibrvTransport;
11
12 public class AsyncClient implements TibrvMsgCallback {
13 private String service = null;
14 private String network = null;
15 private String daemon = null;
16
17 private TibrvTransport transport = null;
18 private boolean running = true;
19
20 public void run(String[] args) {
21 int i = loadParameters(args);
22 if (i > args.length-2) {
23 usage();
24 return;
25 }
26
27 try {
28 Tibrv.open(Tibrv.IMPL_NATIVE);
29
30 transport = new TibrvRvdTransport(service,network,daemon);
31
32 String subject = args[args.length-2];
33 String sendData = args[args.length-1];
34 String replySubject = transport.createInbox();
35 new TibrvListener(Tibrv.defaultQueue(), this, transport, replySubject, null);
36
37 TibrvMsg msg = new TibrvMsg();
38 msg.setSendSubject(subject);
39 msg.setReplySubject(replySubject);
40 msg.update("sendData", sendData);
41 transport.send(msg);
42
43 while (running) {
44 Tibrv.defaultQueue().dispatch();
45 }
46 } catch (TibrvException | InterruptedException e) {
47 e.printStackTrace();
48 }
49 finally {
50 try {
51 Tibrv.close();
52 } catch (TibrvException e) {
53 e.printStackTrace();
54 }
55 }
56 }
57
58 @Override
59 public void onMsg(TibrvListener listener, TibrvMsg msg) {
60 System.out.println("subject: " + listener.getSubject());
61
62 try {
63 TibrvMsgField field;
64 field = msg.getField("replyData");
65 String replyData = (String) field.data;
66 System.out.println("replyData: " + replyData);
67 } catch (TibrvException e) {
68 e.printStackTrace();
69 }
70
71 running = false;
72 }
73
74 int loadParameters(String[] args)
75 {
76 int i=0;
77 while(i < args.length-1 && args[i].startsWith("-"))
78 {
79 if (args[i].equals("-service"))
80 {
81 service = args[i+1];
82 i += 2;
83 }
84 else
85 if (args[i].equals("-network"))
86 {
87 network = args[i+1];
88 i += 2;
89 }
90 else
91 if (args[i].equals("-daemon"))
92 {
93 daemon = args[i+1];
94 i += 2;
95 }
96 else
97 usage();
98 }
99 return i;
100 }
101
102 void usage()
103 {
104 System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]");
105 System.err.println(" [-daemon daemon] <subject> <messages>");
106 System.exit(-1);
107 }
108
109 public static void main(String[] args) {
110 new AsyncClient().run(args);
111 }
112 }要執行的話,在命令列打入如下指令:
java idv.steven.rv.AsyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello
程式說明如下:
- Line 12: 實作 TibrvMsgCallback,當收到訊息時 RV 會呼叫 onMsg。
- Line 34: 產生一個 InBox,這樣的話,當送出訊息時,RV 會讓 client 和 server 直接連線,也就是說,Line 35 看起來 client 端是透過傾聽 inbox 這個 subject 來接收訊息,似乎別的程式如果知道這個 inbox 字串的話,也可以傾聽相同的 subject 得到訊息內容,事實上是沒辦法的! 採用 InBox 的模式,RV 會將 server 的訊息直接送給 client 端,而不是用 UDP 群播的方式。
- Line 41: 送出訊息使用的是 send,和同步的方式不同!
- Line 44: 因為是非同步,需要等待 RV 將訊息回送,也要有訊息的分派機制。