Google Code Prettify

2015年8月30日 星期日

JMS: 訊息的標頭、屬性和種類

上一篇主要是讓初次接觸 JMS 的人快速了解 JMS,對於一些細節略而不談,JMS 的訊息結構分三部份:
  1. Header (required) - 標頭
  2. Properties (optional) - 屬性
  3. Body (optional) - 訊息本體
這一篇先說明訊息的標頭、屬性和訊息本體的種類。
  • Header
JMS 的標準定義了以下的 header ...
項目說明
1JMSDestination 傳送的目的地有可能是 quque 或 topic,client 程式可以利用這個 header,如下取得 queue 或 topic 值:
Topic destination = (Topic) message.getJMSDestination();
2JMSDeliveryMode 傳送模式有兩種 - DeliveryMode.PERSISTENT 及 DeliveryMode.NON_PERSISTENT,這個標頭值可以在 producer (生產者) 端以 setJMSDeliverMode(DeliveryMode.PERSISTENT); 的方式設定。當訊息為 PERSISTENT 時,訊息至少會被 server 傳送出一次,萬一傳送過程 server 出問題,當 server 服務恢復後,就會重傳,這表示,消費者端有可能同一個訊息收到兩次; 當訊息為 NON_PERSISTENT,訊息永遠只會被送出一次,所以,萬一因各種原因沒有送到消費者,也不會重送。
3JMSMessageID JMSMessageID 的型別是 String,是由 server 端產生的一個 unique 字串,JMS 沒有規範產生的規則,所以不同廠商的 server 產生的 JMSMessageID 的規則會不相同,但都會是唯一。client 要取得這個值的方法如下:
String messageId = message.getJMSMessageID();
4JMSTimestamp這是個 long 型別的值,當訊息由生產者送到 server 端時,EMS (或其它品牌的 message server) 會設定給它當時系統時間的值,即 JMSTimestamp 所記錄的是 server 收到 producer 送來訊息的時間。
5JMSExpiration也是一個長整數的值,記錄何時這個訊息就會失效,設定這個標頭值的方法是在生產者端呼叫 setJMSExpiration(...) 或 setTimeToLive(...),如果傳入的值為 0,表示永遠不失效。
6JMSRedelivered值為 true 或 false,用來表示 server 如果沒有收到消費者端的 acknowledge,訊息是否重傳。
7JMSPriority這個值由0~9 表示訊息的重要性,數字越大表示越重要,由生產者設定,一般的訊息應設定在 0~4,重要的訊息則設定其值為 5~9。
8JMSReplyTo這個值由生產者端設定,要求消費者端回覆時,回覆到那一個 queue 或 topic,所以在消費者端一定會使用 getJMSReplyTo() 取得值。
9JMSCorrelationID這個值用來標示訊息間的關係,一種普遍的作法是,消費者端回覆訊息給生產者時,將生產者傳來的訊息的 message id 設定到 JMSCorrelationID 再回覆給生產者,這樣生產者就知道這次回覆的訊息是那一個訊息的回覆訊息。
10JMSType用來標示訊息本體是那一種型別,可能的值有 MapMessage、TextMessage、ByteMessage 等。




  • Properties
程式開發者可以利用屬性欄位來傳送一些值,作為在生產者與消費者間溝通之用。屬性是 key、value 方式設定的,所以可能的方法如下:
TestMessage msg = session.createTextMessage();
msg.setText("just for test");
msg.setStringProperty("username", "steven");
publisher.publish(msg); 
屬性的型別可以是 boolean、byte、short、int、long、float、double、String 中任一種。JMS 定義了以下的幾個屬性,但除了 JMSXGroupID 及 JMSXGroupSeq 強制廠商一定要支援外,其餘不一定要支援,所以可能你選用的產品有部份的屬性是沒有的。
  1. JMSXUserID
  2. JMSXAppID
  3. JMSProducerTXID
  4. JMSConsumerTXID
  5. JMSRcvTimestamp
  6. JMSXDeliveryCount
  7. JMSXState
  8. JMSGroupID
  9. JMSXGroupSeq
