RAW Socket / BPF(Berkeley Packet Filter)を用いたパケットキャプチャーツールの実装

パケットキャプチャーツールは、ネットワークを流れるすべてのパケットを受け取り解析します。 NIC(Network Interface Card)のほとんどはプロミスキャスモードとよばれるモードをサポートしており、これを有効にすることでアドレスにかかわらずNICはすべてのパケットをホストに渡します。 ソフトウェアとハードウェアが連携して動作するため、扱っているレイヤーが低く環境によってInterfaceに差異があります。

tcpdumpの開発者によってつくられたlibpcapというライブラリはUNIXのシステムの差異を吸収します。またWindowsにもWinPcapという名前で移植されています。 もしパケットキャプチャーを作る際にはlibpcapを利用することが一般的かと思いますが、今回は勉強も兼ねて LinuxmacOS で動作するパケットキャプチャーをlibpcapを使わずに1からC言語で実装してみました。

※ BPF VM(Berkeley Packet Filter Virtual Machine)によるFilteringの仕組みには今回は触れません。

f:id:nwpct1:20181028135417g:plain

目次

xpcap のソースコード(Github)

github.com

最近作りたいなと思っているパケットキャプチャー関連のソフトわがありそちらはGo言語で実装しているのですが、せっかくなら移植性を考えてPure Goで実装したいと思っています。こういったレイヤーのプログラムをいきなりCのAPIがベースにあってそれをGoで書くとドキュメントを追うのも大変なので、C言語でまずは書いてみたものがこちらです。

プロトコルは今のところARPIPv4IPv6TCPUDP、ICMPに対応していて、Ethernet Frameからパースした結果を標準出力に書き出します。

RAW SOCKETを用いたキャプチャー (Linux)

LinuxMACアドレスEthernet Frameのヘッダー情報までプログラムで扱うには、RAW Socketが必要です。 ソケットディスクリプタを取得する際には、アドレスファミリーとして AF_PACKET 、ソケットタイプとして SOCK_RAW そして第3引数のprotocolには htons(EATH_P_ALL) を指定します。全部を説明すると長くなるので手順と呼び出さないといけない関数を次に示します。 xpcapのソースコードと合わせてご覧ください。

  1. socket() ディスクリプタの取得
    • int soc = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL)))
  2. en0 などのインターフェイス名を指定してインターフェイスの情報を取得
    • ioctl(soc, SIOCGIFINDEX, &if_req)
  3. ソケットディスクリプターをインターフェイスにバインド
    • bind(soc, (struct sockaddr *) &sa, sizeof(sa))
  4. インターフェイスのフラグを取得
    • ioctl(soc, SIOCGIFFLAGS, &if_req)
  5. プロミスキャスモードを有効にし、インターフェイスをUP(動作中)にする
    • ioctl(soc, SIOCSIFFLAGS, &if_req)

これで準備が完了です。あとは selectepoll でソケットディスクリプターへの書き込みを監視しready担った状態で recv(2) で読み出せばOKです。

struct timeval timeout;
fd_set mask;
int width, len, ready;
while (g_gotsig == 0) {
    FD_ZERO(&mask);
    FD_SET(soc, &mask);
    width = doc + 1;

    timeout.tv_sec = 8;
    timeout.tv_usec = 0;
    ready = select(width, &mask, NULL, NULL, &timeout);
    if (ready == -1) {
        perror("select");
        break;
    } else if (ready == 0) {
        fprintf(stderr, "select timeout");
        break;
    }

    if (FD_ISSET(sniffer->fd, &mask)){
        if ((len = recv(soc, buffer, >buf_len, 0)) == -1){
            perror("recv:");
            return -1;
        }
    }
}

自分は Linuxネットワークプログラミングバイブル で勉強しましたが、この書籍以外にもLinuxで動くRAW SOCKETを使ったシンプルなパケットキャプチャーの作り方を解説している資料は多くあります。一方でBSD系のOSではアドレスファミリーとして AF_PACKET を指定できません。BSD系のOSでEthernet frameを読み出す方法を確認しましょう。

