読者です 読者をやめる 読者になる 読者になる

echoサーバーを書いてみたときのメモ その3 マルチプロセス、ノンブロッキングI/O、I/O多重で複数クライアントを捌く

network

前回、前々回の続き。

echoサーバーを書いてみたときのメモ その1 ソケットAPIとTCP

echoサーバーを書いてみたときのメモ その2 なぜ複数クライアントを捌けないのか

複数クライアントを同時に捌くために以下の方法で対応してみる。

  • ブロッキングI/Oのまま。forkを使って複数プロセスを立ち上げ、1プロセス1クライアントで対応する
  • ブロッキングI/Oを使って対応する。1プロセスで複数クライアントに対応する
  • I/O多重を使って対応する。1プロセスで複数クライアントに対応する

今回もUNIXネットワークプログラミングにお世話になります。

UNIXネットワークプログラミング〈Vol.1〉ネットワークAPI:ソケットとXTI

UNIXネットワークプログラミング〈Vol.1〉ネットワークAPI:ソケットとXTI

  • 作者: W.リチャードスティーヴンス,W.Richard Stevens,篠田陽一
  • 出版社/メーカー: ピアソンエデュケーション
  • 発売日: 1999/07
  • メディア: 単行本
  • 購入: 8人 クリック: 151回
  • この商品を含むブログ (37件) を見る

forkを使ったechoサーバー

前々回作ったechoサーバーをforkを使って、1クライアント毎に1プロセスを割り当てる様にする。 accept 後に fork して、親プロセスではクライアントとの接続用ソケットである connect_dclose 、子プロセスではListen用ソケットである listener_dclose する。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>

void error(char *msg)
{
  fprintf(stderr, "%s:%s\n", msg, strerror(errno));
  exit(1);
}

int read_line(int socket, char *buf, int len)
{
  char *s = buf;
  int slen = len;
  int c = read(socket, s, slen);
  while ((c > 0) && (s[c - 1] != '\n')) {
    s += c;
    slen = -c;
    c = read(socket, s, slen);
  }
  if (c < 0) {
    return c;
  }
  return len - slen;
}

int main(int argc, char *argv[])
{
  int listener_d = socket(PF_INET, SOCK_STREAM, 0);
  if (listener_d == -1) {
    error("socket err");
  }

  struct sockaddr_in name;
  name.sin_family = AF_INET;
  name.sin_port = (in_port_t)htons(30000);
  name.sin_addr.s_addr = htonl(INADDR_ANY);
  if (bind(listener_d, (struct sockaddr *) &name, sizeof(name)) == -1) {
    error("bind err");
  }

  if (listen(listener_d, 1) == -1) {
    error("listen err");
  }

  puts("wait...");

  struct sockaddr_storage client_addr;
  unsigned int address_size = sizeof(client_addr);

  char buf[255];
  while(1) {
    int connect_d = accept(listener_d, (struct sockaddr *)&client_addr, &address_size);
    if (connect_d == -1) {
      error("accept err");
    }

    if (fork() == 0) {
      char *msg = "Hello World!\r\n";
      write(connect_d, msg, strlen(msg));

      read_line(connect_d, buf, sizeof(buf));
      write(connect_d, buf, strlen(buf));
      close(connect_d);
      exit(0);
    }
    close(connect_d);
  }
  return 0;
}

forkを使っていないechoサーバーの場合、 connect_dclose したらFINが送信されクライアントとの接続が切れた。 fork を使ったechoサーバーでは親プロセスが connect_dclose するわけだが大丈夫なのだろうか。UNIXネットワークプログラミングにはこう書いてある。

すべてのファイルやソケットが参照カウンタを持っていることを理解することが必要である

UNIXネットワークプログラミング P103

close すると参照カウンタが1つ減るが、これが0にならないとFINは送信されない。親プロセスで close しても参照カウンタが2から1に減るだけなので、FINは送信されずクライアントとの接続は切れない。

動かしてみる

echoサーバーを起動して確認。

