Google Code Prettify

2015年8月30日 星期日

JMS: getting started

Tibco EMS 是眾多遵循 JMS (Java Message Service) 規格的產品之一,這裡要整理的 JMS 相關資料,會以 EMS 來寫相關 sample code,所以如果有 EMS 的特異功能,也會儘量說明 (如果我知道的話)。

JMS 提供兩種訊息模式 (Messaging Models) - Point-to-point 及 Publish-and-subscribe (如下圖),這兩種模式有什麼不同?
在開始說明之前,先說明一個習慣說法,一般來說,在左邊送出訊息的稱為"生產者" (producer),在右邊收訊息的稱為"消費者" (consumer)。
  • Point-to-Point (點對點)
  1. 點對點一定是佇列模式 (queue),生產者送訊息到佇列,消費者由佇列取得訊息。
  2. 一個佇列可以有多個生產者送訊息過來,有多個消費者來取出訊息,但是,一個訊息是由一個生產者送出到佇列後,只會由一個消費者取得。
  3. 透過這個方式,在負載平衡上的運用很廣泛,生產者將要處理的訊息放入佇列,如果消費者取出訊息後,需要花比較長的時間處理,就可以創建多個消費者來處理這些訊息,每處理完一個就再去佇列中取出未處理的資料繼續處理,如此就可將負載分配到各個消費者。
  • Publish-and-Subscribe (發行者和訂閱者)
  1. 發佈訂閱是 topic 模式,生產者發佈指定 topic (主題) 的訊息到 message server,所有訂閱該 topic 的消費者即會由 server 收到該訊息。
  2. 同一個 topic 可以同時有多個消費者產生,產生後也都會送給所有訂閱的消費者。
  3. 當生產者送出訊息時,消費者必須是在 active 狀態 (等待接收訊息的狀態),消費者才能接到訊息,否則即沒有收到該訊息。
  4. 如果要確定消費者一定可以收到訊息,可以創建"durable"的消費者(訂閱者),那麼訊息在發佈時,如果該消費者沒有 active,訊息會先放在 server 上,直到出現下列三個條件之一,訊息才會被刪除。(1) 訊息過期 (訊息送出前可以設定過期時間),(2) 訊息被消費者取走,(3) server 的儲存空間超過限制。
  5. 當 client 重新創建相同的 ID 的 durable 消費者,server 上此消費者有訂閱該 topic  的訊息都會送給這個消費者。
JMS 本身是一個規範,定義了許多介面,由各家廠商實作這些介面,由上面的說明看起來,雖然兩種模式的運作有許多的不同,但是 JMS API 則是差不多的,程式的寫法有差距不大,下表是 Point-to-Point 和 Publis-and-Subscribe 間的差別:

General APIPoint-to-Point APIPublish-and-Subscribe API
ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory
DestinationQueueTopic
ConnectionQueueConnectionTopicConnection
SessionQueueSessionTopicSession
MessageConsumerQueueSenderTopicPublisher
MessageProducerQueueReceiverTopicSubscriber