BPF(Berkeley Packet Filter)によるキャプチャー (macOS, BSD系)

これらはBPF(Berkeley Packet Filter)という仕組みを使う必要があります。 BPFにはBPF Virtual Machineという仕組みを使ってパケットをKernel側でフィルタリングすることで必要ないものまでユーザー空間に移さずオーバーヘッドを減らす仕組みのようです。読み出しには BPFデバイスというのを用います。ひとまずすべてキャプチャーするならBPF VMについては気にする必要はありません。

BPFデバイスは、 /dev/bpf* に存在します。これらを順にopenしながら、使用可能なBPFデバイスを探さなくてはいけません。

$ ls /dev/bpf?
/dev/bpf0 /dev/bpf1 /dev/bpf2 /dev/bpf3 /dev/bpf4 /dev/bpf5 /dev/bpf6 /dev/bpf7 /dev/bpf8 /dev/bpf9

手元では bpf255 ぐらいまで存在しますが、google/gopacketなどの実装では99までチェックしているようです。 NICの数以上に必要になるケースはほとんどなさそうなので99個は十分に余裕を持った値なんだと思います。

gopacket/bsd_bpf_sniffer.go at a35e09f9f224786863ce609de910bc82fc4d4faf · google/gopacket · GitHub

BPFデバイスが決まったらOpenします。その後次のような手順が準備に必要になります。

  1. bpfデバイスのopen
    • fd = open(params.device, O_RDWR)
  2. バッファ長の設定 or 取得。BIOCSBLEN の変更は、BPFデバイスNICアサインする BIOCSETIF より先に呼び出される必要があるので注意してください。これになかなか気づかず結構はまってしまいました。
    • ioctl(fd, BIOCSBLEN, &params.buf_len) : 設定
    • ioctl(fd, BIOCGBLEN, &params.buf_len) : 取得
  3. BPFデバイスとネットワークインターフェイスをバインド
    • ioctl(fd, BIOCSETIF, &if_req)
  4. プロミスキャスモードの有効化
    • ioctl(fd, BIOCPROMISC, NULL)

こちらはデバイスファイルなので recv(2) ではなく read(2) で読み出します。 読み出すとイーサネットのフレームではなくBPFパケットというものにくるまれています。 ヘッダーをパースするとデータ長が乗っているため、それをもとに次のBPFパケットの位置を求めてパースを繰り返していきます。

typedef struct {
    int fd;
    char device[11];
    unsigned int buf_len;
    char *buffer;
    unsigned int last_read_len;
    unsigned int read_bytes_consumed;
} Sniffer;

int
parse_bpf_packets(Sniffer *sniffer, CapturedInfo *info)
{
    if (sniffer->read_bytes_consumed + sizeof(sniffer->buffer) >= sniffer->last_read_len) {
        return 0;
    }

    info->bpf_hdr = (struct bpf_hdr*)((long)sniffer->buffer + (long)sniffer->read_bytes_consumed);
    info->data = sniffer->buffer + (long)sniffer->read_bytes_consumed + info->bpf_hdr->bh_hdrlen;
    sniffer->read_bytes_consumed += BPF_WORDALIGN(info->bpf_hdr->bh_hdrlen + info->bpf_hdr->bh_caplen);
    return info->bpf_hdr->bh_datalen;
}

あとはごりごりパースしていくのですが、そこはプラットフォームに変わらず同じです。 ゴリゴリ実装していくだけで解説してもしかたないのでマスタリングTCP/IPなどを頼りにソースコードを読んでみてください。

実行方法

build.sh でビルドできます。Vagrantfileも用意しているのでLinuxで試したいmacOSユーザーの方はご利用ください。実行結果は次のような感じです。