$ netstat -an | grep 30000
tcp        0      0 0.0.0.0:30000           0.0.0.0:*               LISTEN

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
fork_echo 11474 ubuntu    3u  IPv4 246242056      0t0  TCP *:30000 (LISTEN)

$ ll /proc/11474/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:09 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:09 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 3 -> socket:[246242056]

クライアントから2つ接続してみる。

クライアント1

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!

クライアント2

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!

fork を使っていないechoサーバーの場合と異なり、両方に Hello World! とサーバーから返事が来ていることが分かる。

サーバーで確認。

$ ps -xf 
  PID TTY      STAT   TIME COMMAND
 2767 ?        S      0:00 sshd: ubuntu@pts/1
 2768 pts/1    Ss     0:00  \_ -bash
11474 pts/1    S+     0:00      \_ ./fork_echo_server
11479 pts/1    S+     0:00          \_ ./fork_echo_server
11480 pts/1    S+     0:00          \_ ./fork_echo_serve

親プロセスが1つ、子プロセスが2つできてる。さらに確認。

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
fork_echo 11474 ubuntu    3u  IPv4 246242056      0t0  TCP *:30000 (LISTEN)
fork_echo 11479 ubuntu    4u  IPv4 246242057      0t0  TCP 192.168.33.10:30000->192.168.33.1:59013 (ESTABLISHED)
fork_echo 11480 ubuntu    4u  IPv4 246242079      0t0  TCP 192.168.33.10:30000->192.168.33.1:59014 (ESTABLISHED)

$ ll /proc/11474/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:09 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:09 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:09 3 -> socket:[246242056]

$ ll /proc/11479/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:10 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:10 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 4 -> socket:[246242057]

$ ll /proc/11480/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:10 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:10 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:10 4 -> socket:[246242079]

3つのプロセスがそれぞれ1つずつソケットを使ってる。親プロセスはLISTENしてるソケット。子プロセスはクライアントと接続してるソケット。

クライアント2の方で適当な文字列を入力すると、文字列が返ってきてサーバーから切断される。

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!
hoge
hoge
Connection closed by foreign host.

サーバーの状態を確認。接続済みソケットが1つになってることがわかる。子プロセスの close が実行された時にソケットの参照カウンタが0になるため、サーバーはFINを送信して、最終的に接続が切れる。

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
fork_echo 11474 ubuntu    3u  IPv4 246242056      0t0  TCP *:30000 (LISTEN)
fork_echo 11479 ubuntu    4u  IPv4 246242057      0t0  TCP 192.168.33.10:30000->192.168.33.1:59013 (ESTABLISHED)

forkを使って、複数クライアントを同時に取り扱うことができた。

psでサーバー側のプロセスを確認してみる。

$ ps -xf 
  PID TTY      STAT   TIME COMMAND
 2767 ?        S      0:00 sshd: ubuntu@pts/1
 2768 pts/1    Ss     0:00  \_ -bash
11474 pts/1    S+     0:00      \_ ./fork_echo_server
11479 pts/1    S+     0:00          \_ ./fork_echo_server
11480 pts/1    Z+     0:00          \_ [fork_echo_serve] <defunct>

接続が切れた子プロせすがゾンビ状態で残ってしまっている。この問題に対応するためにSIGCHLDシグナルを処理しましょう、といったこともUNIXネットワークプログラミング(5.8 Posixのシグナル処理、5.9 SIGCHLDシグナルの処理)に書いてある。

ノンブロッキングI/Oを使ったechoサーバー

ioctl を使って listener_d (Listen用ソケット)をノンブロッキングにする。ノンブロッキングなソケットに対して accept すると、クライアントからの接続が来てない場合、ブロックせずにすぐに EWOULDBLOCK を返す。

クライアントからの接続が来てる場合は accept でこれまで通り接続済みソケットを返す。この接続済みソケットも ioctl を使ってノンブロッキングにする。ノンブロッキングなソケットに対して read すると、データが到達していない場合、ブロックせずにすぐに EAGAIN を返す。

コードはこんな感じになった。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/ioctl.h>

