Google Code Prettify

2015年9月8日 星期二

NIO.2: UDP 網路程式設計 (multicast)

同樣是採用 UDP 通訊協定,multicast 和前一篇 (UDP 網路程式設計 (request / response)) 點對點的傳播方式有什麼不同?
  1. multicast 是多點傳播,在 IPv4 時有廣播和群播,到了 IPv6 只有群播。群播即事先加入群組的消費者,就可以收到生產者廣播出來的資料。
  2. multicast 的參數中有個 TTL 值,這個值介於 1~255 之間,當封包每通過一個 router,就會被減 1 (有些 router 會減 2 或更多),當值為 0 時即停止傳播。
UDP 雖然是不保證送達的通訊協定,但是少了確認的封包,速度比 TCP 快約 3 倍,當一個生產者要同時送給非常多消費者時,TCP 很快就會拖垮伺服器的效能,UDP 則可以在消費者判斷漏收了資料時,才要求生產者重送,大量減少網路流量,又能保證送達,當然,程式會複雜許多。

在 NIO.2 中,UDP 程式都是使用 DatagramChannel 這個類別,回顧前一篇的類別圖會發現,點對點傳播時沒有用到 MulticastChannel 介面所定義的 method,從名稱就知道,這是給 multicast 使用的。

在開始說明 multicast 程式前,請先執行如下 oracle 附的程式:
import java.io.*;
import java.net.*;
import java.util.*;
import static java.lang.System.out;

public class ListNets {

    public static void main(String args[]) throws SocketException {
        Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
        for (NetworkInterface netint : Collections.list(nets))
            displayInterfaceInformation(netint);
    }

    static void displayInterfaceInformation(NetworkInterface netint) throws SocketException {
        out.printf("Display name: %s\n", netint.getDisplayName());
        out.printf("Name: %s\n", netint.getName());
        Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
        for (InetAddress inetAddress : Collections.list(inetAddresses)) {
            out.printf("InetAddress: %s\n", inetAddress);
        }
        out.printf("\n");
     }
}
這個程式會列出電腦上所有的網卡及其名稱,這個非常重要,因為 multicast 程式需指明要從那一張網卡收、送資料,接下來看看程式,底下的程式 server 會送出 10 次的系統時間,client 收到後將它輸出到 console,先看輸出結果,應該會類似如下:




  • Server
 1 package idv.steven.udp;
 2 
 3 import java.io.IOException;
 4 import java.net.InetAddress;
 5 import java.net.InetSocketAddress;
 6 import java.net.NetworkInterface;
 7 import java.net.StandardProtocolFamily;
 8 import java.net.StandardSocketOptions;
 9 import java.nio.ByteBuffer;
10 import java.nio.channels.DatagramChannel;
11 import java.util.Date;
12 
13 public class MulticastServer {
14     public static void main(String[] args) {
15         final int DEFAULT_PORT = 5555;
16         final String GROUP = "225.5.6.6";
17         ByteBuffer datetime;
18         
19         // 開啟一個 IPv4 的資料封包通道
20         try (DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)) {
21             // 檢查是否成功開啟
22             if (datagramChannel.isOpen()) {
23                 // 取得名稱為 eth3 的網卡
24                 NetworkInterface networkInterface = NetworkInterface.getByName("eth3");
25                 // 設定 multicast 要使用的網卡
26                 datagramChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface);
27                 datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
28                 // 將 DatagramChannel 繫結到指定的埠號,要注意,這時還不需要指定 IP 群組。
29                 datagramChannel.bind(new InetSocketAddress(DEFAULT_PORT));
30                 
31                 for(int i=0; i<10; i++) {
32                     // 每 10 秒送出一筆資料
33                     try {
34                         Thread.sleep(10000); // 睡 10 秒
35                     } catch (InterruptedException ex) {}
36                     
37                     datetime = ByteBuffer.wrap(new Date().toString().getBytes());
38                     // 送出時才指定要送到那個 IP 群組,這表示 bind 後,
39                     // 還是可以看資料特性,送到不同 IP 群組。
40                     datagramChannel.send(datetime, new
41                             InetSocketAddress(InetAddress.getByName(GROUP), DEFAULT_PORT));
42                     datetime.flip();
43                 }
44             } else {
45                 System.out.println("通道開啟失敗");
46             }
47         } catch (IOException ex) {
48             ex.printStackTrace();
49         }
50     }
51 }
  • Client
 1 package idv.steven.udp;
 2 
 3 import java.io.IOException;
 4 import java.net.InetAddress;
 5 import java.net.InetSocketAddress;
 6 import java.net.NetworkInterface;
 7 import java.net.StandardProtocolFamily;
 8 import java.net.StandardSocketOptions;
 9 import java.nio.ByteBuffer;
