JMS 提供兩種訊息模式 (Messaging Models) - Point-to-point 及 Publish-and-subscribe (如下圖),這兩種模式有什麼不同?
在開始說明之前,先說明一個習慣說法,一般來說,在左邊送出訊息的稱為"生產者" (producer),在右邊收訊息的稱為"消費者" (consumer)。
- Point-to-Point (點對點)
- 點對點一定是佇列模式 (queue),生產者送訊息到佇列,消費者由佇列取得訊息。
- 一個佇列可以有多個生產者送訊息過來,有多個消費者來取出訊息,但是,一個訊息是由一個生產者送出到佇列後,只會由一個消費者取得。
- 透過這個方式,在負載平衡上的運用很廣泛,生產者將要處理的訊息放入佇列,如果消費者取出訊息後,需要花比較長的時間處理,就可以創建多個消費者來處理這些訊息,每處理完一個就再去佇列中取出未處理的資料繼續處理,如此就可將負載分配到各個消費者。
- Publish-and-Subscribe (發行者和訂閱者)
- 發佈訂閱是 topic 模式,生產者發佈指定 topic (主題) 的訊息到 message server,所有訂閱該 topic 的消費者即會由 server 收到該訊息。
- 同一個 topic 可以同時有多個消費者產生,產生後也都會送給所有訂閱的消費者。
- 當生產者送出訊息時,消費者必須是在 active 狀態 (等待接收訊息的狀態),消費者才能接到訊息,否則即沒有收到該訊息。
- 如果要確定消費者一定可以收到訊息,可以創建"durable"的消費者(訂閱者),那麼訊息在發佈時,如果該消費者沒有 active,訊息會先放在 server 上,直到出現下列三個條件之一,訊息才會被刪除。(1) 訊息過期 (訊息送出前可以設定過期時間),(2) 訊息被消費者取走,(3) server 的儲存空間超過限制。
- 當 client 重新創建相同的 ID 的 durable 消費者,server 上此消費者有訂閱該 topic 的訊息都會送給這個消費者。
General API | Point-to-Point API | Publish-and-Subscribe API |
---|---|---|
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Destination | Queue | Topic |
Connection | QueueConnection | TopicConnection |
Session | QueueSession | TopicSession |
MessageConsumer | QueueSender | TopicPublisher |
MessageProducer | QueueReceiver | TopicSubscriber |
上面說明的是一些最基本的觀念,在開始寫程式前,當然要先把環境搞定,我在電腦裡,安裝了 Tibco EMS 8.1 development 版本,首先開啟「EMS Administration Tool」,用它來建立一個群組、一個使用者、一個佇列、一個主題(topic),當然要讓這個使用者可以存取佇列和主題。
- connect: 連線到 EMS,預設會用 admin,密碼為空白,EMS 預設的 port 是 7222,如下圖。
- create group ap "programmer": 建立一個命名為 ap 的群組,寫程式的人就加入這個群組。(create group group_name ["description"])
- create user steven "programmer01" password=password: 建立一個帳號 steven,密碼為 password。(create user user_name ["description"] [password=password]
- add member ap steven: 將 steven 加入 ap 群組。(add member group_name user_name [,user2, user3,...])
- create queue queue01: 建立一個命名為 queue01 的佇列。(create queue queue_name [properties])
- grant queue queue01 group=ap send,receive,browse: 授權給群組 ap,可以送訊息到 queue01、從 queue01 收訊息及瀏覽 queue01 的訊息。最後一個參數是權限,有三個值 send、receive、browse,如上面所示,要一次授權多種權限,用逗點分隔。(grant queue queue-name user=name | group=name permissions)
- create topic topic01: 建立一個命名為 topic01 的主題。(create topic topic_name [properties])
- grant topic topic01 group=ap subscribe,publish,durable,use_durable: 授權給群組 ap,可以有 subscribe (訂閱)、publish (發佈)、durable (持久的)、use_durable (使用持久的) 權限。(grant topic topic-name user=name | group=name permissions)
接下來看看程式怎麼寫 ...
- Point-to-Point (點對點)
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.DeliveryMode; 8 import javax.jms.Destination; 9 import javax.jms.MessageProducer; 10 import javax.jms.Session; 11 import javax.jms.TextMessage; 12 13 public class QueueProducer implements Callable<TextMessage> { 14 private String userName = "steven", password = "password", url = "tcp://localhost:7222"; 15 private String queueName = "queue01"; 16 17 @Override 18 public TextMessage call() throws Exception { 19 Connection connection = null; 20 21 ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url); 22 connection = factory.createConnection(userName,password); 23 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 24 Destination destination = session.createQueue(queueName); 25 MessageProducer msgProducer = session.createProducer(null); 26 TextMessage msg; 27 msg = session.createTextMessage(); 28 msg.setText("just for test"); 29 msg.setStringProperty("CustomerType","VIP"); 30 msg.setIntProperty("value",123); 31 msg.setIntProperty("Age",18); 32 msg.setStringProperty("Location","Taipei"); 33 msgProducer.send(destination, msg, DeliveryMode.PERSISTENT, 9, 60000); 34 System.out.println(msg); 35 36 connection.close(); 37 38 return msg; 39 } 40 }
程式說明:
- Line 21: 先建立一個連線的工廠類別,由這行程式清楚的看到,各廠商實作了 JMS 的介面。
- Line 22: 由連線工廠類別產生連線。
- Line 23: 由連線產生 session,session 是非常重要的一個工廠類別,由之後的程式可以看到,程式會用這個類別產生 queue、topic、producer、consumer … createSession() 的第二個參數是關於 acknowledge 的設定,關於這個設定,在初學階段都先設定為 AUTO_ACKNOWLEDGE,之後會再說明其它用法。
- Line 24: 不管是佇列模式或主題模式,都會產生這個 Destination 介面的實作類別,現在要用佇列模式,就用 session 產生一個佇列,如果是要用主題模式,就用 session 產生一個主題。這樣生產者送出訊息時,可不用在意是以佇列模式或主題模式送出。
- Line 28~32: 28 行是設定訊息 body 的值,29~32行是設定訊息的屬性值,設定 body 和屬性的 setter method 的名稱差別在於,設定屬性的 setter method 名稱是以 Property 為結尾,請參考「JMS: 訊息的標頭、屬性和種類」。
- Line 33: 將訊息送出到 Destination 介面的物件,在這裡當然就是送到佇列。
1 package idv.steven.ems; 2 3 import java.util.Hashtable; 4 import java.util.concurrent.Callable; 5 6 import javax.jms.Connection; 7 import javax.jms.ConnectionFactory; 8 import javax.jms.Destination; 9 import javax.jms.ExceptionListener; 10 import javax.jms.JMSException; 11 import javax.jms.Message; 12 import javax.jms.MessageConsumer; 13 import javax.jms.Session; 14 15 public class QueueConsumer implements Callable<String>, ExceptionListener { 16 private String userName = "steven", password = "password", url = "tcp://localhost:7222"; 17 private String queueName = "queue01"; 18 19 @Override 20 public String call() throws Exception { 21 Connection connection = null; 22 23 Hashtable prop = new Hashtable(); 24 prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(10)); 25 prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(1000)); 26 ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url,null,prop); 27 connection = factory.createConnection(userName,password); 28 29 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 30 Destination destination = session.createQueue(queueName); 31 connection.setExceptionListener(this); 32 MessageConsumer msgConsumer = session.createConsumer(destination); 33 34 connection.start(); 35 36 Message msg = msgConsumer.receive(); 37 38 if (msg != null) 39 System.err.println("Received message: "+ msg); 40 41 connection.close(); 42 43 return msg.getJMSMessageID(); 44 } 45 46 @Override 47 public void onException(JMSException ex) { 48 ex.printStackTrace(); 49 } 50 }程式說明:
- Line 23~27: 建立連線,這裡在建立過程有傳入一些參數,這會在後續再說明。
- Line 34: 這一行很重要,呼叫 start() 後,才會開始接收資料。
- Line 36: 程式會停在這一行等待訊息到達。
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.FutureTask; 6 7 import javax.jms.JMSException; 8 import javax.jms.TextMessage; 9 10 public class QueueMsg { 11 12 public static void main(String[] args) { 13 QueueProducer producer = new QueueProducer(); 14 FutureTask<TextMessage> producerTask = new FutureTask<TextMessage>(producer); 15 Thread tProducer = new Thread(producerTask); 16 tProducer.start(); 17 18 try { 19 Thread.sleep(1000); 20 } catch (InterruptedException e1) { 21 } 22 23 Callable<String> consumer = new QueueConsumer(); 24 FutureTask<String> consumerTask = new FutureTask<String>(consumer); 25 Thread tConsumer = new Thread(consumerTask); 26 tConsumer.start(); 27 28 try { 29 if (producerTask.isDone()) { 30 TextMessage msg = producerTask.get(); 31 if (msg != null) { 32 System.out.println("producer's message: " + msg.getText()); 33 } 34 } 35 36 try { 37 Thread.sleep(300); 38 } catch (InterruptedException e1) { 39 } 40 41 if (consumerTask.isDone()) { 42 String text = consumerTask.get(); 43 System.out.println("consumer's message: " + text); 44 } 45 } catch (InterruptedException | ExecutionException | JMSException e) { 46 e.printStackTrace(); 47 } 48 } 49 }這個測試程式很簡單的先啟動生產者,再啟動消費者,並輸出生產者送出的資料及消費者收到的資料,以確定雙方確實有正確的溝通。
- Publish-and-Subscribe (發佈、訂閱)
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.DeliveryMode; 8 import javax.jms.MapMessage; 9 import javax.jms.Session; 10 import javax.jms.Topic; 11 import javax.jms.TopicConnection; 12 import javax.jms.TopicConnectionFactory; 13 import javax.jms.TopicPublisher; 14 import javax.jms.TopicSession; 15 16 import com.tibco.tibjms.Tibjms; 17 import com.tibco.tibjms.TibjmsConnectionFactory; 18 import com.tibco.tibjms.TibjmsTopicConnectionFactory; 19 20 public class MyTopicPublisher implements Callable<String> { 21 private String userName = "steven", password = "paasword", url = "tcp://localhost:7222"; 22 private String topicName = "topic01"; 23 24 @Override 25 public String call() throws Exception { 26 TopicConnection connection = null; 27 28 Tibjms.setEncoding("Big5"); 29 TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url); 30 connection = factory.createTopicConnection(userName,password); 31 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 32 Topic topic = session.createTopic(topicName); 33 TopicPublisher publisher = session.createPublisher(topic); 34 35 MapMessage msg = session.createMapMessage(); 36 msg.setInt("age",35); 37 msg.setString("address","台北市瑞湖街68號15樓"); 38 msg.setStringProperty("JMS_TIBCO_MSG_TRACE","body"); 39 msg.setBooleanProperty("JMS_TIBCO_PRESERVE_UNDELIVERED",true); 40 msg.setIntProperty("Level",123); 41 msg.setStringProperty("Location","Taipei"); 42 43 publisher.publish(msg, DeliveryMode.PERSISTENT, 9, 1000000); 44 String msgID = msg.getJMSMessageID(); 45 46 connection.close(); 47 48 return msgID; 49 } 50 }
程式說明:
- Line 31: 這裡建立的 session,第 2 個參數之後會詳加討論。
- Line 43: 在這裡生產者將訊息發佈出去,這和 31 行間的關係,也會在之後詳加討論。
- Line 44: 每個訊息送出時,server 都會給予一個不會重複的訊息編號。
程式說明:1 package idv.steven.ems; 2 3 import java.util.Hashtable; 4 import java.util.concurrent.Callable; 5 6 import javax.jms.Connection; 7 import javax.jms.ConnectionFactory; 8 import javax.jms.Destination; 9 import javax.jms.ExceptionListener; 10 import javax.jms.JMSException; 11 import javax.jms.Message; 12 import javax.jms.MessageConsumer; 13 import javax.jms.Session; 14 import javax.jms.Topic; 15 import javax.jms.TopicConnection; 16 import javax.jms.TopicConnectionFactory; 17 import javax.jms.TopicSession; 18 import javax.jms.TopicSubscriber; 19 20 import com.tibco.tibjms.TibjmsTopicConnectionFactory; 21 22 public class MyTopicSubscriber implements Callable<Message>, ExceptionListener { 23 private String userName = "steven", password = "password", url = "tcp://localhost:7222"; 24 private String topicName = "topic01"; 25 26 @Override 27 public Message call() throws Exception { 28 Message msg = null; 29 30 try { 31 Hashtable prop = new Hashtable(); 32 prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(2)); 33 prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(10)); 34 35 TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url, null, prop); 36 TopicConnection connection = factory.createTopicConnection(userName,password); 37 TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 38 Topic topic = session.createTopic(topicName); 39 TopicSubscriber subscriber = session.createSubscriber(topic); 40 41 connection.setExceptionListener(this); 42 connection.start(); 43 44 msg = subscriber.receive(); 45 46 if (msg != null) 47 System.out.println("Received message: "+ msg); 48 49 connection.close(); 50 } catch (JMSException e) { 51 e.printStackTrace(); 52 } 53 54 return msg; 55 } 56 57 @Override 58 public void onException(JMSException e) { 59 e.printStackTrace(); 60 } 61 }
- Line 42: 不管是佇列模式、主題模式,都需要呼叫 start() 才會開始收訊息。
- Line 44: 程式會停在這一行以等待傾聽的主題的訊息到達。
1 package idv.steven.ems; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.FutureTask; 6 7 import javax.jms.JMSException; 8 import javax.jms.Message; 9 10 public class TopicMsg { 11 12 public static void main(String[] args) { 13 Callable<Message> consumer = new MyTopicSubscriber(); 14 FutureTask<Message> consumerTask = new FutureTask<Message>(consumer); 15 Thread tConsumer = new Thread(consumerTask); 16 tConsumer.start(); 17 18 try { 19 Thread.sleep(300); 20 } catch (InterruptedException e1) { 21 } 22 23 Callable<String> producer = new MyTopicPublisher(); 24 FutureTask<String> producerTask = new FutureTask<String>(producer); 25 Thread tProducer = new Thread(producerTask); 26 tProducer.start(); 27 28 try { 29 Thread.sleep(300); 30 } catch (InterruptedException e1) { 31 } 32 33 if (consumerTask.isDone()) { 34 try { 35 Message msg = consumerTask.get(); 36 if (msg != null) { 37 System.out.println("consumer's msgID: " + msg.getJMSMessageID()); 38 } 39 else { 40 System.out.println("msg is null"); 41 } 42 } catch (InterruptedException | ExecutionException | JMSException e) { 43 e.printStackTrace(); 44 } 45 } 46 47 if (producerTask.isDone()) { 48 try { 49 System.out.println("producer's msgID: " + producerTask.get()); 50 } catch (InterruptedException | ExecutionException e) { 51 e.printStackTrace(); 52 } 53 } 54 } 55 }主題模式在測試時要注意的就是,要確定消費者已經是在 active 狀態 (執行到 receive()),生產者再送出訊息,這樣消費者才會收到,如果生產者送出訊息時,消費者還沒有在 active 狀態,這個訊息就消失了,沒有人收到。
沒有留言:
張貼留言