void error(char *msg)
{
  fprintf(stderr, "%s:%s\n", msg, strerror(errno));
  exit(1);
}

int read_line(int socket, char *buf, int len)
{
  char *s = buf;
  int slen = len;
  int c = read(socket, s, slen);
  while ((c > 0) && (s[c - 1] != '\n')) {
    s += c;
    slen = -c;
    c = read(socket, s, slen);
  }
  if (c < 0) {
    return c;
  }
  return len - slen;
}

int main(int argc, char *argv[])
{
  int listener_d = socket(PF_INET, SOCK_STREAM, 0);
  if (listener_d == -1) {
    error("socket err");
  }

  struct sockaddr_in name;
  name.sin_family = AF_INET;
  name.sin_port = (in_port_t)htons(30000);
  name.sin_addr.s_addr = htonl(INADDR_ANY);
  if (bind(listener_d, (struct sockaddr *) &name, sizeof(name)) == -1) {
    error("bind err");
  }

  if (listen(listener_d, 1) == -1) {
    error("listen err");
  }

  puts("wait...");

  struct sockaddr_storage client_addr;
  unsigned int address_size = sizeof(client_addr);

  // Listenソケットをノンブロッキングにする
  int val = 1;
  if (ioctl(listener_d, FIONBIO, &val) == -1) {
    error("ioctl err");
  }

  // 接続済みソケットを管理するための配列
  int fds[255];
  // 接続済みソケットの数
  int n = 0;

  char buf[255];
  while(1) {
    int connect_d = accept(listener_d, (struct sockaddr *)&client_addr, &address_size);
    if (connect_d < 0) {
      // クライアントの接続がない場合は EWOULDBLOCK
      if (errno != EWOULDBLOCK) {
        error("accept err");
      }
    } else {
      // 接続済みソケットをノンブロッキングにする
      if (ioctl(connect_d, FIONBIO, &val) == -1) {
        error("ioctl err");
      }
      fds[n] = connect_d;
      n++;

      char *msg = "Hello World!\r\n";
      write(connect_d, msg, strlen(msg));
    }

    int i = 0;
    while (i < n) {
      // 接続済みソケットを順番に処理
      int conn = fds[i];
      if (read_line(conn, buf, sizeof(buf)) < 0) {
        // データが届いてない場合は EAGAIN
        if (errno != EAGAIN) {
          error("read err");
        }
        i++;
      } else {
        // データが届いてる場合はechoしてclose
        write(conn, buf, strlen(buf));
        close(conn);
        n--;
      }
    }
  }
  return 0;
}

ここで、UNIXネットワークプログラミングに書いてあった2つのステップを思い出す。

入力操作は次の2段階で構成されている。 - データの用意ができるまで待ち、 - そのデータをカーネルからプロセスにコピーする。

ソケットに関する入力では、最初のステップでは普通のネットワークからのデータの到着を待つ。パケットが到着すると、カーネル内のバッファにコピーされる。2つ目のステップでは、このデータをカーネルのバッファからアプリケーションのバッファにコピーすることになる。

Unixネットワーキングプログラミング P.140

acceptread がステップ1でブロックしてしまうのが原因で、ブロッキングI/Oを使ったechoサーバーでは複数クライアントを捌けなかった。ノンブロッキングI/Oではステップ1でデータが用意されていない場合、ブロックせずに何らかの返り値をすぐに返す。そのため、ブロックせずに複数クライアントを同時に捌くことができる。

動かしてみる

ノンブロッキングI/Oを使ったechoサーバーを起動して、確認してみる。

$ ps -xf
  PID TTY      STAT   TIME COMMAND
 2767 ?        S      0:00 sshd: ubuntu@pts/1
 2768 pts/1    Ss     0:00  \_ -bash
11508 pts/1    R+     0:02      \_ ./non_blocking_echo_server

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
non_block 11508 ubuntu    3u  IPv4 246242707      0t0  TCP *:30000 (LISTEN)

$ ll /proc/11508/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:49 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:49 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:49 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 3 -> socket:[246242707]