10 import java.nio.CharBuffer;
11 import java.nio.channels.DatagramChannel;
12 import java.nio.channels.MembershipKey;
13 import java.nio.charset.Charset;
14 import java.nio.charset.CharsetDecoder;
15 
16 public class MulticastClient {
17 
18     public static void main(String[] args) {
19         final int DEFAULT_PORT = 5555;
20         final int MAX_PACKET_SIZE = 65507;
21         final String GROUP = "225.5.6.6";
22         
23         CharBuffer charBuffer = null;
24         Charset charset = Charset.defaultCharset();
25         CharsetDecoder decoder = charset.newDecoder();
26         ByteBuffer datetime = ByteBuffer.allocateDirect(MAX_PACKET_SIZE);
27         
28         try (DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)) {
29             // 要傾聽的 IP 群組
30             InetAddress group = InetAddress.getByName(GROUP);
31             // 檢查是否為合法的多址傳播群組
32             if (group.isMulticastAddress()) {
33                 // 檢查是否成功開啟通道
34                 if (datagramChannel.isOpen()) {
35                     // 設定要傾聽的網卡
36                     NetworkInterface networkInterface = NetworkInterface.getByName("eth3");
37                     datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
38                     // 將 DatagramChannel 繫結到指定的埠號,要注意,這時還不需要指定 IP 群組。
39                     datagramChannel.bind(new InetSocketAddress(DEFAULT_PORT));
40                     // 加入要傾聽的群組
41                     MembershipKey key = datagramChannel.join(group, networkInterface);
42                     
43                     while (true) {
44                         if (key.isValid()) {
45                             // 開啟等待 server 端送來的封包
46                             datagramChannel.receive(datetime);
47                             datetime.flip();
48                             charBuffer = decoder.decode(datetime);
49                             System.out.println(charBuffer.toString());
50                             datetime.clear();
51                         } else {
52                             break;
53                         }
54                     }
55                 } else {
56                     System.out.println("通道無法開啟");
57                 }
58             } else {
59                 System.out.println("這個 IP 不是合法的多址傳播 IP");
60             }
61         } catch (IOException ex) {
62             ex.printStackTrace();
63         }
64     }
65 }

2015年9月6日 星期日

NIO.2: UDP 網路程式設計 (request / response)

一般人寫網路程式比較少用到 UDP,大部份的書著墨不多,NIO.2 的 UDP API 也一直到 Java 7 出來後,DatagramChannel 相關的 API 才趨於完整,至於眾所期盼的非同步 UDP,一直到 Java 8 都還沒有被加入。
在開始說明 DatagramChannel 之前,先說明一下 UDP 的幾個重要觀念:
  1. IP 封包的 header 長度為 20 bytes、UDP 封包的 header 長度為 8 bytes,所以,每個封包最大為 65507 bytes (65535 - 28 = 65507)。
  2. 以 UDP 進行廣播時,其群組的 IP 位址的範圍 (IPv4) 為 224.0.0.1 ~ 239.255.255.255,其中 224.0.0.1 為保留 IP,一般的程式不可使用。
  3. 當封包數量超過緩衝區的容量時,額外的封包會被拋棄,而且不會有任何通知! (所以,UDP 網路傳輸,量大的時候,掉幾個封包是正常的 …)
