I/O待ちを緩和させるのに効果的な Reactor パターン

クラウド上で動作するプログラムを組んでいく際、スケールすることを狙って多くのロジックがデカップリングされ、複数のサーバに分散されることと思いますが、そこで顕著になってくるボトルネックの一つがネットワークなどのI/O待ちです。

このI/O待ちを減少させるのに効果的なのが Reactor パターン。
このパターン自体は特に目新しいものでもないのですが、近年のクラウドブームで再び脚光を浴びそうなので自分の備忘録もかねて紹介します。

Reactor パターン
http://en.wikipedia.org/wiki/Reactor_pattern

この Reactor パターンはどういった場合に使用するかというと

  • 複数のI/O待ちが想定される場合

というのが代表格のようです。今回はネットワークI/Oを想定していますが、データベースへの問い合わせに時間がかかる際にも有用となります。


例としてクローラを初歩的な仕様で書いてみると

URL配列にクロール先URLを格納
ループ:
 URL配列からURLを取り出す
 URLに問い合わせを行い、取得した内容を出力する

というような感じに書けますね。


では今度は Reactor パターン

URL配列にクロール先URLを格納
ループ:
 URL配列からURLを取り出す
 URLに問い合わせを行い、結果が帰ってきたときの処理内容を登録(この時点では結果が帰ってくるのを待たない)
ループ:
 先ほどの問い合わせ群の中に内容を取得可能なものがあるか判別:
  あれば内容を取得し、1番目のループで登録されている処理を実行

と、だいたいこういった流れになります。
Reactor パターンのほうが若干複雑な書き方になっていますが、何が嬉しいのかというと2番目のループのなかで取得可能なものから順に内容を取得し、処理することができるということです。(本当はイベント駆動の形なのですが、わかりやすいようにイベント系の用語を使わず書いてみました)
問い合わせを行うURL群の中にはひょっとしたら結果が返ってくるまで10秒かかるものがあるかもしれません。シーケンシャルに問い合わせを行うだけではその10秒間をただ待ち続けるだけでその間何もすることが出来ませんが、Reactorパターンだと10秒待つ間にほかのURL(取得できたもの)の内容を処理可能になるという大きな利点があります。
この辺は頭ではすぐ理解できるのですが実際にコードに落とし込むのはなかなか大変です。ただ、各言語にはモジュール等の形で Reactor パターンをうまく使える仕組みがあるのでそちらを利用することが一般的だと思われます。

Python だと Twisted、Ruby だと Eventmachine、JavaScript だと nodejs、Perl は今なら AnyEvent + Coro という感じでしょうか。


だんだん説明に自信がなくなってきたので実際に各言語で初歩的なクローラを実装してみます。
下記のプログラム中にでてくるsleep10.phpなどというのは10秒スリープしてから結果を返すような簡単なphpプログラムで、10、5、0という順で配列に格納してあるのは非同期であればsleep0.phpのファイル内容が先に返されることを確認するためです。


Python (Twisted)

from twisted.web.client import getPage
from twisted.internet import reactor,defer

def printBody(body):
    print body

urlList = ['http://127.0.0.1/sleep10.php', 'http://127.0.0.1/sleep5.php', 'http://127.0.0.1/sleep0.php']
dfrList = []

for url in urlList:
    dfrList.append(getPage(url).addCallback(printBody))

defer.DeferredList(dfrList).addCallback(lambda _: reactor.stop())
reactor.run()

Ruby (EventMachine)

require 'rubygems'
require 'eventmachine'
require 'em-http'

urls = ['http://127.0.0.1/sleep10.php', 'http://127.0.0.1/sleep5.php', 'http://127.0.0.1/sleep0.php']
pending = urls.size

EM.run {
  urls.each { |url|
    puts url
    http = EM::HttpRequest.new(url).get
    http.callback { |data|
      puts data.response
      EM.stop_event_loop if (pending -= 1) < 1
    }
  }
}

Javascript (nodejs)

var sys = require('sys'),
   http = require('http'),
   url = require('url');

var urlArray = new Array('http://127.0.0.1/sleep10.php', 'http://127.0.0.1/sleep5.php', 'http://127.0.0.1/sleep0.php');

for(var i = 0; i < urlArray.length; i++ ) {
    var host = url.parse(urlArray[i]);
    var c = http.createClient(80, host.hostname);
    var request = c.request('GET', host.pathname, {'host': host.hostname});
    request.addListener('response', function (response) {
      response.addListener('data', function (body) {
        sys.puts(body);
      });
    });
    request.end();
}

Perl (AnyEvent + Coro)

use strict;
use AnyEvent::HTTP;
use Coro;
use AnyEvent;

my @urlArray  = ("http://127.0.0.1/sleep10.php", "http://127.0.0.1/sleep5.php", "http://127.0.0.1/sleep0.php");
my @coroArray;

foreach my $url (@urlArray) {
    push @coroArray, async {
        coro_cb($url);
    }
}

sub coro_cb {
        my $url = shift;
        http_get($url, Coro::rouse_cb);
        my @result = Coro::rouse_wait;
        print $result[0];
}

$_->join for @coroArray;

ちなみに上記のプログラムはURL取得の際のエラー処理を全くしていないということと、URLの数が膨大になると select システムコールを使っているようなものはパンクしてしまいそうといった欠点があり、このままでは実用に耐えないと思われるのでご注意を。