クライアントから2つ接続してみる。

クライアント1

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!

クライアント2

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!

forkを使ったechoサーバーと同様に、両方とも Hello World! とサーバーから返事が来てる。

サーバー側を確認。

$ ps -xf 
  PID TTY      STAT   TIME COMMAND
 2767 ?        S      0:00 sshd: ubuntu@pts/1
 2768 pts/1    Ss     0:00  \_ -bash
11508 pts/1    R+     3:14      \_ ./non_blocking_echo_server

forkを使ったechoサーバーと異なり、echoサーバーのプロセスは1つしかない。さらに確認してみる。

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
non_block 11508 ubuntu    3u  IPv4 246242707      0t0  TCP *:30000 (LISTEN)
non_block 11508 ubuntu    4u  IPv4 303901660      0t0  TCP 192.168.33.10:30000->192.168.33.1:59337 (ESTABLISHED)
non_block 11508 ubuntu    5u  IPv4 305166582      0t0  TCP 192.168.33.10:30000->192.168.33.1:59338 (ESTABLISHED)

$ ll /proc/11508/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:49 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:49 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:49 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 3 -> socket:[246242707]
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:53 4 -> socket:[303901660]
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:53 5 -> socket:[305166582]

echoサーバーのプロセスが3つのソケット(Listen用1つ、接続済み2つ)を使ってる。

クライアント2の方で適当な文字列を入力すると、文字列が返ってきてサーバーから切断される。

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!
hoge
hoge
Connection closed by foreign host.

サーバーの状態を確認。接続済みソケットが1つになってることが分かる。

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
non_block 11508 ubuntu    3u  IPv4 246242707      0t0  TCP *:30000 (LISTEN)
non_block 11508 ubuntu    4u  IPv4 303901660      0t0  TCP 192.168.33.10:30000->192.168.33.1:59337 (ESTABLISHED)

$ ll /proc/11508/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 12 07:49 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 12 07:49 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:49 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:50 3 -> socket:[246242707]
lrwx------ 1 ubuntu ubuntu 64 Mar 12 07:53 4 -> socket:[303901660]

複数クライアントを同時に捌くことができた。だけど、複数のソケットを管理するコードが微妙だし、 strace すると分かるがクライアントからの接続がないとずっとループ処理が動いている。これは無駄。次のI/O多重を使うともっと良くなる。

I/Oの多重化を使ったechoサーバー

epoll を使ってI/Oの多重化を使ったechoサーバーを書く。

readは、2つのステップを行うが、I/Oの多重化では2つのステップのうちの1つ目、データの用意ができるまで待つ、という部分だけを切り出して行う。データが用意できるまでブロックし、データが用意できたらカーネルからプロセスに戻ってくる。この時、1つのディスクリプタだけでなく複数のディスクリプタに対してデータの用意を待つことができる。

I/Oの多重化のためのシステムコールはいくつかあるが、 epoll を使う。 epoll では、 epoll_create epoll_ctl epoll_wait という3つのシステムコールを組み合わせる。

epoll_create

epoll_create の定義。manを読むと size の値は正でないといけないがなんでもいいらしい、現在は使われていない。

#include <sys/epoll.h>

int epoll_create(int size);

以下のような感じでepollファイルディスクリプタをオープンする。

// epollファイルディスクリプタをオープン
int epfd;
if ((epfd = epoll_create(100)) < 0) {
  error("epoll_create err");
}

epoll_ctl

epoll_ctl の定義。epollファイルディスクリプタと監視対象のディスクリプタとの関連を操作する。 op の値として EPOLL_CTL_ADD を指定すると監視対象として追加できる。

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctlを使って、echoサーバーの listener_d をepollの監視対象にする。

// listener_dソケットをepollの監視対象とする
struct epoll_event ev;
memset(&ev, 0, sizeof ev);
ev.events = EPOLLIN;
ev.data.fd = listener_d;
if ((epoll_ctl(epfd, EPOLL_CTL_ADD, listener_d, &ev)) < 0) {
  error("epoll_ctl error");
}