上面的第 3 點看起來好像 UDP 不太實用? 其實並不會! UDP 的速度比 TCP 快很多,當傳輸的資料量非常大,且偶而掉幾個封包不影響結果時,是很好用的! 像是台灣證券交易所、櫃買中心,他們每天台股開盤到收盤間,傳送的股價資料就是用 UDP,因為股票交易非常頻繁,在整個交易時間內,股價會一直波動,如果不用 UDP,要即時的傳送那麼大量的資料會有困難,而且因為股價一直波動,偶而掉一兩個封包並不會影響輸出的結果,像是台積電,每天有大量交易,如果在 10:15:20.025 這個時間交易成功一筆,股價是 130.5 元,這個封包掉了,在 10:15:20.030 這個時間又有新的交易,股價為 131 元,在那麼短的瞬間少顯示一個價位,在線圖上並沒有影響。

雖然 UDP 用在廣播、群播的情況較多,但是為了效率點對點傳輸也可以採用 UDP,所以這篇先說明點對點傳輸。那麼,現在進入主題,底下是 DatagramChannel 的類別圖,寫程式時,記得回頭來看這張類別圖,會對於 DatagramChannel 的整體架構更有感覺。






底下的範例程式是一個 echo server 和 echo client,由 echo client 送出一段字串給 echo server,echo server 原封不動的回傳給 client,程式的說明直接寫在註解裡。
  • Echo Server
1 package idv.steven.udp;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.net.SocketAddress;
 5 import java.net.StandardProtocolFamily;
 6 import java.net.StandardSocketOptions;
 7 import java.nio.ByteBuffer;
 8 import java.nio.channels.DatagramChannel;
 9 
10 public class EchoServer {
11     public static void main(String[] args) {
12         final int LOCAL_PORT = 7335;
13         final String LOCAL_IP = "127.0.0.1";
14         final int MAX_PACKET_SIZE = 65507;
15         
16         ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_PACKET_SIZE);
17         // DatagramChannel 是一個 abstract class,沒辦法用 new 創建,而要用它的 static method - open。
18         // StandardProtocolFamily 有兩個常數 INET、INET6,分別表示要用 IPv4 或 IPv6。
19         try (DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)) {
20             // 檢查 channel 是否成功被開啟
21             if (datagramChannel.isOpen()) {
22                 System.out.println("Echo server was successfully opened!");
23                 // 設定送出與接收的緩衝區大小
24                 datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
25                 datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, 4 * 1024);
26                 // 在執行 bind 之後,socket 和這個網址產生繫結,一直到 close 為止。
27                 datagramChannel.bind(new InetSocketAddress(LOCAL_IP, LOCAL_PORT));
28 
29                 while (true) {
30                     // 等待接收資料
31                     SocketAddress clientAddress = datagramChannel.receive(buffer);
32                     // 收到資料後, buffer 調整游標,並由寫入模式轉為讀取模式。
33                     buffer.flip();
34                     System.out.println("收到 " + buffer.limit() + " bytes 資料");
35                     // 將資料送回給 client
36                     datagramChannel.send(buffer, clientAddress);
37                     // 清除 buffer
38                     buffer.clear();
39                 }
40             } else {
41                 System.out.println("channel 開啟失敗");
42             }
43         } catch (Exception ex) {
44             ex.printStackTrace();
45         }
46     }
47 }
  • Echo Client
1 package idv.steven.udp;
 2 
 3 import java.io.IOException;
 4 import java.net.InetSocketAddress;
 5 import java.net.StandardProtocolFamily;
 6 import java.net.StandardSocketOptions;
 7 import java.nio.ByteBuffer;
 8 import java.nio.CharBuffer;
 9 import java.nio.channels.DatagramChannel;
