PIPEによるプロセス間通信とselect, poll, epollの話

エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)

エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)

先日、 tokibito 先生(id:nullpobug)と勉強していたpipe, select, poll, epollあたりについてメモ。

os.fork

os.fork : 子プロセス(child process)をつくれる。

import os

a = 0
print(a)

os.fork()

a += 1
print(a)

子は親の複製。親のデータ、ヒープ、スタックの各空間の複製を取得。 メモリのこれらの部分は共有されないので、実行結果は次のようになる(テキストセグメントは共有される)。

$ python3.5 fork.py
0
1
1

ちなみに親と子、どちらが先に実行され1を出力したのかは、カーネルが使うスケジューリングアルゴリズムに依存。もし親と子で同期をとりたいならプロセス間通信が必要となる。

プロセス記述子(pid)

ちなみに os.fork は、親側には子のPID、子側には0を返します。

  • 親や子が自身のPIDを知りたいときは、 os.getpid()
  • 子が親のPIDを知りたいときは、 os.getppid()

をよびだします。

import os

pid = os.fork()
if pid:  # Parent Process
    print('親:自身のPID', os.getpid())
    print('親:子のPID', pid)
else:  # Child Process
    print('子:自身のPID', os.getpid())
    print('子:親のPID', os.getppid())
$ python3.5 fork.py
親:自身のPID 37938
親:子のPID 37939
子:自身のPID 37939
子:親のPID 37938

詳解UNIXプログラミングを読んでいて、たしかにと思ったんですが、この関数は1度だけ呼び出すけど2度戻る。なんか不思議な感じ。

パイプによるプロセス間通信

パイプは os.pipe で作成。ファイル記述子のペア (r, w) を返し、それぞれ読み出し、書き込み用に使うことができます。これを使ってプロセス間通信を試してみる。

import os


def main():
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid:  # Parent process
        os.close(read_fd)
        write_pipe = os.fdopen(write_fd, 'w')
        write_pipe.write('hello')
        write_pipe.close()
    else:  # Child process
        os.close(write_fd)
        read_pipe = os.fdopen(read_fd, 'r')
        content = read_pipe.read()
        read_pipe.close()
        print(content)


if __name__ == '__main__':
    main()
$ python3.5 process-communication.py
hello

bytesを読み書きしたい場合は、

def bytes_main():
    read_pipe, write_pipe = os.fdopen(read_fd, 'rb'), os.fdopen(write_fd, 'wb')

    pid = os.fork()
    if pid:  # Parent Process
        read_pipe.close()
        write_pipe.write(b'hello')
        write_pipe.close()
    else:  # Child Process
        write_pipe.close()
        content = read_pipe.read(5)
        print(content)
        read_pipe.close()

NON BLOCKINGモード

ノンブロッキングで読み書きしてみる さっきまでは read_pipe.read() を呼び出すと、値がくるまでブロックしてしまいます。 値が来てない間は、ずっと待ったりせずに他の処理をしたいことがあるかもしれません。

値はあれば返すし、なければ何も無かったよってことを返す。 そうすると、じゃあその間他の処理をしようって判断ができる。

FCNTLというものがあって、File Descriptorにnon blockingモード(os.NON_BLOCKING)を指定出来たりする。

import os
import time
import fcntl


def main():
    read_fd, write_fd = os.pipe()

    pid = os.fork()
    if pid:
        print('Parent:', os.getpid())
        write_pipe = os.fdopen(write_fd, 'wb')
        for w in b'Hello':
            time.sleep(1)
            write_pipe.write(bytes([w]))
            write_pipe.flush()
        write_pipe.close()
    else:
        print('Child:', os.getpid())
        fcntl.fcntl(read_fd, fcntl.F_SETFL, os.O_NONBLOCK)
        read_pipe = os.fdopen(read_fd, 'rb')

        bytes_read = 0
        content = b''
        while True:
            read_data = read_pipe.read()
            if read_data:
                bytes_read += len(read_data)
                content +=read_data
                print(bytes(read_data))
            if bytes_read >= 5:
                break
        read_pipe.close()
        print(content)

main()

注意点としては、書き込み側も出来るだけ連続したところに書きたいために、Bufferにためています。 writeした時点では、バッファに乗ってるだけなので、ちゃんとフラッシュしてあげないといけません。 (さっきまではcloseする時に、バッファのコピーがフラッシュされていました)

f:id:nwpct1:20161008160111g:plain

しかし今のプログラムには1つ致命的な問題があります。 これを実行中に、別タブ開いてtop コマンドを叩いてみてください。 CPUの使用率が100%となっています。

select, poll, epoll

selectを触ってみる

import os
import time
import select


def main():
    read_fd, write_fd = os.pipe()

    pid = os.fork()
    if pid:  # Parent Process
        print('parent', os.getpid())
        write_pipe = os.fdopen(write_fd, 'wb')
        for w in b'Hello':
            time.sleep(1)
            write_pipe.write(bytes([w]))
            write_pipe.flush()
        write_pipe.close()

    else:  # Child Process
        print('child', os.getpid())
        read_pipe = os.fdopen(read_fd, 'rb')
        bytes_read = 0
        content = b''
        exit_flag = False
        rready, _, _ = select.select([read_fd], [], [])

        while True:
            for fd in rready:
                read_data = read_pipe.read(1)
                print(read_data)

                bytes_read += len(read_data)
                content += read_data
                if bytes_read >= 5:
                    exit_flag = True
                    break
            if exit_flag:
                break
        read_pipe.close()
        print(content)

main()

実行してみると、