epoll_wait

epoll ファイルディスクリプタの I/O イベントを待つ。 timeout に-1を指定すると準備ができたファイルディスクリプタができるまで待ち続ける。返り値は準備ができているファイルディスクリプタの数。

第2引数の events には呼び出し可能なイベントが格納される。

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

listener_d を監視対象にしてから、 epoll_wait を呼ぶ。 timeout-1 を指定しているので、 listener_d のクライアントからの接続が来るまでブロックする。

  struct epoll_event events[MAX_EVENTS];
  while(1) {
    int fd_count = epoll_wait(epfd, events, MAX_EVENTS, -1);

    // 準備ができたディスクリプタを順番に処理
    int i;
    for (i = 0; i < fd_count; i++) {
      if (events[i].data.fd == listener_d ){
        // クライアントが接続してきた時の処理
      } else {
        // 準備ができたディスクリプタがlisterner_dではない場合の処理
      }
    }
  }
}

クライアントが接続してきた時の処理は、単一クライアントにしか対応できないechoサーバーの時と同様に accept を呼び出す。 epoll_wait でソケットの準備ができるまで待ったので、この accept ではブロックしない。この accept で取得できる接続済みソケットのファイルディスクリプタをepollの監視対象とする。

  struct epoll_event events[MAX_EVENTS];
  while(1) {
    int fd_count = epoll_wait(epfd, events, MAX_EVENTS, -1);

    // 準備ができたディスクリプタを順番に処理
    int i;
    for (i = 0; i < fd_count; i++) {
      if (events[i].data.fd == listener_d ){
        // クライアントが接続してきた時の処理
        int connect_d = accept(listener_d, (struct sockaddr *)&client_addr, &address_size);
        if (connect_d == -1) {
          error("accept err");
        }

        char *msg = "Hello World!\r\n";
        write(connect_d, msg, strlen(msg));

        // connect_dソケットを監視対象とする
        memset(&ev, 0, sizeof ev);
        ev.events = EPOLLIN;
        ev.data.fd = connect_d;
        if ((epoll_ctl(epfd, EPOLL_CTL_ADD, connect_d, &ev)) < 0) {
          error("epoll_ctl error");
        }
      } else {
        // 準備ができたディスクリプタがlisterner_dではない場合の処理
      }
    }
  }
}

準備ができたディスクリプタlisterner_d ではない場合というのは、つまり準備ができたディスクリプタがクライアントとの接続用ソケットのディスクリプタである場合である。なので、クライアントから文字列が届きソケットから取得する準備ができたということである。この状態で read を呼び出してもブロックされる時間は、step2のデータをカーネルからプロセスにコピーする部分だけなので、ほとんどない。

  struct epoll_event events[MAX_EVENTS];
  while(1) {
    int fd_count = epoll_wait(epfd, events, MAX_EVENTS, -1);

    // 準備ができたディスクリプタを順番に処理
    int i;
    for (i = 0; i < fd_count; i++) {
      if (events[i].data.fd == listener_d ){
        // クライアントが接続してきた時の処理
        int connect_d = accept(listener_d, (struct sockaddr *)&client_addr, &address_size);
        if (connect_d == -1) {
          error("accept err");
        }

        char *msg = "Hello World!\r\n";
        write(connect_d, msg, strlen(msg));

        // connect_dソケットを監視対象とする
        memset(&ev, 0, sizeof ev);
        ev.events = EPOLLIN;
        ev.data.fd = connect_d;
        if ((epoll_ctl(epfd, EPOLL_CTL_ADD, connect_d, &ev)) < 0) {
          error("epoll_ctl error");
        }
      } else {
        // 準備ができたディスクリプタがlisterner_dではない場合の処理
        int connect_d = events[i].data.fd;

        read_line(connect_d, buf, sizeof(buf));

        write(connect_d, buf, strlen(buf));
        close(connect_d);

        // closeしたソケットを監視対象から削除
        epoll_ctl(epfd, EPOLL_CTL_DEL, connect_d, &ev);
      }
    }
  }
}

