エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)
- 作者: Michal Jaworski,TarekZiade,稲田直哉,芝田将,渋川よしき,清水川貴之,森本哲也
- 出版社/メーカー: ドワンゴ
- 発売日: 2018/02/26
- メディア: Kindle版
- この商品を含むブログを見る
先日、 tokibito 先生(id:nullpobug)と勉強していたpipe, select, poll, epollあたりについてメモ。
os.fork
os.fork
: 子プロセス(child process)をつくれる。
import os a = 0 print(a) os.fork() a += 1 print(a)
子は親の複製。親のデータ、ヒープ、スタックの各空間の複製を取得。 メモリのこれらの部分は共有されないので、実行結果は次のようになる(テキストセグメントは共有される)。
$ python3.5 fork.py 0 1 1
ちなみに親と子、どちらが先に実行され1を出力したのかは、カーネルが使うスケジューリングアルゴリズムに依存。もし親と子で同期をとりたいならプロセス間通信が必要となる。
プロセス記述子(pid)
ちなみに os.fork は、親側には子のPID、子側には0を返します。
- 親や子が自身のPIDを知りたいときは、
os.getpid()
- 子が親のPIDを知りたいときは、
os.getppid()
をよびだします。
import os pid = os.fork() if pid: # Parent Process print('親:自身のPID', os.getpid()) print('親:子のPID', pid) else: # Child Process print('子:自身のPID', os.getpid()) print('子:親のPID', os.getppid())
$ python3.5 fork.py 親:自身のPID 37938 親:子のPID 37939 子:自身のPID 37939 子:親のPID 37938
詳解UNIXプログラミングを読んでいて、たしかにと思ったんですが、この関数は1度だけ呼び出すけど2度戻る。なんか不思議な感じ。
パイプによるプロセス間通信
パイプは os.pipe で作成。ファイル記述子のペア (r, w)
を返し、それぞれ読み出し、書き込み用に使うことができます。これを使ってプロセス間通信を試してみる。
import os def main(): read_fd, write_fd = os.pipe() pid = os.fork() if pid: # Parent process os.close(read_fd) write_pipe = os.fdopen(write_fd, 'w') write_pipe.write('hello') write_pipe.close() else: # Child process os.close(write_fd) read_pipe = os.fdopen(read_fd, 'r') content = read_pipe.read() read_pipe.close() print(content) if __name__ == '__main__': main()
$ python3.5 process-communication.py hello
bytesを読み書きしたい場合は、
def bytes_main(): read_pipe, write_pipe = os.fdopen(read_fd, 'rb'), os.fdopen(write_fd, 'wb') pid = os.fork() if pid: # Parent Process read_pipe.close() write_pipe.write(b'hello') write_pipe.close() else: # Child Process write_pipe.close() content = read_pipe.read(5) print(content) read_pipe.close()
NON BLOCKINGモード
ノンブロッキングで読み書きしてみる
さっきまでは read_pipe.read()
を呼び出すと、値がくるまでブロックしてしまいます。
値が来てない間は、ずっと待ったりせずに他の処理をしたいことがあるかもしれません。
値はあれば返すし、なければ何も無かったよってことを返す。 そうすると、じゃあその間他の処理をしようって判断ができる。
FCNTLというものがあって、File Descriptorにnon blockingモード(os.NON_BLOCKING)を指定出来たりする。
import os import time import fcntl def main(): read_fd, write_fd = os.pipe() pid = os.fork() if pid: print('Parent:', os.getpid()) write_pipe = os.fdopen(write_fd, 'wb') for w in b'Hello': time.sleep(1) write_pipe.write(bytes([w])) write_pipe.flush() write_pipe.close() else: print('Child:', os.getpid()) fcntl.fcntl(read_fd, fcntl.F_SETFL, os.O_NONBLOCK) read_pipe = os.fdopen(read_fd, 'rb') bytes_read = 0 content = b'' while True: read_data = read_pipe.read() if read_data: bytes_read += len(read_data) content +=read_data print(bytes(read_data)) if bytes_read >= 5: break read_pipe.close() print(content) main()
注意点としては、書き込み側も出来るだけ連続したところに書きたいために、Bufferにためています。 writeした時点では、バッファに乗ってるだけなので、ちゃんとフラッシュしてあげないといけません。 (さっきまではcloseする時に、バッファのコピーがフラッシュされていました)
しかし今のプログラムには1つ致命的な問題があります。 これを実行中に、別タブ開いてtop コマンドを叩いてみてください。 CPUの使用率が100%となっています。
select, poll, epoll
selectを触ってみる
import os import time import select def main(): read_fd, write_fd = os.pipe() pid = os.fork() if pid: # Parent Process print('parent', os.getpid()) write_pipe = os.fdopen(write_fd, 'wb') for w in b'Hello': time.sleep(1) write_pipe.write(bytes([w])) write_pipe.flush() write_pipe.close() else: # Child Process print('child', os.getpid()) read_pipe = os.fdopen(read_fd, 'rb') bytes_read = 0 content = b'' exit_flag = False rready, _, _ = select.select([read_fd], [], []) while True: for fd in rready: read_data = read_pipe.read(1) print(read_data) bytes_read += len(read_data) content += read_data if bytes_read >= 5: exit_flag = True break if exit_flag: break read_pipe.close() print(content) main()
実行してみると、
poll, epoll
pollとepollは同じようなインタフェースです。 tokibito先生がepollのサンプルをgistで公開しているので、ここでは省略
違いとか
どれを使うかは結構議論されてるとこなのかな。とりあえずtokibito先生と話してたこと:
- pollとselectはそもそもインタフェースが違う。
- selectの方が簡単そう?
- pollとepollは、Pythonレベルでは同じインタフェース
- pollは毎回ファイルディスクリプタのリストを渡す必要がある。
- つまりPython側で管理している。毎回、kernel領域にメモリコピーをしないといけない。
- epollはregisterの時にファイルディスクリプタのリストを渡すと、kernel側で保持しておいてくれる。
- Man page of EPOLL
- pollと違って、毎回kernel空間にメモリコピーしなくていい
- time.sleep はosのapi呼んでいる。これでもCPU使用率は下げられる
- poll とか使うとイベントが来た時に動ける
- time.sleep だと読む必要がない時に読みに行くし、読まないと行けない時にすぐ読まない
- daemon 系のスクリプト書くときとかもpollとかselect使ったりする
環境によってこれ使えないよとかがある。詳しくはドキュメンテーションに書いてますが、てっとり速く確認するならselectモジュールの中覗けばいい。
Mac OSX
>>> dir(select) ['error', 'kevent', 'kqueue', 'poll', 'select']
>>> dir(select) ['epoll', 'error', 'poll', 'select']
補足
全く本題とは違うんですが、tokibito先生と話していて知ったこととかを、自分用のメモとしてここに書いておきます。
補足1: ファイル共有
forkは、親のオープンしているすべてのファイル記述子を二重化(各記述子について関数dup を読んだのと同じ)します。 親と子はオープンしている各記述子のファイルテーブルエントリを共有。 親と子が同じ記述子へ書き出す場合、親が子を待つ(wait)などの同期をとらないと出力が混ざる。
>>> import os >>> os.fork() 36897 >>> 0 >>> ^C KeyboardInterrupt >>> KeyboardInterrupt >>> >>>
補足2: スレッドについて
当然ですが、スレッドなら名前空間共有されてる。
import threading a = 0 print(a) def main(): global a a += 1 print(a) t1 = threading.Thread(target=main) t2 = threading.Thread(target=main) t1.start() t2.start()
$ python3.5 thread_test.py 0 1 2
補足3: スレッドについて2
OSスレッドとは違って、Erlangのような言語は軽量スレッド・マイクロスレッドと呼ばれるものが実装されていて、OSが提供するスレッド機能の限界を超えた数の並列処理が出来たり、OSのスケジューリングの影響を受けなかったりするらしい。呼ばれるものがあるらしい。
自分の持っているサーバでは、
$ cat /proc/sys/vm/max_map_count
65530
PythonのthreadingモジュールとかはOSが提供するスレッドを生成する。
import threading import time def main(): while True: time.sleep(1) t = threading.Thread(target=main) t.start()
この時にpsでthreadまで表示。
$ ps aux -L | grep python mainte 29700 29700 0.0 2 0.7 206504 7528 pts/3 Sl+ 00:42 0:00 /opt/python-3.5.0/bin/python3 mainte 29700 29762 0.0 2 0.7 206504 7528 pts/3 Sl+ 00:46 0:00 /opt/python-3.5.0/bin/python3
2列目がPID、3列目がPPID。同じプロセスでスレッドが2つ動いているのが確認できる。
Linux におけるスレッド数の上限 | yunabe.jp によると、システム全体でのスレッド数の上限は、下記コマンドで確認できるらしい。 Golangのgoroutineとかもマイクロスレッドなんだけど、 Go言語の低レイテンシGC実現のための取り組み | POSTD とか読むと1プロセスで150万のgoroutineを生成してると聞いて驚いた。
$ ulimit -u # 現在のユーザが起動できるプロセス数の上限( /etc/security/limits.conf で調整可能) 3893 $ cat /proc/sys/kernel/threads-max # システム全体でのスレッド数( /etc/sysctl.conf から調整可能) 7787 $ cat /proc/sys/vm/max_map_count # これを半分にした数を超えてスレッドを作ることはできない 65530
補足4: mod_wsgi について
mod-wsgiはPython VMをつくってる。1プロセス内でメモリ空間(そこにpython vmがあるかんじ)を区切ってる。 これはCからしか叩け無いAPIがあるらしい。 cpythonのプログラムが全部あるわけではなくて、sysモジュールとかそういうのがpython vm内にある。 他は共有してるらしい。
面白いので読んでみるといいとのこと。
補足5: signal
gunicornのソースとかpure pythonで読みやすいし、勉強になるよって話になった。 シグナルとかの扱いを知っておくといいとのこと。
import signal import time import sys def handler(signum, frmae): print('signal: {}'.format(signum)) sys.exit() # ここでexitしなければ動き続ける signal.signal(signal.SIGTERM, handler) # handlerを登録. sigtermが来た時にhandlerを実行 while True: time.sleep(1)
補足6: concurrent.futures
ソース(pure python)を読むと分かるけど、 ProcessPoolExecutorはただのpythonのcollections.queueを用いてじっそうしたQueueオブジェクトでためてる。 そのなかではthread safeにするために(同時にアクセスされても大丈夫)threading.Lock()を呼び出す。
ちなみにロックの実装方法は色々あって、fileをopenができたらlockがとれたみたいなことする実装もある。
補足7: 他への応用
ここではPIPEによるプロセス間通信でselectやpoll, epollを試したんですが、Socket通信とかをNon Blockingにするのもファイルディスクリプタに対して同じようにできる。
References
- 作者: W. Richard Stevens,Stephen A. Rago
- 出版社/メーカー: 翔泳社
- 発売日: 2014/06/05
- メディア: Kindle版
- この商品を含むブログ (7件) を見る