f:id:nwpct1:20161008163123g:plain

poll, epoll

pollとepollは同じようなインタフェースです。 tokibito先生がepollのサンプルをgistで公開しているので、ここでは省略

forkとpipeとepollのサンプルコード

違いとか

どれを使うかは結構議論されてるとこなのかな。とりあえずtokibito先生と話してたこと:

  • pollとselectはそもそもインタフェースが違う。
    • selectの方が簡単そう?
  • pollとepollは、Pythonレベルでは同じインタフェース
  • time.sleep はosのapi呼んでいる。これでもCPU使用率は下げられる
    • poll とか使うとイベントが来た時に動ける
    • time.sleep だと読む必要がない時に読みに行くし、読まないと行けない時にすぐ読まない
  • daemon 系のスクリプト書くときとかもpollとかselect使ったりする

環境によってこれ使えないよとかがある。詳しくはドキュメンテーションに書いてますが、てっとり速く確認するならselectモジュールの中覗けばいい。

Mac OSX

>>> dir(select)
['error', 'kevent', 'kqueue', 'poll', 'select']

CentOS

>>> dir(select)
['epoll', 'error', 'poll', 'select']

補足

全く本題とは違うんですが、tokibito先生と話していて知ったこととかを、自分用のメモとしてここに書いておきます。

補足1: ファイル共有

forkは、親のオープンしているすべてのファイル記述子を二重化(各記述子について関数dup を読んだのと同じ)します。 親と子はオープンしている各記述子のファイルテーブルエントリを共有。 親と子が同じ記述子へ書き出す場合、親が子を待つ(wait)などの同期をとらないと出力が混ざる。

>>> import os
>>> os.fork()
36897
>>> 0
>>> ^C

KeyboardInterrupt
>>> KeyboardInterrupt
>>> >>>

補足2: スレッドについて

当然ですが、スレッドなら名前空間共有されてる。

import threading

a = 0
print(a)

def main():
    global a
    a += 1
    print(a)

t1 = threading.Thread(target=main)
t2 = threading.Thread(target=main)

t1.start()
t2.start()
$ python3.5 thread_test.py
0
1
2

補足3: スレッドについて2

OSスレッドとは違って、Erlangのような言語は軽量スレッド・マイクロスレッドと呼ばれるものが実装されていて、OSが提供するスレッド機能の限界を超えた数の並列処理が出来たり、OSのスケジューリングの影響を受けなかったりするらしい。呼ばれるものがあるらしい。

自分の持っているサーバでは、

$ cat /proc/sys/vm/max_map_count
65530

PythonのthreadingモジュールとかはOSが提供するスレッドを生成する。

import threading
import time
def main():
    while True:
        time.sleep(1)

t = threading.Thread(target=main)
t.start()

この時にpsでthreadまで表示。

$ ps aux -L | grep python
mainte   29700 29700  0.0    2  0.7 206504  7528 pts/3    Sl+  00:42   0:00 /opt/python-3.5.0/bin/python3
mainte   29700 29762  0.0    2  0.7 206504  7528 pts/3    Sl+  00:46   0:00 /opt/python-3.5.0/bin/python3

2列目がPID、3列目がPPID。同じプロセスでスレッドが2つ動いているのが確認できる。

Linux におけるスレッド数の上限 | yunabe.jp によると、システム全体でのスレッド数の上限は、下記コマンドで確認できるらしい。 Golangのgoroutineとかもマイクロスレッドなんだけど、 Go言語の低レイテンシGC実現のための取り組み | POSTD とか読むと1プロセスで150万のgoroutineを生成してると聞いて驚いた。

$ ulimit -u  # 現在のユーザが起動できるプロセス数の上限( /etc/security/limits.conf で調整可能)
3893
$ cat /proc/sys/kernel/threads-max  # システム全体でのスレッド数( /etc/sysctl.conf から調整可能)
7787
$ cat /proc/sys/vm/max_map_count  # これを半分にした数を超えてスレッドを作ることはできない
65530

補足4: mod_wsgi について

mod-wsgiPython VMをつくってる。1プロセス内でメモリ空間(そこにpython vmがあるかんじ)を区切ってる。 これはCからしか叩け無いAPIがあるらしい。 cpythonのプログラムが全部あるわけではなくて、sysモジュールとかそういうのがpython vm内にある。 他は共有してるらしい。

面白いので読んでみるといいとのこと。

補足5: signal

gunicornのソースとかpure pythonで読みやすいし、勉強になるよって話になった。 シグナルとかの扱いを知っておくといいとのこと。

import signal
import time
import sys

def handler(signum, frmae):
    print('signal: {}'.format(signum))
    sys.exit()  # ここでexitしなければ動き続ける

signal.signal(signal.SIGTERM, handler)  # handlerを登録. sigtermが来た時にhandlerを実行

while True:
    time.sleep(1)

補足6: concurrent.futures

ソース(pure python)を読むと分かるけど、 ProcessPoolExecutorはただのpythonのcollections.queueを用いてじっそうしたQueueオブジェクトでためてる。 そのなかではthread safeにするために(同時にアクセスされても大丈夫)threading.Lock()を呼び出す。

ちなみにロックの実装方法は色々あって、fileをopenができたらlockがとれたみたいなことする実装もある。

補足7: 他への応用

ここではPIPEによるプロセス間通信でselectやpoll, epollを試したんですが、Socket通信とかをNon Blockingにするのもファイルディスクリプタに対して同じようにできる。

References

詳解UNIXプログラミング 第3版

詳解UNIXプログラミング 第3版