epollを使ったechoサーバー

コードはこんな感じになった。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>

const int MAX_EVENTS = 10;

void error(char *msg)
{
  fprintf(stderr, "%s:%s\n", msg, strerror(errno));
  exit(1);
}

int read_line(int socket, char *buf, int len)
{
  char *s = buf;
  int slen = len;
  int c = read(socket, s, slen);
  while ((c > 0) && (s[c - 1] != '\n')) {
    s += c;
    slen = -c;
    c = read(socket, s, slen);
  }
  if (c < 0) {
    return c;
  }
  return len - slen;
}

int main(int argc, char *argv[])
{
  int listener_d = socket(PF_INET, SOCK_STREAM, 0);
  if (listener_d == -1) {
    error("socket err");
  }

  struct sockaddr_in name;
  name.sin_family = AF_INET;
  name.sin_port = (in_port_t)htons(30000);
  name.sin_addr.s_addr = htonl(INADDR_ANY);
  if (bind(listener_d, (struct sockaddr *) &name, sizeof(name)) == -1) {
    error("bind err");
  }

  if (listen(listener_d, 1) == -1) {
    error("listen err");
  }

  puts("wait...");

  struct sockaddr_storage client_addr;
  unsigned int address_size = sizeof(client_addr);

  char buf[255];

  // epollファイルディスクリプタをオープン
  int epfd;
  if ((epfd = epoll_create(100)) < 0) {
    error("epoll_create err");
  }

  // listener_dソケットをepollの監視対象とする
  struct epoll_event ev;
  memset(&ev, 0, sizeof ev);
  ev.events = EPOLLIN;
  ev.data.fd = listener_d;
  if ((epoll_ctl(epfd, EPOLL_CTL_ADD, listener_d, &ev)) < 0) {
    error("epoll_ctl error");
  }

  struct epoll_event events[MAX_EVENTS];
  while(1) {
    int fd_count = epoll_wait(epfd, events, MAX_EVENTS, -1);

    // 準備ができたディスクリプタを順番に処理
    int i;
    for (i = 0; i < fd_count; i++) {
      if (events[i].data.fd == listener_d ){
        // 準備ができたディスクリプタがlistener_dということは
        // 新しいクライアントが接続してきたということ
        int connect_d = accept(listener_d, (struct sockaddr *)&client_addr, &address_size);
        if (connect_d == -1) {
          error("accept err");
        }

        char *msg = "Hello World!\r\n";
        write(connect_d, msg, strlen(msg));

        // connect_dソケットを監視対象とする
        memset(&ev, 0, sizeof ev);
        ev.events = EPOLLIN;
        ev.data.fd = connect_d;
        if ((epoll_ctl(epfd, EPOLL_CTL_ADD, connect_d, &ev)) < 0) {
          error("epoll_ctl error");
        }

      } else {
        // connect_dの準備ができたということは
        // クライアントからのデータが届いたということ
        int connect_d = events[i].data.fd;

        read_line(connect_d, buf, sizeof(buf));

        write(connect_d, buf, strlen(buf));
        close(connect_d);

        // closeしたソケットを監視対象から削除
        epoll_ctl(epfd, EPOLL_CTL_DEL, connect_d, &ev);
      }
    }
  }
  return 0;
}

動かしてみる

I/Oの多重化を使ったechoサーバーを起動して、確認してみる。

$ lsof -i:30000
COMMAND    PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
epoll_ech 2836 ubuntu    3u  IPv4  57737      0t0  TCP *:30000 (LISTEN)

$ ll /proc/2836/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar  5 12:50 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar  5 12:50 ../
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 3 -> socket:[57737]
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 4 -> anon_inode:[eventpoll]

ソケットだけではなく、epoll用のディスクリプタを使っていることが分かる。

クライアントから2つ接続してみる。

クライアント1

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!

クライアント2

$ telnet 192.168.33.10 30000
Trying 192.168.33.10...
Connected to 192.168.33.10.
Escape character is '^]'.
Hello World!

