| 項目 | 日期 | |
|---|---|---|
| 1 | Tibco RV request/reply 的同步與非同步 |
2015/02/07
|
| 2 | Tibco RV 的 Queue |
2015/05/05
|
| 3 | Tibco RV - fault tolerance |
2015/07/10
|
| 4 | JMS: getting started |
2015/08/30
|
| 5 | JMS: 訊息的標頭、屬性和種類 |
2015/08/30
|
Google Code Prettify
2015年10月23日 星期五
Tibco RV / JMS / Apache Kafka
2015年8月30日 星期日
JMS: 訊息的標頭、屬性和種類
上一篇主要是讓初次接觸 JMS 的人快速了解 JMS,對於一些細節略而不談,JMS 的訊息結構分三部份:
- Header (required) - 標頭
- Properties (optional) - 屬性
- Body (optional) - 訊息本體
- Header
| 項目 | 說明 | |
|---|---|---|
| 1 | JMSDestination | 傳送的目的地有可能是 quque 或 topic,client 程式可以利用這個 header,如下取得 queue 或 topic 值: Topic destination = (Topic) message.getJMSDestination(); |
| 2 | JMSDeliveryMode | 傳送模式有兩種 - DeliveryMode.PERSISTENT 及 DeliveryMode.NON_PERSISTENT,這個標頭值可以在 producer (生產者) 端以 setJMSDeliverMode(DeliveryMode.PERSISTENT); 的方式設定。當訊息為 PERSISTENT 時,訊息至少會被 server 傳送出一次,萬一傳送過程 server 出問題,當 server 服務恢復後,就會重傳,這表示,消費者端有可能同一個訊息收到兩次; 當訊息為 NON_PERSISTENT,訊息永遠只會被送出一次,所以,萬一因各種原因沒有送到消費者,也不會重送。 |
| 3 | JMSMessageID | JMSMessageID 的型別是 String,是由 server 端產生的一個 unique 字串,JMS 沒有規範產生的規則,所以不同廠商的 server 產生的 JMSMessageID 的規則會不相同,但都會是唯一。client 要取得這個值的方法如下: String messageId = message.getJMSMessageID(); |
| 4 | JMSTimestamp | 這是個 long 型別的值,當訊息由生產者送到 server 端時,EMS (或其它品牌的 message server) 會設定給它當時系統時間的值,即 JMSTimestamp 所記錄的是 server 收到 producer 送來訊息的時間。 |
| 5 | JMSExpiration | 也是一個長整數的值,記錄何時這個訊息就會失效,設定這個標頭值的方法是在生產者端呼叫 setJMSExpiration(...) 或 setTimeToLive(...),如果傳入的值為 0,表示永遠不失效。 |
| 6 | JMSRedelivered | 值為 true 或 false,用來表示 server 如果沒有收到消費者端的 acknowledge,訊息是否重傳。 |
| 7 | JMSPriority | 這個值由0~9 表示訊息的重要性,數字越大表示越重要,由生產者設定,一般的訊息應設定在 0~4,重要的訊息則設定其值為 5~9。 |
| 8 | JMSReplyTo | 這個值由生產者端設定,要求消費者端回覆時,回覆到那一個 queue 或 topic,所以在消費者端一定會使用 getJMSReplyTo() 取得值。 |
| 9 | JMSCorrelationID | 這個值用來標示訊息間的關係,一種普遍的作法是,消費者端回覆訊息給生產者時,將生產者傳來的訊息的 message id 設定到 JMSCorrelationID 再回覆給生產者,這樣生產者就知道這次回覆的訊息是那一個訊息的回覆訊息。 |
| 10 | JMSType | 用來標示訊息本體是那一種型別,可能的值有 MapMessage、TextMessage、ByteMessage 等。 |
- Properties
TestMessage msg = session.createTextMessage();
msg.setText("just for test");
msg.setStringProperty("username", "steven");
publisher.publish(msg);屬性的型別可以是 boolean、byte、short、int、long、float、double、String 中任一種。JMS 定義了以下的幾個屬性,但除了 JMSXGroupID 及 JMSXGroupSeq 強制廠商一定要支援外,其餘不一定要支援,所以可能你選用的產品有部份的屬性是沒有的。
- JMSXUserID
- JMSXAppID
- JMSProducerTXID
- JMSConsumerTXID
- JMSRcvTimestamp
- JMSXDeliveryCount
- JMSXState
- JMSGroupID
- JMSXGroupSeq
- JMS_TIBCO_CM_PUBLISHER
- JMS_TIBCO_CM_SEQUENCE
- JMS_TIBCO_COMPRESS
- JMS_TIBCO_DISABLE_SENDER
- JMS_TIBCO_IMPORTED
- JMS_TIBCO_MSG_EXT
- JMS_TIBCO_MSG_TRACE
- JMS_TIBCO_PRESERVE_UNDELIVERED
- JMS_TIBCO_SENDER
- JMS_TIBCO_SS_SENDER
- Types
- TextMessage
- StreamMessage
- MapMessage
- ObjectMessage
- BytesMessage
這是 JMS 定義的五個介面,這五個介面都繼承 Message 這個介面,這是 body 的五個種類。
JMS: getting started
Tibco EMS 是眾多遵循 JMS (Java Message Service) 規格的產品之一,這裡要整理的 JMS 相關資料,會以 EMS 來寫相關 sample code,所以如果有 EMS 的特異功能,也會儘量說明 (如果我知道的話)。
JMS 提供兩種訊息模式 (Messaging Models) - Point-to-point 及 Publish-and-subscribe (如下圖),這兩種模式有什麼不同?
在開始說明之前,先說明一個習慣說法,一般來說,在左邊送出訊息的稱為"生產者" (producer),在右邊收訊息的稱為"消費者" (consumer)。
上面說明的是一些最基本的觀念,在開始寫程式前,當然要先把環境搞定,我在電腦裡,安裝了 Tibco EMS 8.1 development 版本,首先開啟「EMS Administration Tool」,用它來建立一個群組、一個使用者、一個佇列、一個主題(topic),當然要讓這個使用者可以存取佇列和主題。
接下來看看程式怎麼寫 ...
程式說明:
JMS 提供兩種訊息模式 (Messaging Models) - Point-to-point 及 Publish-and-subscribe (如下圖),這兩種模式有什麼不同?
在開始說明之前,先說明一個習慣說法,一般來說,在左邊送出訊息的稱為"生產者" (producer),在右邊收訊息的稱為"消費者" (consumer)。
- Point-to-Point (點對點)
- 點對點一定是佇列模式 (queue),生產者送訊息到佇列,消費者由佇列取得訊息。
- 一個佇列可以有多個生產者送訊息過來,有多個消費者來取出訊息,但是,一個訊息是由一個生產者送出到佇列後,只會由一個消費者取得。
- 透過這個方式,在負載平衡上的運用很廣泛,生產者將要處理的訊息放入佇列,如果消費者取出訊息後,需要花比較長的時間處理,就可以創建多個消費者來處理這些訊息,每處理完一個就再去佇列中取出未處理的資料繼續處理,如此就可將負載分配到各個消費者。
- Publish-and-Subscribe (發行者和訂閱者)
- 發佈訂閱是 topic 模式,生產者發佈指定 topic (主題) 的訊息到 message server,所有訂閱該 topic 的消費者即會由 server 收到該訊息。
- 同一個 topic 可以同時有多個消費者產生,產生後也都會送給所有訂閱的消費者。
- 當生產者送出訊息時,消費者必須是在 active 狀態 (等待接收訊息的狀態),消費者才能接到訊息,否則即沒有收到該訊息。
- 如果要確定消費者一定可以收到訊息,可以創建"durable"的消費者(訂閱者),那麼訊息在發佈時,如果該消費者沒有 active,訊息會先放在 server 上,直到出現下列三個條件之一,訊息才會被刪除。(1) 訊息過期 (訊息送出前可以設定過期時間),(2) 訊息被消費者取走,(3) server 的儲存空間超過限制。
- 當 client 重新創建相同的 ID 的 durable 消費者,server 上此消費者有訂閱該 topic 的訊息都會送給這個消費者。
| General API | Point-to-Point API | Publish-and-Subscribe API |
|---|---|---|
| ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
| Destination | Queue | Topic |
| Connection | QueueConnection | TopicConnection |
| Session | QueueSession | TopicSession |
| MessageConsumer | QueueSender | TopicPublisher |
| MessageProducer | QueueReceiver | TopicSubscriber |
上面說明的是一些最基本的觀念,在開始寫程式前,當然要先把環境搞定,我在電腦裡,安裝了 Tibco EMS 8.1 development 版本,首先開啟「EMS Administration Tool」,用它來建立一個群組、一個使用者、一個佇列、一個主題(topic),當然要讓這個使用者可以存取佇列和主題。
- connect: 連線到 EMS,預設會用 admin,密碼為空白,EMS 預設的 port 是 7222,如下圖。
- create group ap "programmer": 建立一個命名為 ap 的群組,寫程式的人就加入這個群組。(create group group_name ["description"])
- create user steven "programmer01" password=password: 建立一個帳號 steven,密碼為 password。(create user user_name ["description"] [password=password]
- add member ap steven: 將 steven 加入 ap 群組。(add member group_name user_name [,user2, user3,...])
- create queue queue01: 建立一個命名為 queue01 的佇列。(create queue queue_name [properties])
- grant queue queue01 group=ap send,receive,browse: 授權給群組 ap,可以送訊息到 queue01、從 queue01 收訊息及瀏覽 queue01 的訊息。最後一個參數是權限,有三個值 send、receive、browse,如上面所示,要一次授權多種權限,用逗點分隔。(grant queue queue-name user=name | group=name permissions)
- create topic topic01: 建立一個命名為 topic01 的主題。(create topic topic_name [properties])
- grant topic topic01 group=ap subscribe,publish,durable,use_durable: 授權給群組 ap,可以有 subscribe (訂閱)、publish (發佈)、durable (持久的)、use_durable (使用持久的) 權限。(grant topic topic-name user=name | group=name permissions)
接下來看看程式怎麼寫 ...
- Point-to-Point (點對點)
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.DeliveryMode; 8 import javax.jms.Destination; 9 import javax.jms.MessageProducer; 10 import javax.jms.Session; 11 import javax.jms.TextMessage; 12 13 public class QueueProducer implements Callable<TextMessage> { 14 private String userName = "steven", password = "password", url = "tcp://localhost:7222"; 15 private String queueName = "queue01"; 16 17 @Override 18 public TextMessage call() throws Exception { 19 Connection connection = null; 20 21 ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url); 22 connection = factory.createConnection(userName,password); 23 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 24 Destination destination = session.createQueue(queueName); 25 MessageProducer msgProducer = session.createProducer(null); 26 TextMessage msg; 27 msg = session.createTextMessage(); 28 msg.setText("just for test"); 29 msg.setStringProperty("CustomerType","VIP"); 30 msg.setIntProperty("value",123); 31 msg.setIntProperty("Age",18); 32 msg.setStringProperty("Location","Taipei"); 33 msgProducer.send(destination, msg, DeliveryMode.PERSISTENT, 9, 60000); 34 System.out.println(msg); 35 36 connection.close(); 37 38 return msg; 39 } 40 }
程式說明:
- Line 21: 先建立一個連線的工廠類別,由這行程式清楚的看到,各廠商實作了 JMS 的介面。
- Line 22: 由連線工廠類別產生連線。
- Line 23: 由連線產生 session,session 是非常重要的一個工廠類別,由之後的程式可以看到,程式會用這個類別產生 queue、topic、producer、consumer … createSession() 的第二個參數是關於 acknowledge 的設定,關於這個設定,在初學階段都先設定為 AUTO_ACKNOWLEDGE,之後會再說明其它用法。
- Line 24: 不管是佇列模式或主題模式,都會產生這個 Destination 介面的實作類別,現在要用佇列模式,就用 session 產生一個佇列,如果是要用主題模式,就用 session 產生一個主題。這樣生產者送出訊息時,可不用在意是以佇列模式或主題模式送出。
- Line 28~32: 28 行是設定訊息 body 的值,29~32行是設定訊息的屬性值,設定 body 和屬性的 setter method 的名稱差別在於,設定屬性的 setter method 名稱是以 Property 為結尾,請參考「JMS: 訊息的標頭、屬性和種類」。
- Line 33: 將訊息送出到 Destination 介面的物件,在這裡當然就是送到佇列。
1 package idv.steven.ems; 2 3 import java.util.Hashtable; 4 import java.util.concurrent.Callable; 5 6 import javax.jms.Connection; 7 import javax.jms.ConnectionFactory; 8 import javax.jms.Destination; 9 import javax.jms.ExceptionListener; 10 import javax.jms.JMSException; 11 import javax.jms.Message; 12 import javax.jms.MessageConsumer; 13 import javax.jms.Session; 14 15 public class QueueConsumer implements Callable<String>, ExceptionListener { 16 private String userName = "steven", password = "password", url = "tcp://localhost:7222"; 17 private String queueName = "queue01"; 18 19 @Override 20 public String call() throws Exception { 21 Connection connection = null; 22 23 Hashtable prop = new Hashtable(); 24 prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(10)); 25 prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(1000)); 26 ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url,null,prop); 27 connection = factory.createConnection(userName,password); 28 29 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 30 Destination destination = session.createQueue(queueName); 31 connection.setExceptionListener(this); 32 MessageConsumer msgConsumer = session.createConsumer(destination); 33 34 connection.start(); 35 36 Message msg = msgConsumer.receive(); 37 38 if (msg != null) 39 System.err.println("Received message: "+ msg); 40 41 connection.close(); 42 43 return msg.getJMSMessageID(); 44 } 45 46 @Override 47 public void onException(JMSException ex) { 48 ex.printStackTrace(); 49 } 50 }程式說明:
- Line 23~27: 建立連線,這裡在建立過程有傳入一些參數,這會在後續再說明。
- Line 34: 這一行很重要,呼叫 start() 後,才會開始接收資料。
- Line 36: 程式會停在這一行等待訊息到達。
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.FutureTask; 6 7 import javax.jms.JMSException; 8 import javax.jms.TextMessage; 9 10 public class QueueMsg { 11 12 public static void main(String[] args) { 13 QueueProducer producer = new QueueProducer(); 14 FutureTask<TextMessage> producerTask = new FutureTask<TextMessage>(producer); 15 Thread tProducer = new Thread(producerTask); 16 tProducer.start(); 17 18 try { 19 Thread.sleep(1000); 20 } catch (InterruptedException e1) { 21 } 22 23 Callable<String> consumer = new QueueConsumer(); 24 FutureTask<String> consumerTask = new FutureTask<String>(consumer); 25 Thread tConsumer = new Thread(consumerTask); 26 tConsumer.start(); 27 28 try { 29 if (producerTask.isDone()) { 30 TextMessage msg = producerTask.get(); 31 if (msg != null) { 32 System.out.println("producer's message: " + msg.getText()); 33 } 34 } 35 36 try { 37 Thread.sleep(300); 38 } catch (InterruptedException e1) { 39 } 40 41 if (consumerTask.isDone()) { 42 String text = consumerTask.get(); 43 System.out.println("consumer's message: " + text); 44 } 45 } catch (InterruptedException | ExecutionException | JMSException e) { 46 e.printStackTrace(); 47 } 48 } 49 }這個測試程式很簡單的先啟動生產者,再啟動消費者,並輸出生產者送出的資料及消費者收到的資料,以確定雙方確實有正確的溝通。
- Publish-and-Subscribe (發佈、訂閱)
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.DeliveryMode; 8 import javax.jms.MapMessage; 9 import javax.jms.Session; 10 import javax.jms.Topic; 11 import javax.jms.TopicConnection; 12 import javax.jms.TopicConnectionFactory; 13 import javax.jms.TopicPublisher; 14 import javax.jms.TopicSession; 15 16 import com.tibco.tibjms.Tibjms; 17 import com.tibco.tibjms.TibjmsConnectionFactory; 18 import com.tibco.tibjms.TibjmsTopicConnectionFactory; 19 20 public class MyTopicPublisher implements Callable<String> { 21 private String userName = "steven", password = "paasword", url = "tcp://localhost:7222"; 22 private String topicName = "topic01"; 23 24 @Override 25 public String call() throws Exception { 26 TopicConnection connection = null; 27 28 Tibjms.setEncoding("Big5"); 29 TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url); 30 connection = factory.createTopicConnection(userName,password); 31 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 32 Topic topic = session.createTopic(topicName); 33 TopicPublisher publisher = session.createPublisher(topic); 34 35 MapMessage msg = session.createMapMessage(); 36 msg.setInt("age",35); 37 msg.setString("address","台北市瑞湖街68號15樓"); 38 msg.setStringProperty("JMS_TIBCO_MSG_TRACE","body"); 39 msg.setBooleanProperty("JMS_TIBCO_PRESERVE_UNDELIVERED",true); 40 msg.setIntProperty("Level",123); 41 msg.setStringProperty("Location","Taipei"); 42 43 publisher.publish(msg, DeliveryMode.PERSISTENT, 9, 1000000); 44 String msgID = msg.getJMSMessageID(); 45 46 connection.close(); 47 48 return msgID; 49 } 50 }
程式說明:
- Line 31: 這裡建立的 session,第 2 個參數之後會詳加討論。
- Line 43: 在這裡生產者將訊息發佈出去,這和 31 行間的關係,也會在之後詳加討論。
- Line 44: 每個訊息送出時,server 都會給予一個不會重複的訊息編號。
程式說明:1 package idv.steven.ems; 2 3 import java.util.Hashtable; 4 import java.util.concurrent.Callable; 5 6 import javax.jms.Connection; 7 import javax.jms.ConnectionFactory; 8 import javax.jms.Destination; 9 import javax.jms.ExceptionListener; 10 import javax.jms.JMSException; 11 import javax.jms.Message; 12 import javax.jms.MessageConsumer; 13 import javax.jms.Session; 14 import javax.jms.Topic; 15 import javax.jms.TopicConnection; 16 import javax.jms.TopicConnectionFactory; 17 import javax.jms.TopicSession; 18 import javax.jms.TopicSubscriber; 19 20 import com.tibco.tibjms.TibjmsTopicConnectionFactory; 21 22 public class MyTopicSubscriber implements Callable<Message>, ExceptionListener { 23 private String userName = "steven", password = "password", url = "tcp://localhost:7222"; 24 private String topicName = "topic01"; 25 26 @Override 27 public Message call() throws Exception { 28 Message msg = null; 29 30 try { 31 Hashtable prop = new Hashtable(); 32 prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(2)); 33 prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(10)); 34 35 TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url, null, prop); 36 TopicConnection connection = factory.createTopicConnection(userName,password); 37 TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 38 Topic topic = session.createTopic(topicName); 39 TopicSubscriber subscriber = session.createSubscriber(topic); 40 41 connection.setExceptionListener(this); 42 connection.start(); 43 44 msg = subscriber.receive(); 45 46 if (msg != null) 47 System.out.println("Received message: "+ msg); 48 49 connection.close(); 50 } catch (JMSException e) { 51 e.printStackTrace(); 52 } 53 54 return msg; 55 } 56 57 @Override 58 public void onException(JMSException e) { 59 e.printStackTrace(); 60 } 61 }
- Line 42: 不管是佇列模式、主題模式,都需要呼叫 start() 才會開始收訊息。
- Line 44: 程式會停在這一行以等待傾聽的主題的訊息到達。
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.FutureTask; 6 7 import javax.jms.JMSException; 8 import javax.jms.Message; 9 10 public class TopicMsg { 11 12 public static void main(String[] args) { 13 Callable<Message> consumer = new MyTopicSubscriber(); 14 FutureTask<Message> consumerTask = new FutureTask<Message>(consumer); 15 Thread tConsumer = new Thread(consumerTask); 16 tConsumer.start(); 17 18 try { 19 Thread.sleep(300); 20 } catch (InterruptedException e1) { 21 } 22 23 Callable<String> producer = new MyTopicPublisher(); 24 FutureTask<String> producerTask = new FutureTask<String>(producer); 25 Thread tProducer = new Thread(producerTask); 26 tProducer.start(); 27 28 try { 29 Thread.sleep(300); 30 } catch (InterruptedException e1) { 31 } 32 33 if (consumerTask.isDone()) { 34 try { 35 Message msg = consumerTask.get(); 36 if (msg != null) { 37 System.out.println("consumer's msgID: " + msg.getJMSMessageID()); 38 } 39 else { 40 System.out.println("msg is null"); 41 } 42 } catch (InterruptedException | ExecutionException | JMSException e) { 43 e.printStackTrace(); 44 } 45 } 46 47 if (producerTask.isDone()) { 48 try { 49 System.out.println("producer's msgID: " + producerTask.get()); 50 } catch (InterruptedException | ExecutionException e) { 51 e.printStackTrace(); 52 } 53 } 54 } 55 }主題模式在測試時要注意的就是,要確定消費者已經是在 active 狀態 (執行到 receive()),生產者再送出訊息,這樣消費者才會收到,如果生產者送出訊息時,消費者還沒有在 active 狀態,這個訊息就消失了,沒有人收到。
訂閱:
意見 (Atom)