另外,Tibco EMS 自定義了以下的屬性:
  1. JMS_TIBCO_CM_PUBLISHER
  2. JMS_TIBCO_CM_SEQUENCE
  3. JMS_TIBCO_COMPRESS
  4. JMS_TIBCO_DISABLE_SENDER
  5. JMS_TIBCO_IMPORTED
  6. JMS_TIBCO_MSG_EXT
  7. JMS_TIBCO_MSG_TRACE
  8. JMS_TIBCO_PRESERVE_UNDELIVERED
  9. JMS_TIBCO_SENDER
  10. JMS_TIBCO_SS_SENDER
  • Types
  1. TextMessage
  2. StreamMessage
  3. MapMessage
  4. ObjectMessage
  5. BytesMessage
這是 JMS 定義的五個介面,這五個介面都繼承 Message 這個介面,這是 body 的五個種類。

JMS: getting started

Tibco EMS 是眾多遵循 JMS (Java Message Service) 規格的產品之一,這裡要整理的 JMS 相關資料,會以 EMS 來寫相關 sample code,所以如果有 EMS 的特異功能,也會儘量說明 (如果我知道的話)。

JMS 提供兩種訊息模式 (Messaging Models) - Point-to-point 及 Publish-and-subscribe (如下圖),這兩種模式有什麼不同?
在開始說明之前,先說明一個習慣說法,一般來說,在左邊送出訊息的稱為"生產者" (producer),在右邊收訊息的稱為"消費者" (consumer)。
  • Point-to-Point (點對點)
  1. 點對點一定是佇列模式 (queue),生產者送訊息到佇列,消費者由佇列取得訊息。
  2. 一個佇列可以有多個生產者送訊息過來,有多個消費者來取出訊息,但是,一個訊息是由一個生產者送出到佇列後,只會由一個消費者取得。
  3. 透過這個方式,在負載平衡上的運用很廣泛,生產者將要處理的訊息放入佇列,如果消費者取出訊息後,需要花比較長的時間處理,就可以創建多個消費者來處理這些訊息,每處理完一個就再去佇列中取出未處理的資料繼續處理,如此就可將負載分配到各個消費者。
  • Publish-and-Subscribe (發行者和訂閱者)
  1. 發佈訂閱是 topic 模式,生產者發佈指定 topic (主題) 的訊息到 message server,所有訂閱該 topic 的消費者即會由 server 收到該訊息。
  2. 同一個 topic 可以同時有多個消費者產生,產生後也都會送給所有訂閱的消費者。
  3. 當生產者送出訊息時,消費者必須是在 active 狀態 (等待接收訊息的狀態),消費者才能接到訊息,否則即沒有收到該訊息。
  4. 如果要確定消費者一定可以收到訊息,可以創建"durable"的消費者(訂閱者),那麼訊息在發佈時,如果該消費者沒有 active,訊息會先放在 server 上,直到出現下列三個條件之一,訊息才會被刪除。(1) 訊息過期 (訊息送出前可以設定過期時間),(2) 訊息被消費者取走,(3) server 的儲存空間超過限制。
  5. 當 client 重新創建相同的 ID 的 durable 消費者,server 上此消費者有訂閱該 topic  的訊息都會送給這個消費者。
JMS 本身是一個規範,定義了許多介面,由各家廠商實作這些介面,由上面的說明看起來,雖然兩種模式的運作有許多的不同,但是 JMS API 則是差不多的,程式的寫法有差距不大,下表是 Point-to-Point 和 Publis-and-Subscribe 間的差別:

General APIPoint-to-Point APIPublish-and-Subscribe API
ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory
DestinationQueueTopic
ConnectionQueueConnectionTopicConnection
SessionQueueSessionTopicSession
MessageConsumerQueueSenderTopicPublisher
MessageProducerQueueReceiverTopicSubscriber