forkを使ったechoサーバーと同様に、両方とも Hello World! とサーバーから返事が来てる。

サーバー側を確認。

$ lsof -i:30000
COMMAND    PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
epoll_ech 2836 ubuntu    3u  IPv4  57737      0t0  TCP *:30000 (LISTEN)
epoll_ech 2836 ubuntu    5u  IPv4  60491      0t0  TCP 192.168.33.10:30000->192.168.33.1:49526 (ESTABLISHED)
epoll_ech 2836 ubuntu    6u  IPv4  60493      0t0  TCP 192.168.33.10:30000->192.168.33.1:49527 (ESTABLISHED)

$ ll /proc/2836/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar  5 12:50 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar  5 12:50 ../
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 0 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 1 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 2 -> /dev/pts/1
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 3 -> socket:[57737]
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:50 4 -> anon_inode:[eventpoll]
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:53 5 -> socket:[60491]
lrwx------ 1 ubuntu ubuntu 64 Mar  5 12:53 6 -> socket:[60493]

$ sudo strace -s 1024 -p 2836
strace: Process 2836 attached
epoll_wait(4,

$ ps x | grep epoll_echo_server
 2836 pts/1    S+     0:00 ./epoll_echo_server

2つの接続済みソケットが作成されている。また、 strace の結果から read ではなく epoll_waitブロッキングしていることが分かる。 ps の結果からecho_serverのプロセスは1つだけであることも分かる。

epollを使ってI/O多重を行うことで、1つのプロセスでも複数クライアントを捌くことができた。

ブロックしてはいけない

I/O多重によってソケットのブロックをなくし、1プロセスで複数クライアントを捌けるようになったが、ブロックしてしまう可能性はまだ残っている。これは問題で、1プロセスで動いているのでブロックが発生してしまうとechoサーバー全体の処理が止まってしまう。

まず、epollでディスクリプタの準備ができたソケットから読み込むようにしたが、準備ができた かもしれない だけであって実施に読み込むとブロッキングしてしまう場合があるらしい。そのため、たとえepollを使ったとしてもソケットをノンブロッキングI/Oにしておくなどの対応が必要である。

他のI/Oでもブロックしてはいけない。例えば、ファイル読み込みや他のサーバーとの通信など。これらのI/Oでも、ノンブロッキングI/Oや非同期I/O(POSIX AIO インターフェース)、またはスレッドを使いI/O処理を別スレッドに任せるような非同期処理を行う必要がある。

また、ソケットからの読み込みのブロックはなくすことができたが、書き込みでブロックしてしまう場合がある。アプリケーションからカーネル内のバッファに書き込むが、この時バッファが一杯だとブロックしてしまう。ここでもやはりブロックしないようにノンブロッキングI/OやI/O多重、非同期I/O、スレッドなどで工夫する必要がある。

レベルトリガーとエッジトリガー

epollの通知方法としてレベルトリガーとエッジトリガーとがある。デフォルトはレベルトリガー。エッジトリガーの方が良い場面が良く分からなかったけど、どうやら書き込みのときに便利っぽい。

2008-07-07 - kazuhoのメモ置き場

epoll, エッジトリガー, EPOLLRDHUP - はざまブログ

libev

epollはLinuxで使えるが、BSDでは使えないらしい。代わりにkqueueというシステムコールがある。このようなプラットフォーム依存を隠蔽化したライブラリとしてlibevがある。libevを使ったechoサーバーも書いてみた。libevではepollの場合と異なり、ループ処理を自分で書く必要がなく、コールバックを登録するコードになる。

libevをインストール

$ sudo apt-get install libev-dev

ソースコード

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>

#include <ev.h>

void error(char *msg)
{
  fprintf(stderr, "%s:%s\n", msg, strerror(errno));
  exit(1);
}

int read_line(int socket, char *buf, int len)
{
  char *s = buf;
  int slen = len;
  int c = read(socket, s, slen);
  while ((c > 0) && (s[c - 1] != '\n')) {
    s += c;
    slen = -c;
    c = read(socket, s, slen);
  }
  if (c < 0) {
    return c;
  }
  return len - slen;
}

void connect_callback(EV_P_ ev_io *watcher, int revents)
{
  char buf[255];
  read_line(watcher->fd, buf, sizeof(buf));
  write(watcher->fd, buf, strlen(buf));
  ev_io_stop(EV_A_ watcher);
  close(watcher->fd);
  free(watcher);
}

void listener_callback(EV_P_ ev_io *watcher, int revents)
{
  struct sockaddr_storage client_addr;
  unsigned int address_size = sizeof(client_addr);

  int connect_d = accept(watcher->fd, (struct sockaddr *)&client_addr, &address_size);
  if (connect_d == -1) {
    error("accept err");
  }
  char *msg = "Hello World!\r\n";
  write(connect_d, msg, strlen(msg));

  struct ev_loop *l;
  ev_io *connect_watcher;
  connect_watcher = malloc(sizeof(client_addr));
  l = watcher->data;

  // connect_dを監視
  ev_io_init(connect_watcher, connect_callback, connect_d, EV_READ);
  ev_io_start(l, connect_watcher);
}

int main(int argc, char *argv[])
{
  int listener_d = socket(PF_INET, SOCK_STREAM, 0);
  if (listener_d == -1) {
    error("socket err");
  }

  struct sockaddr_in name;
  name.sin_family = AF_INET;
  name.sin_port = (in_port_t)htons(30000);
  name.sin_addr.s_addr = htonl(INADDR_ANY);
  if (bind(listener_d, (struct sockaddr *) &name, sizeof(name)) == -1) {
    error("bind err");
  }

  if (listen(listener_d, 1) == -1) {
    error("listen err");
  }

  puts("wait...");

  // イベントループの初期化
  struct ev_loop *loop;
  ev_io watcher;

  loop = ev_default_loop(0);
  watcher.data = loop;

  // listener_dを監視
  ev_io_init(&watcher, listener_callback, listener_d, EV_READ);
  ev_io_start(loop, &watcher);

  // イベントループ開始
  ev_loop(loop, 0);

  close(listener_d);
  return 0;
}

コンパイル

$  gcc libev_echo_server.c -l ev -o libev_echo_server

動かして

$ ./libev_echo_server

確認。 epoll_wait でブロックしてることがわかる。

$ lsof -i:30000
COMMAND     PID   USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
libev_ech 13301 ubuntu    3u  IPv4 566626035      0t0  TCP *:30000 (LISTEN)

$ ll /proc/13301/fd
total 0
dr-x------ 2 ubuntu ubuntu  0 Mar 18 13:03 ./
dr-xr-xr-x 9 ubuntu ubuntu  0 Mar 18 13:03 ../
lrwx------ 1 ubuntu ubuntu 64 Mar 18 13:03 0 -> /dev/pts/0
lrwx------ 1 ubuntu ubuntu 64 Mar 18 13:03 1 -> /dev/pts/0
lrwx------ 1 ubuntu ubuntu 64 Mar 18 13:03 2 -> /dev/pts/0
lrwx------ 1 ubuntu ubuntu 64 Mar 18 13:03 3 -> socket:[566626035]
lrwx------ 1 ubuntu ubuntu 64 Mar 18 13:03 4 -> anon_inode:[eventpoll]
lrwx------ 1 ubuntu ubuntu 64 Mar 18 13:03 5 -> anon_inode:[eventfd]

$ sudo strace -s 1024 -p 13301
strace: Process 13301 attached
epoll_wait(4,

おしまい

echoサーバーを書きながら、いろいろなI/Oモデルを試してみた。Node.jsは元々libevとlibeio(スレッドを使った非同期処理でI/Oを行うためのライブラリ)を使っていたが、現在は両方ともlibuvが使われているらしい。

UNIXネットワークプログラミングを読みながらコードを書くと、ネットワークの話とコードがつながっていく感じがして楽しかった。

コードはこちら。

github.com