$ ./build.sh
$ ./xpcap en0 -v
device = en0, verbose = 1, port = 0

================================================================================
[TCP6]
ether_header--------------------------------------------------------------------
ether_dhost = XX:XX:XX:XX:XX:XX
ether_shost = XX:XX:XX:XX:XX:XX
ether_type = 86DD(IPv6)
ip6-----------------------------------------------------------------------------
ip6_vfc = 96
ip6_flow = 2363892320
ip6_plen = 15104
(TCP), ip6_hlim = 56
ip6_src = xxxx:xxxx:xxxx:x::xxxx:xxxx
ip6_dst = yyyy:yy:yyyy:yyyy:yyyy:yyyy:yyyy:yyyy
tcphdr--------------------------------------------------------------------------
source: 47873
destination: 59083
sequence number: 1148644729
ack number = 2897299570
data offset = 5, control flag = 24, window = 49152, checksum = 54057, urgent pointer = 0
data----------------------------------------------------------------------------
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00    ..something..data..
================================================================================

================================================================================
[ARP]
ether_header--------------------------------------------------------------------
ether_dhost = XX:XX:XX:XX:XX:XX
ether_shost = XX:XX:XX:XX:XX:XX
ether_type = 806(Address resolution)
ether_arp-----------------------------------------------------------------------
arp_hrd = 1(Ethernet 10/100Mbps.), arp_pro = 2048(IP)
arp_hln = 6, arp_pln = 4, arp_op = 1(ARP request.)
arp_sha = 34:76:C5:77:5D:4C
arp_spa = 192.168.0.1
arp_tha = 00:00:00:00:00:00
arp_tpa = 192.168.0.8
================================================================================

================================================================================
[UDP]
ether_header--------------------------------------------------------------------
ether_dhost = XX:XX:XX:XX:XX:XX
ether_shost = XX:XX:XX:XX:XX:XX
ether_type = 800(IP)
ip------------------------------------------------------------------------------
ip_v = 4, ip_hl = 5, ip_tos = 0, ip_len = 149
ip_id = 29282, ip_off = 0, 0
ip_ttl = 255, ip_p = 17(UDP), ip_sum = 42831
ip_src = yyy.yyy.yyy.yyy
ip_dst = xxx.xxx.xxx.xxx
udphdr--------------------------------------------------------------------------
source = 5353, dest = 5353
len = 129, check = 38825
data----------------------------------------------------------------------------
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00    ..something..data..
================================================================================

参考ソースコード

困ったときは次のコードが参考になりました。

またLinuxネットワークプログラミングバイブルはかなりおすすめの書籍です。

Linuxネットワークプログラミングバイブル

Linuxネットワークプログラミングバイブル

実践 パケット解析 第3版 ―Wiresharkを使ったトラブルシューティング

実践 パケット解析 第3版 ―Wiresharkを使ったトラブルシューティング

Google Translate APIを使ったSphinxドキュメントの自動翻訳

多言語への翻訳は大変な作業ですが、近年は機械翻訳の精度も上がってきました。 ふと思いついて .po 形式の翻訳ファイルをGoogle翻訳を通して自動で入力するスクリプト を作ったのですが、サクッと書いた割に予想以上に便利で料金も思ったより安かったので記事にしました。また実際に自分が公開している日本語で書かれたSphinxの資料を、このスクリプトを使って英語に翻訳してみます。

f:id:nwpct1:20181029112031p:plain

追記: ライセンスについて

id:beatdjam さんのコメントが気になったので共有です。 自分もGoogle Translate APIのドキュメントを読み返してみますが、利用される方も確認してからご利用ください。

以前こういった事例もあったので、OSSで利用することを推奨して良いのか心配。ドキュメントだけなら平気なのかな https://anond.hatelabo.jp/20170225195916

作ったもの

.po 形式の翻訳ファイルをパースし好きな言語にGoogle Translate APIを用いて翻訳するスクリプトを用意しました。 実行には google-cloud-translate とGCP service accountが必要です。