10 import java.nio.charset.Charset;
11 import java.nio.charset.CharsetDecoder;
12 
13 public class EchoClient {
14     public static void main(String[] args) throws IOException {
15         final int REMOTE_PORT = 7335;
16         final String REMOTE_IP = "127.0.0.1"; //modify this accordingly if you want to test remote
17         final int MAX_PACKET_SIZE = 65507;
18         
19         CharBuffer charBuffer = null;
20         // 網路傳輸時,資訊基本上就是 byte array,但是同樣的資料用不同編碼當然會不一樣,
21         // 所以,client 和 server 一定會約定好編碼,上面的 server 因為只是直接回傳同樣的資料,
22         // 才沒有處理編碼的問題。 
23         Charset charset = Charset.defaultCharset();
24         CharsetDecoder decoder = charset.newDecoder();
25         ByteBuffer text = ByteBuffer.wrap("島國程式員!".getBytes()); // 將傳到 server 的字串
26         ByteBuffer echoedText = ByteBuffer.allocateDirect(MAX_PACKET_SIZE); // 回傳的字串
27         
28         try (DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)) {
29             if (datagramChannel.isOpen()) {
30                 datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
31                 datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, 4 * 1024);
32 
33                 // 將資料送出到 server
34                 int sent = datagramChannel.send(text, new InetSocketAddress(REMOTE_IP, REMOTE_PORT));
35                 // 等待 server 回應
36                 datagramChannel.receive(echoedText);
37                 echoedText.flip();
38 
39                 // 收到回應的資料後,將它輸出到 console。
40                 charBuffer = decoder.decode(echoedText);
41                 System.out.println(charBuffer.toString());
42                 echoedText.clear();
43             } else {
44                 System.out.println("channel 開啟失敗");
45             }
46         }
47         catch (Exception ex) {
48             ex.printStackTrace();
49         }
50     }
51 }
這個範例也可以換成用 TCP 來寫,用 UDP 和 TCP 的差異在於,TCP 有確實建立 client 到 server 間的連線,而 UDP 並沒有! 另一個要注意的是,在類別圖中可以看到 DatagramChannel 也有實作 ReadableByteChannel 及 WritableByteChannel 兩個介面,在檔案的 I/O 中正是透過這兩個介面的 read() 和 write() 來讀寫資料 (詳見: NIO.2 開檔、讀檔、寫檔),那麼 DatagramChannel 既然實作了這兩個介面,是否也可以使用 read() 和 write() method 來接收和送出資料呢? 答案是當然可以! 不然為什麼要實作這兩個介面?? 現在來改寫一下 client 程式。
1 package idv.steven.udp;
 2 
 3 import java.io.IOException;
 4 import java.net.InetSocketAddress;
 5 import java.net.SocketAddress;
 6 import java.net.StandardProtocolFamily;
 7 import java.net.StandardSocketOptions;
 8 import java.nio.ByteBuffer;
 9 import java.nio.CharBuffer;
10 import java.nio.channels.DatagramChannel;
11 import java.nio.charset.Charset;
12 import java.nio.charset.CharsetDecoder;
13 
14 public class EchoClient2 {
15     public static void main(String[] args) throws IOException {
16         final int REMOTE_PORT = 7335;
17         final String REMOTE_IP = "127.0.0.1"; //modify this accordingly if you want to test remote
18         final int MAX_PACKET_SIZE = 65507;
19         
20         CharBuffer charBuffer = null;
21         // 網路傳輸時,資訊基本上就是 byte array,但是同樣的資料用不同編碼當然會不一樣,
22         // 所以,client 和 server 一定會約定好編碼,上面的 server 因為只是直接回傳同樣的資料,
23         // 才沒有處理編碼的問題。 
24         Charset charset = Charset.defaultCharset();
25         CharsetDecoder decoder = charset.newDecoder();
26         ByteBuffer text = ByteBuffer.wrap("島國程式員!".getBytes()); // 將傳到 server 的字串
27         ByteBuffer echoedText = ByteBuffer.allocateDirect(MAX_PACKET_SIZE); // 回傳的字串
28         
29         try (DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)) {
30             if (datagramChannel.isOpen()) {
31                 datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
32                 datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, 4 * 1024);
33                 
34                 // 使用 read()、write() 前,一定要先呼叫 connect() 建立與 server 間的連線。
35                 SocketAddress remote = new InetSocketAddress(REMOTE_IP, REMOTE_PORT);
36                 datagramChannel.connect(remote);
37                 
38                 datagramChannel.write(text);
39                 datagramChannel.read(echoedText);
40                 echoedText.flip();
41 
42                 // 收到回應的資料後,將它輸出到 console。
43                 charBuffer = decoder.decode(echoedText);
44                 System.out.println(charBuffer.toString());
45                 echoedText.clear();
46             } else {
47                 System.out.println("channel 開啟失敗");
48             }
49         }
50         catch (Exception ex) {
51             ex.printStackTrace();
52         }
53     }
54 }