上面說明的是一些最基本的觀念,在開始寫程式前,當然要先把環境搞定,我在電腦裡,安裝了 Tibco EMS 8.1 development 版本,首先開啟「EMS Administration Tool」,用它來建立一個群組、一個使用者、一個佇列、一個主題(topic),當然要讓這個使用者可以存取佇列和主題。
  • connect: 連線到 EMS,預設會用 admin,密碼為空白,EMS 預設的 port 是 7222,如下圖。
  • create group ap "programmer": 建立一個命名為 ap 的群組,寫程式的人就加入這個群組。(create group group_name ["description"])
  • create user steven "programmer01" password=password: 建立一個帳號 steven,密碼為 password。(create user user_name ["description"] [password=password]
  • add member ap steven: 將 steven 加入 ap 群組。(add member group_name user_name [,user2, user3,...])
  • create queue queue01: 建立一個命名為 queue01 的佇列。(create queue queue_name [properties])
  • grant queue queue01 group=ap send,receive,browse: 授權給群組 ap,可以送訊息到 queue01、從 queue01 收訊息及瀏覽 queue01 的訊息。最後一個參數是權限,有三個值 send、receive、browse,如上面所示,要一次授權多種權限,用逗點分隔。(grant queue queue-name user=name | group=name permissions)
  • create topic topic01: 建立一個命名為 topic01 的主題。(create topic topic_name [properties])
  • grant topic topic01 group=ap subscribe,publish,durable,use_durable: 授權給群組 ap,可以有 subscribe (訂閱)、publish (發佈)、durable (持久的)、use_durable (使用持久的) 權限。(grant topic topic-name user=name | group=name permissions)
(EMS Administration Tool 的各種指令,請參閱官方文件「TIBCO Enterprise Message Service User's Guide」第六章。)

接下來看看程式怎麼寫 ...
  • Point-to-Point (點對點)
** 生產者
1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.ConnectionFactory;
 7 import javax.jms.DeliveryMode;
 8 import javax.jms.Destination;
 9 import javax.jms.MessageProducer;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12 
13 public class QueueProducer implements Callable<TextMessage> {
14     private String userName = "steven", password = "password", url = "tcp://localhost:7222";
15     private String queueName = "queue01";
16 
17     @Override
18     public TextMessage call() throws Exception {
19         Connection connection   = null;
20         
21         ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url);
22         connection = factory.createConnection(userName,password);
23         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
24         Destination destination = session.createQueue(queueName);
25         MessageProducer msgProducer = session.createProducer(null);
26         TextMessage msg;
27         msg = session.createTextMessage();
28         msg.setText("just for test");
29         msg.setStringProperty("CustomerType","VIP");
30         msg.setIntProperty("value",123);
31         msg.setIntProperty("Age",18);
32         msg.setStringProperty("Location","Taipei");
33         msgProducer.send(destination, msg, DeliveryMode.PERSISTENT, 9, 60000);
34         System.out.println(msg);
35         
36         connection.close();
37         
38         return msg;
39     }
40 }


程式說明:
  1. Line 21: 先建立一個連線的工廠類別,由這行程式清楚的看到,各廠商實作了 JMS 的介面。
  2. Line 22: 由連線工廠類別產生連線。
  3. Line 23: 由連線產生 session,session 是非常重要的一個工廠類別,由之後的程式可以看到,程式會用這個類別產生 queue、topic、producer、consumer … createSession() 的第二個參數是關於 acknowledge 的設定,關於這個設定,在初學階段都先設定為 AUTO_ACKNOWLEDGE,之後會再說明其它用法。
  4. Line 24: 不管是佇列模式或主題模式,都會產生這個 Destination 介面的實作類別,現在要用佇列模式,就用 session 產生一個佇列,如果是要用主題模式,就用 session 產生一個主題。這樣生產者送出訊息時,可不用在意是以佇列模式或主題模式送出。
  5. Line 28~32: 28 行是設定訊息 body 的值,29~32行是設定訊息的屬性值,設定 body 和屬性的 setter method 的名稱差別在於,設定屬性的 setter method 名稱是以 Property 為結尾,請參考「JMS: 訊息的標頭、屬性和種類」。
  6. Line 33: 將訊息送出到 Destination 介面的物件,在這裡當然就是送到佇列。
** 消費者
 1 package idv.steven.ems;
 2 
 3 import java.util.Hashtable;
 4 import java.util.concurrent.Callable;
 5 
 6 import javax.jms.Connection;
 7 import javax.jms.ConnectionFactory;
 8 import javax.jms.Destination;
 9 import javax.jms.ExceptionListener;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.Session;
14 
15 public class QueueConsumer implements Callable<String>, ExceptionListener {
16     private String userName = "steven", password = "password", url = "tcp://localhost:7222";
17     private String queueName = "queue01";
18     
19     @Override
20     public String call() throws Exception {
21         Connection      connection  = null;
22         
23         Hashtable prop = new Hashtable();
24         prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(10));
25         prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(1000));
26         ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(url,null,prop);
27         connection = factory.createConnection(userName,password);
28         
29         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
30         Destination destination = session.createQueue(queueName);
31         connection.setExceptionListener(this);
32         MessageConsumer msgConsumer = session.createConsumer(destination);
33         
34         connection.start();
35         
36         Message msg = msgConsumer.receive();
37 
38         if (msg != null)
39             System.err.println("Received message: "+ msg);
40         
41         connection.close();
42             
43         return msg.getJMSMessageID();
44     }
45 
46     @Override
47     public void onException(JMSException ex) {
48         ex.printStackTrace();
49     }
50 }
程式說明:
  1. Line 23~27: 建立連線,這裡在建立過程有傳入一些參數,這會在後續再說明。
  2. Line 34: 這一行很重要,呼叫 start() 後,才會開始接收資料。
  3. Line 36: 程式會停在這一行等待訊息到達。
