Tibco EMS 是眾多遵循 JMS (Java Message Service) 規格的產品之一,這裡要整理的 JMS 相關資料,會以 EMS 來寫相關 sample code,所以如果有 EMS 的特異功能,也會儘量說明 (如果我知道的話)。
JMS 提供兩種訊息模式 (Messaging Models) - Point-to-point 及 Publish-and-subscribe (如下圖),這兩種模式有什麼不同?
在開始說明之前,先說明一個習慣說法,一般來說,在左邊送出訊息的稱為"生產者" (producer),在右邊收訊息的稱為"消費者" (consumer)。
- 點對點一定是佇列模式 (queue),生產者送訊息到佇列,消費者由佇列取得訊息。
- 一個佇列可以有多個生產者送訊息過來,有多個消費者來取出訊息,但是,一個訊息是由一個生產者送出到佇列後,只會由一個消費者取得。
- 透過這個方式,在負載平衡上的運用很廣泛,生產者將要處理的訊息放入佇列,如果消費者取出訊息後,需要花比較長的時間處理,就可以創建多個消費者來處理這些訊息,每處理完一個就再去佇列中取出未處理的資料繼續處理,如此就可將負載分配到各個消費者。
- Publish-and-Subscribe (發行者和訂閱者)
- 發佈訂閱是 topic 模式,生產者發佈指定 topic (主題) 的訊息到 message server,所有訂閱該 topic 的消費者即會由 server 收到該訊息。
- 同一個 topic 可以同時有多個消費者產生,產生後也都會送給所有訂閱的消費者。
- 當生產者送出訊息時,消費者必須是在 active 狀態 (等待接收訊息的狀態),消費者才能接到訊息,否則即沒有收到該訊息。
- 如果要確定消費者一定可以收到訊息,可以創建"durable"的消費者(訂閱者),那麼訊息在發佈時,如果該消費者沒有 active,訊息會先放在 server 上,直到出現下列三個條件之一,訊息才會被刪除。(1) 訊息過期 (訊息送出前可以設定過期時間),(2) 訊息被消費者取走,(3) server 的儲存空間超過限制。
- 當 client 重新創建相同的 ID 的 durable 消費者,server 上此消費者有訂閱該 topic 的訊息都會送給這個消費者。
JMS 本身是一個規範,定義了許多介面,由各家廠商實作這些介面,由上面的說明看起來,雖然兩種模式的運作有許多的不同,但是 JMS API 則是差不多的,程式的寫法有差距不大,下表是 Point-to-Point 和 Publis-and-Subscribe 間的差別:
General API | Point-to-Point API | Publish-and-Subscribe API |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Destination | Queue | Topic |
Connection | QueueConnection | TopicConnection |
Session | QueueSession | TopicSession |
MessageConsumer | QueueSender | TopicPublisher |
MessageProducer | QueueReceiver | TopicSubscriber |
上面說明的是一些最基本的觀念,在開始寫程式前,當然要先把環境搞定,我在電腦裡,安裝了 Tibco EMS 8.1 development 版本,首先開啟「EMS Administration Tool」,用它來建立一個群組、一個使用者、一個佇列、一個主題(topic),當然要讓這個使用者可以存取佇列和主題。
- connect: 連線到 EMS,預設會用 admin,密碼為空白,EMS 預設的 port 是 7222,如下圖。
- create group ap "programmer": 建立一個命名為 ap 的群組,寫程式的人就加入這個群組。(create group group_name ["description"])
- create user steven "programmer01" password=password: 建立一個帳號 steven,密碼為 password。(create user user_name ["description"] [password=password]
- add member ap steven: 將 steven 加入 ap 群組。(add member group_name user_name [,user2, user3,...])
- create queue queue01: 建立一個命名為 queue01 的佇列。(create queue queue_name [properties])
- grant queue queue01 group=ap send,receive,browse: 授權給群組 ap,可以送訊息到 queue01、從 queue01 收訊息及瀏覽 queue01 的訊息。最後一個參數是權限,有三個值 send、receive、browse,如上面所示,要一次授權多種權限,用逗點分隔。(grant queue queue-name user=name | group=name permissions)
- create topic topic01: 建立一個命名為 topic01 的主題。(create topic topic_name [properties])
- grant topic topic01 group=ap subscribe,publish,durable,use_durable: 授權給群組 ap,可以有 subscribe (訂閱)、publish (發佈)、durable (持久的)、use_durable (使用持久的) 權限。(grant topic topic-name user=name | group=name permissions)
(EMS Administration Tool 的各種指令,請參閱官方文件「TIBCO Enterprise Message Service User's Guide」第六章。)
接下來看看程式怎麼寫 ...
** 生產者
1 package idv.steven.ems;
2
3 import java.util.concurrent.Callable;
4
5 import javax.jms.Connection;
6 import javax.jms.ConnectionFactory;
7 import javax.jms.DeliveryMode;
8 import javax.jms.Destination;
9 import javax.jms.MessageProducer;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12
13 public class QueueProducer implements Callable<TextMessage> {
14 private String userName = "steven", password = "password", url = "tcp://localhost:7222";
15 private String queueName = "queue01";
16
17 @Override
18 public TextMessage call() throws Exception {
19 Connection connection = null;
20
21 ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url);
22 connection = factory.createConnection(userName,password);
23 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
24 Destination destination = session.createQueue(queueName);
25 MessageProducer msgProducer = session.createProducer(null);
26 TextMessage msg;
27 msg = session.createTextMessage();
28 msg.setText("just for test");
29 msg.setStringProperty("CustomerType","VIP");
30 msg.setIntProperty("value",123);
31 msg.setIntProperty("Age",18);
32 msg.setStringProperty("Location","Taipei");
33 msgProducer.send(destination, msg, DeliveryMode.PERSISTENT, 9, 60000);
34 System.out.println(msg);
35
36 connection.close();
37
38 return msg;
39 }
40 }
程式說明:
- Line 21: 先建立一個連線的工廠類別,由這行程式清楚的看到,各廠商實作了 JMS 的介面。
- Line 22: 由連線工廠類別產生連線。
- Line 23: 由連線產生 session,session 是非常重要的一個工廠類別,由之後的程式可以看到,程式會用這個類別產生 queue、topic、producer、consumer … createSession() 的第二個參數是關於 acknowledge 的設定,關於這個設定,在初學階段都先設定為 AUTO_ACKNOWLEDGE,之後會再說明其它用法。
- Line 24: 不管是佇列模式或主題模式,都會產生這個 Destination 介面的實作類別,現在要用佇列模式,就用 session 產生一個佇列,如果是要用主題模式,就用 session 產生一個主題。這樣生產者送出訊息時,可不用在意是以佇列模式或主題模式送出。
- Line 28~32: 28 行是設定訊息 body 的值,29~32行是設定訊息的屬性值,設定 body 和屬性的 setter method 的名稱差別在於,設定屬性的 setter method 名稱是以 Property 為結尾,請參考「JMS: 訊息的標頭、屬性和種類」。
- Line 33: 將訊息送出到 Destination 介面的物件,在這裡當然就是送到佇列。
** 消費者
1 package idv.steven.ems;
2
3 import java.util.Hashtable;
4 import java.util.concurrent.Callable;
5
6 import javax.jms.Connection;
7 import javax.jms.ConnectionFactory;
8 import javax.jms.Destination;
9 import javax.jms.ExceptionListener;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.Session;
14
15 public class QueueConsumer implements Callable<String>, ExceptionListener {
16 private String userName = "steven", password = "password", url = "tcp://localhost:7222";
17 private String queueName = "queue01";
18
19 @Override
20 public String call() throws Exception {
21 Connection connection = null;
22
23 Hashtable prop = new Hashtable();
24 prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(10));
25 prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(1000));
26 ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url,null,prop);
27 connection = factory.createConnection(userName,password);
28
29 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
30 Destination destination = session.createQueue(queueName);
31 connection.setExceptionListener(this);
32 MessageConsumer msgConsumer = session.createConsumer(destination);
33
34 connection.start();
35
36 Message msg = msgConsumer.receive();
37
38 if (msg != null)
39 System.err.println("Received message: "+ msg);
40
41 connection.close();
42
43 return msg.getJMSMessageID();
44 }
45
46 @Override
47 public void onException(JMSException ex) {
48 ex.printStackTrace();
49 }
50 }
程式說明:
- Line 23~27: 建立連線,這裡在建立過程有傳入一些參數,這會在後續再說明。
- Line 34: 這一行很重要,呼叫 start() 後,才會開始接收資料。
- Line 36: 程式會停在這一行等待訊息到達。
** 測試程式
1 package idv.steven.ems;
2
3 import java.util.concurrent.Callable;
4 import java.util.concurrent.ExecutionException;
5 import java.util.concurrent.FutureTask;
6
7 import javax.jms.JMSException;
8 import javax.jms.TextMessage;
9
10 public class QueueMsg {
11
12 public static void main(String[] args) {
13 QueueProducer producer = new QueueProducer();
14 FutureTask<TextMessage> producerTask = new FutureTask<TextMessage>(producer);
15 Thread tProducer = new Thread(producerTask);
16 tProducer.start();
17
18 try {
19 Thread.sleep(1000);
20 } catch (InterruptedException e1) {
21 }
22
23 Callable<String> consumer = new QueueConsumer();
24 FutureTask<String> consumerTask = new FutureTask<String>(consumer);
25 Thread tConsumer = new Thread(consumerTask);
26 tConsumer.start();
27
28 try {
29 if (producerTask.isDone()) {
30 TextMessage msg = producerTask.get();
31 if (msg != null) {
32 System.out.println("producer's message: " + msg.getText());
33 }
34 }
35
36 try {
37 Thread.sleep(300);
38 } catch (InterruptedException e1) {
39 }
40
41 if (consumerTask.isDone()) {
42 String text = consumerTask.get();
43 System.out.println("consumer's message: " + text);
44 }
45 } catch (InterruptedException | ExecutionException | JMSException e) {
46 e.printStackTrace();
47 }
48 }
49 }
這個測試程式很簡單的先啟動生產者,再啟動消費者,並輸出生產者送出的資料及消費者收到的資料,以確定雙方確實有正確的溝通。
- Publish-and-Subscribe (發佈、訂閱)
** 生產者
1 package idv.steven.ems;
2
3 import java.util.concurrent.Callable;
4
5 import javax.jms.Connection;
6 import javax.jms.ConnectionFactory;
7 import javax.jms.DeliveryMode;
8 import javax.jms.MapMessage;
9 import javax.jms.Session;
10 import javax.jms.Topic;
11 import javax.jms.TopicConnection;
12 import javax.jms.TopicConnectionFactory;
13 import javax.jms.TopicPublisher;
14 import javax.jms.TopicSession;
15
16 import com.tibco.tibjms.Tibjms;
17 import com.tibco.tibjms.TibjmsConnectionFactory;
18 import com.tibco.tibjms.TibjmsTopicConnectionFactory;
19
20 public class MyTopicPublisher implements Callable<String> {
21 private String userName = "steven", password = "paasword", url = "tcp://localhost:7222";
22 private String topicName = "topic01";
23
24 @Override
25 public String call() throws Exception {
26 TopicConnection connection = null;
27
28 Tibjms.setEncoding("Big5");
29 TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url);
30 connection = factory.createTopicConnection(userName,password);
31 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
32 Topic topic = session.createTopic(topicName);
33 TopicPublisher publisher = session.createPublisher(topic);
34
35 MapMessage msg = session.createMapMessage();
36 msg.setInt("age",35);
37 msg.setString("address","台北市瑞湖街68號15樓");
38 msg.setStringProperty("JMS_TIBCO_MSG_TRACE","body");
39 msg.setBooleanProperty("JMS_TIBCO_PRESERVE_UNDELIVERED",true);
40 msg.setIntProperty("Level",123);
41 msg.setStringProperty("Location","Taipei");
42
43 publisher.publish(msg, DeliveryMode.PERSISTENT, 9, 1000000);
44 String msgID = msg.getJMSMessageID();
45
46 connection.close();
47
48 return msgID;
49 }
50 }
程式說明:
- Line 31: 這裡建立的 session,第 2 個參數之後會詳加討論。
- Line 43: 在這裡生產者將訊息發佈出去,這和 31 行間的關係,也會在之後詳加討論。
- Line 44: 每個訊息送出時,server 都會給予一個不會重複的訊息編號。
** 消費者
1 package idv.steven.ems;
2
3 import java.util.Hashtable;
4 import java.util.concurrent.Callable;
5
6 import javax.jms.Connection;
7 import javax.jms.ConnectionFactory;
8 import javax.jms.Destination;
9 import javax.jms.ExceptionListener;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.Session;
14 import javax.jms.Topic;
15 import javax.jms.TopicConnection;
16 import javax.jms.TopicConnectionFactory;
17 import javax.jms.TopicSession;
18 import javax.jms.TopicSubscriber;
19
20 import com.tibco.tibjms.TibjmsTopicConnectionFactory;
21
22 public class MyTopicSubscriber implements Callable<Message>, ExceptionListener {
23 private String userName = "steven", password = "password", url = "tcp://localhost:7222";
24 private String topicName = "topic01";
25
26 @Override
27 public Message call() throws Exception {
28 Message msg = null;
29
30 try {
31 Hashtable prop = new Hashtable();
32 prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(2));
33 prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(10));
34
35 TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url, null, prop);
36 TopicConnection connection = factory.createTopicConnection(userName,password);
37 TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
38 Topic topic = session.createTopic(topicName);
39 TopicSubscriber subscriber = session.createSubscriber(topic);
40
41 connection.setExceptionListener(this);
42 connection.start();
43
44 msg = subscriber.receive();
45
46 if (msg != null)
47 System.out.println("Received message: "+ msg);
48
49 connection.close();
50 } catch (JMSException e) {
51 e.printStackTrace();
52 }
53
54 return msg;
55 }
56
57 @Override
58 public void onException(JMSException e) {
59 e.printStackTrace();
60 }
61 }
程式說明:
- Line 42: 不管是佇列模式、主題模式,都需要呼叫 start() 才會開始收訊息。
- Line 44: 程式會停在這一行以等待傾聽的主題的訊息到達。
** 測試程式
1 package idv.steven.ems;
2
3 import java.util.concurrent.Callable;
4 import java.util.concurrent.ExecutionException;
5 import java.util.concurrent.FutureTask;
6
7 import javax.jms.JMSException;
8 import javax.jms.Message;
9
10 public class TopicMsg {
11
12 public static void main(String[] args) {
13 Callable<Message> consumer = new MyTopicSubscriber();
14 FutureTask<Message> consumerTask = new FutureTask<Message>(consumer);
15 Thread tConsumer = new Thread(consumerTask);
16 tConsumer.start();
17
18 try {
19 Thread.sleep(300);
20 } catch (InterruptedException e1) {
21 }
22
23 Callable<String> producer = new MyTopicPublisher();
24 FutureTask<String> producerTask = new FutureTask<String>(producer);
25 Thread tProducer = new Thread(producerTask);
26 tProducer.start();
27
28 try {
29 Thread.sleep(300);
30 } catch (InterruptedException e1) {
31 }
32
33 if (consumerTask.isDone()) {
34 try {
35 Message msg = consumerTask.get();
36 if (msg != null) {
37 System.out.println("consumer's msgID: " + msg.getJMSMessageID());
38 }
39 else {
40 System.out.println("msg is null");
41 }
42 } catch (InterruptedException | ExecutionException | JMSException e) {
43 e.printStackTrace();
44 }
45 }
46
47 if (producerTask.isDone()) {
48 try {
49 System.out.println("producer's msgID: " + producerTask.get());
50 } catch (InterruptedException | ExecutionException e) {
51 e.printStackTrace();
52 }
53 }
54 }
55 }
主題模式在測試時要注意的就是,要確定消費者已經是在 active 狀態 (執行到 receive()),生產者再送出訊息,這樣消費者才會收到,如果生產者送出訊息時,消費者還沒有在 active 狀態,這個訊息就消失了,沒有人收到。