上面說明的是一些最基本的觀念,在開始寫程式前,當然要先把環境搞定,我在電腦裡,安裝了 Tibco EMS 8.1 development 版本,首先開啟「EMS Administration Tool」,用它來建立一個群組、一個使用者、一個佇列、一個主題(topic),當然要讓這個使用者可以存取佇列和主題。
  • connect: 連線到 EMS,預設會用 admin,密碼為空白,EMS 預設的 port 是 7222,如下圖。
  • create group ap "programmer": 建立一個命名為 ap 的群組,寫程式的人就加入這個群組。(create group group_name ["description"])
  • create user steven "programmer01" password=password: 建立一個帳號 steven,密碼為 password。(create user user_name ["description"] [password=password]
  • add member ap steven: 將 steven 加入 ap 群組。(add member group_name user_name [,user2, user3,...])
  • create queue queue01: 建立一個命名為 queue01 的佇列。(create queue queue_name [properties])
  • grant queue queue01 group=ap send,receive,browse: 授權給群組 ap,可以送訊息到 queue01、從 queue01 收訊息及瀏覽 queue01 的訊息。最後一個參數是權限,有三個值 send、receive、browse,如上面所示,要一次授權多種權限,用逗點分隔。(grant queue queue-name user=name | group=name permissions)
  • create topic topic01: 建立一個命名為 topic01 的主題。(create topic topic_name [properties])
  • grant topic topic01 group=ap subscribe,publish,durable,use_durable: 授權給群組 ap,可以有 subscribe (訂閱)、publish (發佈)、durable (持久的)、use_durable (使用持久的) 權限。(grant topic topic-name user=name | group=name permissions)
(EMS Administration Tool 的各種指令,請參閱官方文件「TIBCO Enterprise Message Service User's Guide」第六章。)

接下來看看程式怎麼寫 ...
  • Point-to-Point (點對點)
** 生產者
1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.ConnectionFactory;
 7 import javax.jms.DeliveryMode;
 8 import javax.jms.Destination;
 9 import javax.jms.MessageProducer;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12 
13 public class QueueProducer implements Callable<TextMessage> {
14     private String userName = "steven", password = "password", url = "tcp://localhost:7222";
15     private String queueName = "queue01";
16 
17     @Override
18     public TextMessage call() throws Exception {
19         Connection connection   = null;
20         
21         ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url);
22         connection = factory.createConnection(userName,password);
23         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
24         Destination destination = session.createQueue(queueName);
25         MessageProducer msgProducer = session.createProducer(null);
26         TextMessage msg;
27         msg = session.createTextMessage();
28         msg.setText("just for test");
29         msg.setStringProperty("CustomerType","VIP");
30         msg.setIntProperty("value",123);
31         msg.setIntProperty("Age",18);
32         msg.setStringProperty("Location","Taipei");
33         msgProducer.send(destination, msg, DeliveryMode.PERSISTENT, 9, 60000);
34         System.out.println(msg);
35         
36         connection.close();
37         
38         return msg;
39     }
40 }


程式說明:
  1. Line 21: 先建立一個連線的工廠類別,由這行程式清楚的看到,各廠商實作了 JMS 的介面。
  2. Line 22: 由連線工廠類別產生連線。
  3. Line 23: 由連線產生 session,session 是非常重要的一個工廠類別,由之後的程式可以看到,程式會用這個類別產生 queue、topic、producer、consumer … createSession() 的第二個參數是關於 acknowledge 的設定,關於這個設定,在初學階段都先設定為 AUTO_ACKNOWLEDGE,之後會再說明其它用法。
  4. Line 24: 不管是佇列模式或主題模式,都會產生這個 Destination 介面的實作類別,現在要用佇列模式,就用 session 產生一個佇列,如果是要用主題模式,就用 session 產生一個主題。這樣生產者送出訊息時,可不用在意是以佇列模式或主題模式送出。
  5. Line 28~32: 28 行是設定訊息 body 的值,29~32行是設定訊息的屬性值,設定 body 和屬性的 setter method 的名稱差別在於,設定屬性的 setter method 名稱是以 Property 為結尾,請參考「JMS: 訊息的標頭、屬性和種類」。
  6. Line 33: 將訊息送出到 Destination 介面的物件,在這裡當然就是送到佇列。
** 消費者
 1 package idv.steven.ems;
 2 
 3 import java.util.Hashtable;
 4 import java.util.concurrent.Callable;
 5 
 6 import javax.jms.Connection;
 7 import javax.jms.ConnectionFactory;
 8 import javax.jms.Destination;
 9 import javax.jms.ExceptionListener;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.Session;
14 
15 public class QueueConsumer implements Callable<String>, ExceptionListener {
16     private String userName = "steven", password = "password", url = "tcp://localhost:7222";
17     private String queueName = "queue01";
18     
19     @Override
20     public String call() throws Exception {
21         Connection      connection  = null;
22         
23         Hashtable prop = new Hashtable();
24         prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(10));
25         prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(1000));
26         ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url,null,prop);
27         connection = factory.createConnection(userName,password);
28         
29         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
30         Destination destination = session.createQueue(queueName);
31         connection.setExceptionListener(this);
32         MessageConsumer msgConsumer = session.createConsumer(destination);
33         
34         connection.start();
35         
36         Message msg = msgConsumer.receive();
37 
38         if (msg != null)
39             System.err.println("Received message: "+ msg);
40         
41         connection.close();
42             
43         return msg.getJMSMessageID();
44     }
45 
46     @Override
47     public void onException(JMSException ex) {
48         ex.printStackTrace();
49     }
50 }
程式說明:
  1. Line 23~27: 建立連線,這裡在建立過程有傳入一些參數,這會在後續再說明。
  2. Line 34: 這一行很重要,呼叫 start() 後,才會開始接收資料。
  3. Line 36: 程式會停在這一行等待訊息到達。