** 測試程式
 1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.FutureTask;
 6 
 7 import javax.jms.JMSException;
 8 import javax.jms.TextMessage;
 9 
10 public class QueueMsg {
11 
12     public static void main(String[] args) {
13         QueueProducer producer = new QueueProducer();
14         FutureTask<TextMessage> producerTask = new FutureTask<TextMessage>(producer);
15         Thread tProducer = new Thread(producerTask);
16         tProducer.start();
17         
18         try {
19             Thread.sleep(1000);
20         } catch (InterruptedException e1) {
21         }
22 
23         Callable<String> consumer = new QueueConsumer();
24         FutureTask<String> consumerTask = new FutureTask<String>(consumer);
25         Thread tConsumer = new Thread(consumerTask);
26         tConsumer.start();        
27         
28         try {
29             if (producerTask.isDone()) {
30                 TextMessage msg = producerTask.get();
31                 if (msg != null) {
32                     System.out.println("producer's message: " + msg.getText());
33                 }
34             }
35             
36             try {
37                 Thread.sleep(300);
38             } catch (InterruptedException e1) {
39             }
40             
41             if (consumerTask.isDone()) {
42                 String text = consumerTask.get();
43                 System.out.println("consumer's message: " + text);
44             }
45         } catch (InterruptedException | ExecutionException | JMSException e) {
46             e.printStackTrace();
47         }
48     }
49 }
這個測試程式很簡單的先啟動生產者,再啟動消費者,並輸出生產者送出的資料及消費者收到的資料,以確定雙方確實有正確的溝通。
  • Publish-and-Subscribe (發佈、訂閱)
** 生產者
 1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 
 5 import javax.jms.Connection;
 6 import javax.jms.ConnectionFactory;
 7 import javax.jms.DeliveryMode;
 8 import javax.jms.MapMessage;
 9 import javax.jms.Session;
10 import javax.jms.Topic;
11 import javax.jms.TopicConnection;
12 import javax.jms.TopicConnectionFactory;
13 import javax.jms.TopicPublisher;
14 import javax.jms.TopicSession;
15 
16 import com.tibco.tibjms.Tibjms;
17 import com.tibco.tibjms.TibjmsConnectionFactory;
18 import com.tibco.tibjms.TibjmsTopicConnectionFactory;
19 
20 public class MyTopicPublisher implements Callable<String> {
21     private String userName = "steven", password = "paasword", url = "tcp://localhost:7222";
22     private String topicName = "topic01";
23     
24     @Override
25     public String call() throws Exception {
26         TopicConnection connection = null;
27         
28         Tibjms.setEncoding("Big5");
29         TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url);
30         connection = factory.createTopicConnection(userName,password);
31         TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
32         Topic topic = session.createTopic(topicName);
33         TopicPublisher publisher = session.createPublisher(topic);
34         
35         MapMessage msg = session.createMapMessage();
36         msg.setInt("age",35);
37         msg.setString("address","台北市瑞湖街68號15樓");
38         msg.setStringProperty("JMS_TIBCO_MSG_TRACE","body");
39         msg.setBooleanProperty("JMS_TIBCO_PRESERVE_UNDELIVERED",true);
40         msg.setIntProperty("Level",123);
41         msg.setStringProperty("Location","Taipei");
42         
43         publisher.publish(msg, DeliveryMode.PERSISTENT, 9, 1000000);
44         String msgID = msg.getJMSMessageID();
45         
46         connection.close();
47         
48         return msgID;
49     }
50 }
程式說明:
  1. Line 31: 這裡建立的 session,第 2 個參數之後會詳加討論。
  2. Line 43: 在這裡生產者將訊息發佈出去,這和 31 行間的關係,也會在之後詳加討論。
  3. Line 44: 每個訊息送出時,server 都會給予一個不會重複的訊息編號。