$ pip install --upgrade google-cloud-translate
$ export GOOGLE_SERVICE_ACCOUNT_JSON=/path/to/service-account-credential.json
$ python translate_po.py --help
usage: translate_po.py [-h] [--lang LANG] [--currency CURRENCY] filepath

positional arguments:
  filepath

optional arguments:
  -h, --help           show this help message and exit
  --lang LANG          target language (default: "ja")
  --currency CURRENCY  dollar per your currency. (default currency is yen: 111.90)

現状はとりあえずファイルの上書きオプションなどは用意せず、stdoutに書き出すようにしています。 Google Translate APIは、100万文字あたり20ドルかかります。 本一冊とかになると数百円かそれ以上かかりそうですが、手元の文章を翻訳したいなどの用途なら数十円に収まることがほとんどです。 ちなみにマルチバイト文字でも1文字は1文字としてカウントしてくれるようなので、日本語から英語の翻訳などは比較的お得です。 Google Translate APIに投げたテキストの文字数からかかった金額も算出し表示するようにしています。

$ python translate_po.py ./po/index.po 1>./po/index_ja.po
Cost: 2.1417659999999996 yen

また翻訳結果はキャッシュしていて、実行したディレクトリの直下に json ファイルを書き出します。 なので2回目の実行は、キャッシュが効きお金を節約できます。

$ python translate_po.py ./po/index.po 1>./po/index_ja.po
Cost: 0 yen

ソースコードGithubで公開しています。

github.com

実際に翻訳してみる

Webアプリケーションフレームワークの作り方 in Python — c-bata.link (Githubはこちら) はSphinxで書かれた日本語の資料です。 今回はこちらを英語に翻訳していきます。Sphinxのドキュメントの国際化の方法は次のページに非常によくまとまっています。

まず sphinx-intl をインストールします。

$ pip install sphinx-intl
$ vim source/conf.py
# add following settings
# locale_dirs = ['locale/']
# gettext_compact = False
$ make gettext
$ ls build/locale/
index.pot      kobin.pot      middleware.pot request.pot    response.pot   routing.pot    server.pot     sphinx.pot     template.pot   wsgi.pot

potファイルができました。今回は日本語から英語に翻訳するので、次のようにします。

$ sphinx-intl update -p build/locale -l ja
Create: source/locale/ja/LC_MESSAGES/kobin.po
Create: source/locale/ja/LC_MESSAGES/template.po
Create: source/locale/ja/LC_MESSAGES/middleware.po
Create: source/locale/ja/LC_MESSAGES/sphinx.po
Create: source/locale/ja/LC_MESSAGES/request.po
Create: source/locale/ja/LC_MESSAGES/routing.po
Create: source/locale/ja/LC_MESSAGES/wsgi.po
Create: source/locale/ja/LC_MESSAGES/response.po
Create: source/locale/ja/LC_MESSAGES/index.po
Create: source/locale/ja/LC_MESSAGES/server.po

poファイルが出来上がったら変換をかけます。 このスクリプトは今のところ上書き用のオプションを用意していないので、一度stdoutをファイルに書き出して置き換える必要があります。 いくつかファイルがあるので変換用のスクリプトを用意しました。