** 測試程式
 1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.FutureTask;
 6 
 7 import javax.jms.JMSException;
 8 import javax.jms.TextMessage;
 9 
10 public class QueueMsg {
11 
12     public static void main(String[] args) {
13         QueueProducer producer = new QueueProducer();
14         FutureTask<TextMessage> producerTask = new FutureTask<TextMessage>(producer);
15         Thread tProducer = new Thread(producerTask);
16         tProducer.start();
17         
18         try {
19             Thread.sleep(1000);
20         } catch (InterruptedException e1) {
21         }
22 
23         Callable<String> consumer = new QueueConsumer();
24         FutureTask<String> consumerTask = new FutureTask<String>(consumer);
25         Thread tConsumer = new Thread(consumerTask);
26         tConsumer.start();        
27         
28         try {
29             if (producerTask.isDone()) {
30                 TextMessage msg = producerTask.get();
31                 if (msg != null) {
32                     System.out.println("producer's message: " + msg.getText());
33                 }
34             }
35             
36             try {
37                 Thread.sleep(300);
38             } catch (InterruptedException e1) {
39             }
40             
41             if (consumerTask.isDone()) {
42                 String text = consumerTask.get();
43                 System.out.println("consumer's message: " + text);
44             }
45         } catch (InterruptedException | ExecutionException | JMSException e) {
46             e.printStackTrace();
47         }
48     }
49 }
這個測試程式很簡單的先啟動生產者,再啟動消費者,並輸出生產者送出的資料及消費者收到的資料,以確定雙方確實有正確的溝通。
  • Publish-and-Subscribe (發佈、訂閱)
** 生產者
 1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.ConnectionFactory;
 7 import javax.jms.DeliveryMode;
 8 import javax.jms.MapMessage;
 9 import javax.jms.Session;
10 import javax.jms.Topic;
11 import javax.jms.TopicConnection;
12 import javax.jms.TopicConnectionFactory;
13 import javax.jms.TopicPublisher;
14 import javax.jms.TopicSession;
15 
16 import com.tibco.tibjms.Tibjms;
17 import com.tibco.tibjms.TibjmsConnectionFactory;
18 import com.tibco.tibjms.TibjmsTopicConnectionFactory;
19 
20 public class MyTopicPublisher implements Callable<String> {
21     private String userName = "steven", password = "paasword", url = "tcp://localhost:7222";
22     private String topicName = "topic01";
23     
24     @Override
25     public String call() throws Exception {
26         TopicConnection connection = null;
27         
28         Tibjms.setEncoding("Big5");
29         TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url);
30         connection = factory.createTopicConnection(userName,password);
31         TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
32         Topic topic = session.createTopic(topicName);
33         TopicPublisher publisher = session.createPublisher(topic);
34         
35         MapMessage msg = session.createMapMessage();
36         msg.setInt("age",35);
37         msg.setString("address","台北市瑞湖街68號15樓");
38         msg.setStringProperty("JMS_TIBCO_MSG_TRACE","body");
39         msg.setBooleanProperty("JMS_TIBCO_PRESERVE_UNDELIVERED",true);
40         msg.setIntProperty("Level",123);
41         msg.setStringProperty("Location","Taipei");
42         
43         publisher.publish(msg, DeliveryMode.PERSISTENT, 9, 1000000);
44         String msgID = msg.getJMSMessageID();
45         
46         connection.close();
47         
48         return msgID;
49     }
50 }
程式說明:
  1. Line 31: 這裡建立的 session,第 2 個參數之後會詳加討論。
  2. Line 43: 在這裡生產者將訊息發佈出去,這和 31 行間的關係,也會在之後詳加討論。
  3. Line 44: 每個訊息送出時,server 都會給予一個不會重複的訊息編號。