** 消費者
 1 package idv.steven.ems;
 2 
 3 import java.util.Hashtable;
 4 import java.util.concurrent.Callable;
 5 
 6 import javax.jms.Connection;
 7 import javax.jms.ConnectionFactory;
 8 import javax.jms.Destination;
 9 import javax.jms.ExceptionListener;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.Session;
14 import javax.jms.Topic;
15 import javax.jms.TopicConnection;
16 import javax.jms.TopicConnectionFactory;
17 import javax.jms.TopicSession;
18 import javax.jms.TopicSubscriber;
19 
20 import com.tibco.tibjms.TibjmsTopicConnectionFactory;
21 
22 public class MyTopicSubscriber implements Callable<Message>, ExceptionListener {
23     private String userName = "steven", password = "password", url = "tcp://localhost:7222";
24     private String topicName = "topic01";
25     
26     @Override
27     public Message call() throws Exception {
28         Message msg = null;
29         
30         try {
31             Hashtable prop = new Hashtable();
32             prop.put("com.tibco.tibjms.reconnect.attemptcount", new Integer(2));
33             prop.put("com.tibco.tibjms.reconnect.attemptdelay", new Integer(10));
34             
35             TopicConnectionFactory factory = new TibjmsTopicConnectionFactory(url, null, prop);
36             TopicConnection connection = factory.createTopicConnection(userName,password);
37             TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
38             Topic topic = session.createTopic(topicName);
39             TopicSubscriber subscriber = session.createSubscriber(topic);
40             
41             connection.setExceptionListener(this);
42             connection.start();
43             
44             msg = subscriber.receive();
45             
46             if (msg != null)
47                 System.out.println("Received message: "+ msg);
48 
49             connection.close();
50         } catch (JMSException e) {
51             e.printStackTrace();
52         }
53         
54         return msg;
55     }
56     
57     @Override
58     public void onException(JMSException e) {
59         e.printStackTrace();
60     }
61 }
程式說明:
  1. Line 42: 不管是佇列模式、主題模式,都需要呼叫 start() 才會開始收訊息。
  2. Line 44: 程式會停在這一行以等待傾聽的主題的訊息到達。
** 測試程式
 1 package idv.steven.ems;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.FutureTask;
 6 
 7 import javax.jms.JMSException;
 8 import javax.jms.Message;
 9 
10 public class TopicMsg {
11 
12     public static void main(String[] args) {
13         Callable<Message> consumer = new MyTopicSubscriber();
14         FutureTask<Message> consumerTask = new FutureTask<Message>(consumer);
15         Thread tConsumer = new Thread(consumerTask);
16         tConsumer.start();
17         
18         try {
19             Thread.sleep(300);
20         } catch (InterruptedException e1) {
21         }
22         
23         Callable<String> producer = new MyTopicPublisher();
24         FutureTask<String> producerTask = new FutureTask<String>(producer);
25         Thread tProducer = new Thread(producerTask);
26         tProducer.start();
27         
28         try {
29             Thread.sleep(300);
30         } catch (InterruptedException e1) {
31         }
32         
33         if (consumerTask.isDone()) {
34             try {
35                 Message msg = consumerTask.get();
36                 if (msg != null) {
37                     System.out.println("consumer's msgID: " + msg.getJMSMessageID());
38                 }
39                 else {
40                     System.out.println("msg is null");
41                 }
42             } catch (InterruptedException | ExecutionException | JMSException e) {
43                 e.printStackTrace();
44             }
45         }
46         
47         if (producerTask.isDone()) {
48             try {
49                 System.out.println("producer's msgID: " + producerTask.get());
50             } catch (InterruptedException | ExecutionException e) {
51                 e.printStackTrace();
52             }
53         }
54     }
55 }
主題模式在測試時要注意的就是,要確定消費者已經是在 active 狀態 (執行到 receive()),生產者再送出訊息,這樣消費者才會收到,如果生產者送出訊息時,消費者還沒有在 active 狀態,這個訊息就消失了,沒有人收到。

2015年8月16日 星期日