$ cat > translate.sh <<EOF
#!/bin/bash
 function translate {
  for f in ./ja/LC_MESSAGES/*.po; do
    python translate_po.py --lang en $f 1>${f%.po}_en.po
    mv ${f%.po}_en.po $f;
  done; 
}
 translate
EOF
$ chmod +x ./translate.sh
$ ./translate.sh
Cost: 3.6904620000000006 yen
Cost: 0.024617999999999998 yen
Cost: 1.60017 yen
Cost: 4.728894 yen
Cost: 3.8784539999999996 yen
Cost: 5.8188 yen
Cost: 1.087668 yen
Cost: 1.4009880000000001 yen
Cost: 7.00494 yen

トータル30円くらいかかりました。翻訳精度を考えるとすごくお得に感じます。 最後はこれをbuildしてみましょう。

$ make -e SPHINXOPTS="-D language='ja'" html
$ open build/html/index.html

結果は次のような感じです。

f:id:nwpct1:20181028231948p:plain

f:id:nwpct1:20181028232035p:plain

f:id:nwpct1:20181028232044p:plain

reSTのリンクが壊れたり、いくつか変な文字が混ざっていたりはしますが予想以上にそれっぽくなりました。

エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)

エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)

Double forkによるプロセスのデーモン化と、ファイル変更時の自動サーバーリロードの実装 (Python)

Pythonで約100行のシンプルなWSGIサーバーを書いてみる - c-bata webWSGIサーバーを作ってみました。 100行程度の非常に簡易的なものでしたが、実際にDjangoアプリケーションを動かすこともできました。 前回作ったWSGIサーバーをもう少し便利に使えるようにいくつか機能を追加したのですが、 その中でもWSGIサーバーに限らず知っておくとよさそうな3つの実装を紹介します。

目次:

Double Fork によるサーバープロセスのデーモン化

WSGIサーバーのように長時間動かすようなプログラムはデーモン化しておきたい場合があります。gunicornでも daemon オプションが用意されていて設定で簡単に切り替えることができます。 ところでバックグラウンドで実行するだけなら端末上でコマンドを入力した後に & をつけることもできます。 デーモン化とはなにか違うのでしょうか。少し試してみましょう。

$ python3.7 -m http.server 8000 &
[1] 37682
a14737: ~
$ Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

この例では & をつけたことでバックグラウンドになり、別のコマンドを実行できる状態になりましたが、標準ストリームはまだ繋がっていてプロセスの出力が表示されてしまっています。例えば localhost:8000 にアクセスしてみると 127.0.0.1 - - [29/Sep/2018 22:51:32] "GET / HTTP/1.1" 200 - のログがこの端末上に流れます。これ以外にもいくつか問題が残っていて、プログラムによってはSSH接続を切ったときに送られる SIGHUP を受け取ると終了してしまいます。デーモン化というのはこのようにただバックグラウンドにするわけではなく、プロセスから制御端末(tty)を完全に切り離します。

制御端末からプロセスを切り離すために有効なのは setsid() システムコールです*1。これについて知るためにはプロセスグループやセッションという概念を理解しておく必要があります。ただぐぐると詳しい記事が出てくるので詳しい説明はそちらにおまかせします *2

サーバーにログインすると、ログインセッションが作られます。このセッションのリーダーであるプロセス1つが制御端末(tty)とやり取りを行います。システムコール setsid() を呼び出すと新しいセッションを作成しそのセッショングループのリーダーになります。ここで作成されたセッションはまだ制御端末(tty)を取得していないのでプロセス上に制御端末がなくなったことになります。ただこのとき2つ注意点があります。

  1. もし setsid() システムコールを呼び出したプロセスが、プロセスグループリーダーだった場合はエラーが返ってきます。
  2. setsid() を呼ぶことで新しいセッションのセッショングループリーダーになりますが、セッショングループリーダーは唯一制御端末とやりとりすることが可能です。プログラムの実装によってはあとで制御端末を取得してしまうかもしれません。

下図に示す「double fork」と呼ばれるテクニックを利用することでこの2つの懸念が解消されます。

f:id:nwpct1:20181001191511p:plain

double fork の名の通り、2回 fork(2) を呼び出します。

  1. 最初に子プロセスを生成した後、親プロセス側で sys.exit(0) を呼びすぐに終了する。子プロセス側は必ずグループリーダーではないため、 setsid() システムコールが呼び出せます。
  2. 子プロセス側で sys.setsid() を呼び出し、セッショングループリーダーになり制御端末から切り離します。しかしこの子プロセスはセッショングループリーダーです。さらに孫プロセスを生成し、セッショングループリーダーである子プロセスを終了してしまえば、このセッションに制御端末が取得されることはありません。

ソースコードは次のとおりです。

# https://github.com/kobinpy/kwsgi/pull/2

def daemonize():
    """Detaches the server from the controlling terminal and enters the background."""
    if os.fork():
        # Exit a parent process because setsid() will be fail
        # if you're a process group leader.
        sys.exit(0)

    # Detaches the process from the controlling terminal.
    os.setsid()

    if os.fork():
        sys.exit(0)

    # Continue to run application if directory is removed.
    os.chdir('/')

    # 0o22 means 755 (777-755)
    os.umask(0o22)

    # Remap all of stdin, stdout and stderr on to /dev/null.
    # Please caution that this way couldn't support following execution:
    #   $ kwsgi ... > output.log 2>&1
    os.closerange(0, 3)
    fd_null = os.open(DEVNULL, os.O_RDWR)
    if fd_null != 0:
        os.dup2(fd_null, 0)
    os.dup2(fd_null, 1)
    os.dup2(fd_null, 2)


Pythonファイルの更新に検知してサーバーを自動で再起動する

開発用途のサーバーとしては、ファイルの更新を検知して自動でサーバーが再起動してくれると便利そうです。 これは実装が少しだけ複雑ですが、面白い実装になっています。 ソースコードも少し長くなるので細かい解説までは今回は省くことにしましたが、ソースコードにコメントを多めに入れたので読んでみてください。 このプログラムのポイントは次のあたりです。

  1. Pythonが読み込んだモジュールのファイル一覧と最終更新時刻を取得する
    1. sys.modules から読み込んでいるモジュールの一覧を取得
    2. getattr(module, '__file__') を読み出し、 os.path.exists でファイルの存在をチェック
    3. os.stat(path).st_mtime より最終更新時刻を取得する。
  2. 呼び出しコマンドをサブプロセスでもう一回実行してWSGIサーバーを実行するワーカープロセスを生成する
    1. args = [sys.executable] + sys.argvsubprocessモジュールで実行する
    2. 再帰的にコマンドが実行され続けて大変なことになるのを避けるため、環境変数 KWSGI_CHILD で制御する。
    3. Exit ステータスコードを正常終了(0)と異常終了(1)、リロード(3) で使い分けて、3なら再度ワーカープロセスを生成する。
  3. メインプロセスとワーカープロセスのやりとりは tmp 領域に作成したファイルを経由して行う
    1. tempfile.mkstemp(prefix='kwsgi.', suffix='.lock') を実行して temp 領域にファイルを作成
    2. ワーカープロセス呼び出し時の KWSGI_LOCKFILE 環境変数でファイルパスを伝える
    3. lockfileの状態を確認してファイルが削除されていたら異常終了し、更新されていたら reloadを意味する 3 をExit Statusに設定して終了する。
import os
import sys
import threading
import time
import _thread
import subprocess
import tempfile


# For reloading server when detected python files changes.
EXIT_STATUS_RELOAD = 3


class FileCheckerThread(threading.Thread):
    def __init__(self, lockfile, interval):
        threading.Thread.__init__(self)
        self.daemon = True
        self.lockfile, self.interval = lockfile, interval
        self.status = None # 'reload', 'error' or 'exit'


    def run(self):
        files = dict()

        # sys.modules からPythonモジュールの一覧を取得
        for module in list(sys.modules.values()):
            # __file__ が定義されていればそこからファイルパスが取得できる。
            path = getattr(module, '__file__', '')
            if path[-4:] in ('.pyo', '.pyc'):
                path = path[:-1]
            if path and os.path.exists(path):
                files[path] = os.stat(path).st_mtime

        while not self.status:
            # lockfile が削除 or 更新されていたら status を 'error' にして例外をraiseして通知する.
            if not os.path.exists(self.lockfile) or \
                    os.stat(self.lockfile).st_mtime < time.time() - self.interval - 5:
                self.status = 'error'
                _thread.interrupt_main()
            for path, last_mtime in files.items():
                if not os.path.exists(path) or os.stat(path).st_mtime > last_mtime:
                    # ファイルが更新されていたら status を 'reload' にセットして例外をraiseして通知する。
                    self.status = 'reload'
                    _thread.interrupt_main()
                    break
            time.sleep(self.interval)

    def __enter__(self):
        self.start()

    def __exit__(self, exc_type, *_):
        if not self.status:
            self.status = 'exit'  # silent exit
        self.join()
        return exc_type is not None and issubclass(exc_type, KeyboardInterrupt)


class AutoReloadServer:
    def __init__(self, func, args=None, kwargs=None):
        self.func = func
        self.func_args = args
        self.func_kwargs = kwargs

    def run_forever(self, interval):
        # メインプロセスから呼び出された子プロセスには 'KWSGI_CHILD' 環境変数が存在。
        # 無限に呼び出されてしまうため、環境変数で子プロセスであることを教える
        if not os.environ.get('KWSGI_CHILD'):
            lockfile = None
            try:
                fd, lockfile = tempfile.mkstemp(prefix='kwsgi.', suffix='.lock')
                os.close(fd)  # We only need this file to exist. We never write to it
                while os.path.exists(lockfile):
                    # ユーザーが端末で実行したプログラムをもう一度実行する。
                    args = [sys.executable] + sys.argv
                    environ = os.environ.copy()
                    # 無限に呼び出されてしまうため、環境変数で子プロセスであることを教える
                    environ['KWSGI_CHILD'] = 'true'
                    # ワーカープロセスとのやり取りに用いるファイルも環境変数で渡す。
                    environ['KWSGI_LOCKFILE'] = lockfile
                    p = subprocess.Popen(args, env=environ)
                    while p.poll() is None:  # Busy wait...
                        os.utime(lockfile, None)  # Alive! If lockfile is unlinked, it raises FileNotFoundError.
                        time.sleep(interval)
                    # 終了ステータスをチェックする。Reload(3) 以外なら終了する。
                    if p.poll() != EXIT_STATUS_RELOAD:
                        if os.path.exists(lockfile):
                            os.unlink(lockfile)
                            sys.exit(p.poll())
            except KeyboardInterrupt:
                pass
            finally:
                if os.path.exists(lockfile):
                    os.unlink(lockfile)
            return

        # ワーカープロセスの処理を記述する
        # ワーカープロセスはコマンド呼び出し時、 KWSGI_LOCKFILE を環境変数で指定する。
        # lockfileを通してメインプロセスと通信する。ファイルが削除されていたら終了。ファイルが更新されていたらリロードを意味する。
        try:
            lockfile = os.environ.get('KWSGI_LOCKFILE')
            bgcheck = FileCheckerThread(lockfile, interval)
            with bgcheck:
                self.func(*self.func_args, **self.func_kwargs)
            if bgcheck.status == 'reload':
                sys.exit(EXIT_STATUS_RELOAD)
        except KeyboardInterrupt:
            pass
        except (SystemExit, MemoryError):
            raise
        except:
            time.sleep(interval)
            sys.exit(EXIT_STATUS_RELOAD)

次のように使います。

server = AutoReloadServer(something_func, kwargs={'app': app, 'host': host, 'port': port})
server.run_forever(interval)

Bottleの実装を参考に勉強したのですが、lockfileによるプロセス間のステータスのやりとりや、環境変数を使ったワーカープロセスの判定は自分にとって珍しく面白い実装でした。kwsgiでは次のファイルで実装しています。

github.com


文字列で指定したPythonオブジェクトを動的に読み込んで実行する方法

最後は、コマンドラインアプリケーションをつくるときに知っておくと便利なTipsです。 gunicorn や uWSGI といったWSGIサーバーのコマンドラインインターフェイスでは、実行対象のファイル名とそこに書かれてあるWSGIアプリケーションをコマンドライン引数で指定します。

# hello.py
def application(env, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")]
    return [b"Hello World"]

このようなファイルを用意して、 gunicorn -w 1 hello:application と指定すると、hello.py というファイルを読み込んでからその中にある application と名前のついたオブジェクトを動的に取り出し、WSGIサーバーに読み込ませる必要があります。

やりかたはいくつかありますが、Python3だけを対象にするなら importlib.machinery.SourceFileLoader を使った方法が手軽です。

from importlib.machinery import SourceFileLoader

filepath = 'target.py'
app_name = 'application'

def insert_import_path_to_sys_modules(import_path):
    abspath = os.path.abspath(import_path)
    if os.path.isdir(abspath):
        sys.path.insert(0, abspath)
    else:
        sys.path.insert(0, os.path.dirname(abspath))

insert_import_path_to_sys_modules(os.path.abspath(filepath))
module = SourceFileLoader('module', filepath).load_module()
app = getattr(t, app_name)

使い方はこのように非常に簡単です。ファイルをモジュールとして動的に読み込み getattr() で対象のオブジェクトを取得します。もしPython 2で同様のことをやるなら importlib が使えないため exec()compile() 関数を使った少しトリッキーな実装が必要になります。

import types

filepath = 'target.py'
app_name = 'application'

t = types.ModuleType('app')

with open(filepath) as config_file:
    exec(compile(config_file.read(), src.name, 'exec'), t.__dict__)

app = getattr(t, app_name)

対象のファイルを open() し、 compile() 関数により動的にファイルの中身をコンパイルしてコードオブジェクトを取得します *3。その後 exec() 関数により types.ModuleType() で用意した変数に module オブジェクトを割り当てます。 こちらの方法を使うことはもうほとんど無いかと思うので、参考までに。ちなみに以前自分が作っていたWSGIフレームワークKobinで、 exec() を使った実装から importlib を使った実装に書き換えたときのコミットは↓のrevisionです、参考までに。

github.com

謝辞

自分がdouble forkというものを知ったのは tell-k さんのPyCon JPの発表 Pythonでざっくり学ぶUnixプロセス がきっかけでした。 少し解釈に自信がない部分もあったので、本記事も tell-k さんにレビューいただきました。ありがとうございます。 double forkの2回目のforkをする理由については、別の理由とかではないか少し心配だったのですが tell-kさんも基本的には同じ解釈のようです。 2回目のforkの意図について確認した際に、tell-kさんから次の返事がきました。

私も資料作ってる時に同じ疑問に思いました。そしたら下記リンクにたどり着きましたので共有しておきます。 http://q.hatena.ne.jp/1320139299

setsid = セッションリーダーで、技術的に制御端末の割り当てが可能だから、確実に割り当てできないようにセッションリーダーではない孫を作るっていう認識は私も一緒です。

あとは daemon化とは直接関係なさそうな気はしますが、double fork することで、ゾンビプロセスを抑制できるって話は面白かったです。 http://d.hatena.ne.jp/sleepy_yoshi/20100228/p1

どうやら親が死んで init (システム上の一番最初のプロセス。pid=1, ppid=0)の養子になることが、ゾンビプロセスの発生を抑えることにも繋がっているとのことでした。なぜ init が頻繁にwaitを呼んでいるのか、その理由までは調べきれていませんが参考までに。

他にも今回はgunicornのdouble forkの実装を参考にしました。 gunicornの実装では、 chdir() を呼んでいないのですがこちらの理由もまた分かったらまた追記しておこうかとおもいます。

今日紹介した実装はどれも書いていて楽しいコードでした。 次は時間があるときにでもWSGIサーバーのパフォーマンス最適化のための方法を勉強してまとめたいなと思います。