** 消費者
 1 package idv.steven.ems;
 2 
 3 import java.util.Hashtable;
 4 import java.util.concurrent.Callable;
 5 
 6 import javax.jms.Connection;
 7 import javax.jms.ConnectionFactory;
 8 import javax.jms.Destination;
 9 import javax.jms.ExceptionListener;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.Session;
14 import javax.jms.Topic;
15 import javax.jms.TopicConnection;
16 import javax.jms.TopicConnectionFactory;
17 import javax.jms.TopicSession;
18 import javax.jms.TopicSubscriber;
19 
20 import com.tibco.tibjms.TibjmsTopicConnectionFactory;
21 
22 public class MyTopicSubscriber implements Callable<Message>, ExceptionListener {
23     private String userName = "steven", password = "password", url = "tcp://localhost:7222";
24     private String topicName = "topic01";
25     
26     @Override
27     public Message call() throws Exception {
28         Message msg = null;
29         
30         try {
31             Hashtable prop = new Hashtable();
32             prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(2));
33             prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(10));
34             
35             TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url, null, prop);
36             TopicConnection connection = factory.createTopicConnection(userName,password);
37             TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
38             Topic topic = session.createTopic(topicName);
39             TopicSubscriber subscriber = session.createSubscriber(topic);
40             
41             connection.setExceptionListener(this);
42             connection.start();
43             
44             msg = subscriber.receive();
45             
46             if (msg != null)
47                 System.out.println("Received message: "+ msg);
48 
49             connection.close();
50         } catch (JMSException e) {
51             e.printStackTrace();
52         }
53         
54         return msg;
55     }
56     
57     @Override
58     public void onException(JMSException e) {
59         e.printStackTrace();
60     }
61 }
程式說明:
  1. Line 42: 不管是佇列模式、主題模式,都需要呼叫 start() 才會開始收訊息。
  2. Line 44: 程式會停在這一行以等待傾聽的主題的訊息到達。
** 測試程式
 1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.FutureTask;
 6 
 7 import javax.jms.JMSException;
 8 import javax.jms.Message;
 9 
10 public class TopicMsg {
11 
12     public static void main(String[] args) {
13         Callable<Message> consumer = new MyTopicSubscriber();
14         FutureTask<Message> consumerTask = new FutureTask<Message>(consumer);
15         Thread tConsumer = new Thread(consumerTask);
16         tConsumer.start();
17         
18         try {
19             Thread.sleep(300);
20         } catch (InterruptedException e1) {
21         }
22         
23         Callable<String> producer = new MyTopicPublisher();
24         FutureTask<String> producerTask = new FutureTask<String>(producer);
25         Thread tProducer = new Thread(producerTask);
26         tProducer.start();
27         
28         try {
29             Thread.sleep(300);
30         } catch (InterruptedException e1) {
31         }
32         
33         if (consumerTask.isDone()) {
34             try {
35                 Message msg = consumerTask.get();
36                 if (msg != null) {
37                     System.out.println("consumer's msgID: " + msg.getJMSMessageID());
38                 }
39                 else {
40                     System.out.println("msg is null");
41                 }
42             } catch (InterruptedException | ExecutionException | JMSException e) {
43                 e.printStackTrace();
44             }
45         }
46         
47         if (producerTask.isDone()) {
48             try {
49                 System.out.println("producer's msgID: " + producerTask.get());
50             } catch (InterruptedException | ExecutionException e) {
51                 e.printStackTrace();
52             }
53         }
54     }
55 }
主題模式在測試時要注意的就是,要確定消費者已經是在 active 狀態 (執行到 receive()),生產者再送出訊息,這樣消費者才會收到,如果生產者送出訊息時,消費者還沒有在 active 狀態,這個訊息就消失了,沒有人收到。

沒有留言:

張貼留言