java.util.concurrent - ExecutorService

Java 1.x 時 Java 可以說只是個寫 applet 的語言,Java 1.2 之後成為寫 web 後端程式最重要的語言,Java 1.4 引入 NIO 讓讀寫更有效率,Java 5 最大的改變應該就是增加 java.util.concurrent 這個 package,對平行處理 (多執行緒) 提供完整的支援,這個 package 的內容一直到 Java 8 都持續增加,使其更加完善。這一篇要介紹的是 ExecutorService,類別圖如下: (如往例,這不是完整的類別圖,只是這篇會說明到的部份。)
ExecutorService 是個介面,繼承了 Executor 介面,Executor 介面只定義了一個稱為 execute() 的 method,實作 Runnable 的類別物件,可以將本身委派給實作 ExecutorService 的類別物件產生另一個執行緒,進行平行運算。
接下來看看範例程式;
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 public class ExecutorDemo {
 7 
 8     public static void main(String[] args) {
 9         ExecutorService executorService = Executors.newSingleThreadExecutor();
10 
11         executorService.execute(new Runnable() {
12             public void run() {
13                 System.out.println("Asynchronous task");
14             }
15         });
16 
17         executorService.shutdown();
18     }
19 }
程式說明:
  • Executors 是一個工廠類別,定義了許多 static 的 method,用來產生與 java.util.cuncurrent 套件中相關的類別,這裡 (line 9) 用來產生一個 ExecutorService 的物件。
  • 在第 11~15 行中,有一個匿名的 Runnable 物件,當然,如果程式很大,就不要用匿名的寫法,這裡因為只是一行 output,用匿名比較簡單。這個物件傳入 executorService 物件中會產生一個新的 thread。
  • executorService 執行完要自行關閉,即第 17 行呼叫 shutdown(),呼叫 shutdown() 後,就不能再傳別的 Runnable 給這個 executorService 物件了,否則會產生 exception。 
  • 第 9 行也可以改寫成 ExecutorService executorService = Executors.newFixedThreadPool(10); 一般來說,會使用 thread pool 的狀況是,程式會有很多 thread,且這些 thread 的執行時間很短,在這種狀況下要系統一直建立新的 thread 顯然會花很多時間,所以使用 thread pool。另外,任何一個系統也不能無限制的建立 thread,否則各個 thread 間頻繁的 context switch 反而會拖垮系統效能,所以會給它一個最大值。
  • ExecutorService 中的 shutdown() 和 shutdownNow() 這兩個 method 有什麼差別? shutdown() 會執行完已委派的 Runnable 物件後才將 ExecutorService 關閉,shutdownNow 則會立刻關閉,尚未被執行的 Runnable 物件則以 List<Runnable> 傳回。




上面的程式我們要來改寫一下,注意看類別圖,ExecutorService 中有兩個 submit() method,一個傳入的參數是  Runnable 物件,另一個傳入的是 Callable 物件,使用 submit 也可以產生一個新的執行緒,先看一下傳入 Runnable 物件的程式。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Future;
 6 
 7 public class ExecutorDemo {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executorService = Executors.newSingleThreadExecutor();
11 
12         Future future = executorService.submit(new Runnable() {
13             public void run() {
14                 System.out.println("Asynchronous task");
15             }
16         });
17 
18         executorService.shutdown();
19     }
20 }
修改的部份是第 12~ 16 行,得到的結果與第一個程式是完全一樣的! 再來看一下傳入 Callable 物件的程式。
 1 package idv.steven.concurrency;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.Future;
 8 
 9 public class ExecutorDemo {
10 
11     public static void main(String[] args) throws InterruptedException, ExecutionException {
12         ExecutorService executorService = Executors.newSingleThreadExecutor();
13 
14         Future future = executorService.submit(new Callable(){
15             public Object call() throws Exception {
16                 System.out.println("Asynchronous Callable");
17                 return "Callable Result";
18             }
19         });
20 
21         System.out.println("future.get() = " + future.get());
22 
23         executorService.shutdown();
24     }
25 }
傳入 Runnable 物件和傳入 Callable 物件最大的差別就在於,傳入 Callable 物件可以有傳回值,所以當 21 行呼叫 future.get() 時,可以取得 call() 的傳回值 (line 17),如果是傳入 Runnable 物件,呼叫 get() method 會傳回 null。