PIPEによるプロセス間通信とselect, poll, epollの話

エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)

エキスパートPythonプログラミング 改訂2版 (アスキードワンゴ)

先日、 tokibito 先生(id:nullpobug)と勉強していたpipe, select, poll, epollあたりについてメモ。

os.fork

os.fork : 子プロセス(child process)をつくれる。

import os

a = 0
print(a)

os.fork()

a += 1
print(a)

子は親の複製。親のデータ、ヒープ、スタックの各空間の複製を取得。 メモリのこれらの部分は共有されないので、実行結果は次のようになる(テキストセグメントは共有される)。

$ python3.5 fork.py
0
1
1

ちなみに親と子、どちらが先に実行され1を出力したのかは、カーネルが使うスケジューリングアルゴリズムに依存。もし親と子で同期をとりたいならプロセス間通信が必要となる。

プロセス記述子(pid)

ちなみに os.fork は、親側には子のPID、子側には0を返します。

  • 親や子が自身のPIDを知りたいときは、 os.getpid()
  • 子が親のPIDを知りたいときは、 os.getppid()

をよびだします。

import os

pid = os.fork()
if pid:  # Parent Process
    print('親:自身のPID', os.getpid())
    print('親:子のPID', pid)
else:  # Child Process
    print('子:自身のPID', os.getpid())
    print('子:親のPID', os.getppid())
$ python3.5 fork.py
親:自身のPID 37938
親:子のPID 37939
子:自身のPID 37939
子:親のPID 37938

詳解UNIXプログラミングを読んでいて、たしかにと思ったんですが、この関数は1度だけ呼び出すけど2度戻る。なんか不思議な感じ。

パイプによるプロセス間通信

パイプは os.pipe で作成。ファイル記述子のペア (r, w) を返し、それぞれ読み出し、書き込み用に使うことができます。これを使ってプロセス間通信を試してみる。

import os


def main():
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid:  # Parent process
        os.close(read_fd)
        write_pipe = os.fdopen(write_fd, 'w')
        write_pipe.write('hello')
        write_pipe.close()
    else:  # Child process
        os.close(write_fd)
        read_pipe = os.fdopen(read_fd, 'r')
        content = read_pipe.read()
        read_pipe.close()
        print(content)


if __name__ == '__main__':
    main()
$ python3.5 process-communication.py
hello

bytesを読み書きしたい場合は、

def bytes_main():
    read_pipe, write_pipe = os.fdopen(read_fd, 'rb'), os.fdopen(write_fd, 'wb')

    pid = os.fork()
    if pid:  # Parent Process
        read_pipe.close()
        write_pipe.write(b'hello')
        write_pipe.close()
    else:  # Child Process
        write_pipe.close()
        content = read_pipe.read(5)
        print(content)
        read_pipe.close()

NON BLOCKINGモード

ノンブロッキングで読み書きしてみる さっきまでは read_pipe.read() を呼び出すと、値がくるまでブロックしてしまいます。 値が来てない間は、ずっと待ったりせずに他の処理をしたいことがあるかもしれません。

値はあれば返すし、なければ何も無かったよってことを返す。 そうすると、じゃあその間他の処理をしようって判断ができる。

FCNTLというものがあって、File Descriptorにnon blockingモード(os.NON_BLOCKING)を指定出来たりする。

import os
import time
import fcntl


def main():
    read_fd, write_fd = os.pipe()

    pid = os.fork()
    if pid:
        print('Parent:', os.getpid())
        write_pipe = os.fdopen(write_fd, 'wb')
        for w in b'Hello':
            time.sleep(1)
            write_pipe.write(bytes([w]))
            write_pipe.flush()
        write_pipe.close()
    else:
        print('Child:', os.getpid())
        fcntl.fcntl(read_fd, fcntl.F_SETFL, os.O_NONBLOCK)
        read_pipe = os.fdopen(read_fd, 'rb')

        bytes_read = 0
        content = b''
        while True:
            read_data = read_pipe.read()
            if read_data:
                bytes_read += len(read_data)
                content +=read_data
                print(bytes(read_data))
            if bytes_read >= 5:
                break
        read_pipe.close()
        print(content)

main()

注意点としては、書き込み側も出来るだけ連続したところに書きたいために、Bufferにためています。 writeした時点では、バッファに乗ってるだけなので、ちゃんとフラッシュしてあげないといけません。 (さっきまではcloseする時に、バッファのコピーがフラッシュされていました)

f:id:nwpct1:20161008160111g:plain

しかし今のプログラムには1つ致命的な問題があります。 これを実行中に、別タブ開いてtop コマンドを叩いてみてください。 CPUの使用率が100%となっています。

select, poll, epoll

selectを触ってみる

import os
import time
import select


def main():
    read_fd, write_fd = os.pipe()

    pid = os.fork()
    if pid:  # Parent Process
        print('parent', os.getpid())
        write_pipe = os.fdopen(write_fd, 'wb')
        for w in b'Hello':
            time.sleep(1)
            write_pipe.write(bytes([w]))
            write_pipe.flush()
        write_pipe.close()

    else:  # Child Process
        print('child', os.getpid())
        read_pipe = os.fdopen(read_fd, 'rb')
        bytes_read = 0
        content = b''
        exit_flag = False
        rready, _, _ = select.select([read_fd], [], [])

        while True:
            for fd in rready:
                read_data = read_pipe.read(1)
                print(read_data)

                bytes_read += len(read_data)
                content += read_data
                if bytes_read >= 5:
                    exit_flag = True
                    break
            if exit_flag:
                break
        read_pipe.close()
        print(content)

main()

実行してみると、

f:id:nwpct1:20161008163123g:plain

poll, epoll

pollとepollは同じようなインタフェースです。 tokibito先生がepollのサンプルをgistで公開しているので、ここでは省略

forkとpipeとepollのサンプルコード

違いとか

どれを使うかは結構議論されてるとこなのかな。とりあえずtokibito先生と話してたこと:

  • pollとselectはそもそもインタフェースが違う。
    • selectの方が簡単そう?
  • pollとepollは、Pythonレベルでは同じインタフェース
  • time.sleep はosのapi呼んでいる。これでもCPU使用率は下げられる
    • poll とか使うとイベントが来た時に動ける
    • time.sleep だと読む必要がない時に読みに行くし、読まないと行けない時にすぐ読まない
  • daemon 系のスクリプト書くときとかもpollとかselect使ったりする

環境によってこれ使えないよとかがある。詳しくはドキュメンテーションに書いてますが、てっとり速く確認するならselectモジュールの中覗けばいい。

Mac OSX

>>> dir(select)
['error', 'kevent', 'kqueue', 'poll', 'select']

CentOS

>>> dir(select)
['epoll', 'error', 'poll', 'select']

補足

全く本題とは違うんですが、tokibito先生と話していて知ったこととかを、自分用のメモとしてここに書いておきます。

補足1: ファイル共有

forkは、親のオープンしているすべてのファイル記述子を二重化(各記述子について関数dup を読んだのと同じ)します。 親と子はオープンしている各記述子のファイルテーブルエントリを共有。 親と子が同じ記述子へ書き出す場合、親が子を待つ(wait)などの同期をとらないと出力が混ざる。

>>> import os
>>> os.fork()
36897
>>> 0
>>> ^C

KeyboardInterrupt
>>> KeyboardInterrupt
>>> >>>

補足2: スレッドについて

当然ですが、スレッドなら名前空間共有されてる。

import threading

a = 0
print(a)

def main():
    global a
    a += 1
    print(a)

t1 = threading.Thread(target=main)
t2 = threading.Thread(target=main)

t1.start()
t2.start()
$ python3.5 thread_test.py
0
1
2

補足3: スレッドについて2

OSスレッドとは違って、Erlangのような言語は軽量スレッド・マイクロスレッドと呼ばれるものが実装されていて、OSが提供するスレッド機能の限界を超えた数の並列処理が出来たり、OSのスケジューリングの影響を受けなかったりするらしい。呼ばれるものがあるらしい。

自分の持っているサーバでは、

$ cat /proc/sys/vm/max_map_count
65530

PythonのthreadingモジュールとかはOSが提供するスレッドを生成する。

import threading
import time
def main():
    while True:
        time.sleep(1)

t = threading.Thread(target=main)
t.start()

この時にpsでthreadまで表示。

$ ps aux -L | grep python
mainte   29700 29700  0.0    2  0.7 206504  7528 pts/3    Sl+  00:42   0:00 /opt/python-3.5.0/bin/python3
mainte   29700 29762  0.0    2  0.7 206504  7528 pts/3    Sl+  00:46   0:00 /opt/python-3.5.0/bin/python3

2列目がPID、3列目がPPID。同じプロセスでスレッドが2つ動いているのが確認できる。

Linux におけるスレッド数の上限 | yunabe.jp によると、システム全体でのスレッド数の上限は、下記コマンドで確認できるらしい。 Golangのgoroutineとかもマイクロスレッドなんだけど、 Go言語の低レイテンシGC実現のための取り組み | POSTD とか読むと1プロセスで150万のgoroutineを生成してると聞いて驚いた。

$ ulimit -u  # 現在のユーザが起動できるプロセス数の上限( /etc/security/limits.conf で調整可能)
3893
$ cat /proc/sys/kernel/threads-max  # システム全体でのスレッド数( /etc/sysctl.conf から調整可能)
7787
$ cat /proc/sys/vm/max_map_count  # これを半分にした数を超えてスレッドを作ることはできない
65530

補足4: mod_wsgi について

mod-wsgiPython VMをつくってる。1プロセス内でメモリ空間(そこにpython vmがあるかんじ)を区切ってる。 これはCからしか叩け無いAPIがあるらしい。 cpythonのプログラムが全部あるわけではなくて、sysモジュールとかそういうのがpython vm内にある。 他は共有してるらしい。

面白いので読んでみるといいとのこと。

補足5: signal

gunicornのソースとかpure pythonで読みやすいし、勉強になるよって話になった。 シグナルとかの扱いを知っておくといいとのこと。

import signal
import time
import sys

def handler(signum, frmae):
    print('signal: {}'.format(signum))
    sys.exit()  # ここでexitしなければ動き続ける

signal.signal(signal.SIGTERM, handler)  # handlerを登録. sigtermが来た時にhandlerを実行

while True:
    time.sleep(1)

補足6: concurrent.futures

ソース(pure python)を読むと分かるけど、 ProcessPoolExecutorはただのpythonのcollections.queueを用いてじっそうしたQueueオブジェクトでためてる。 そのなかではthread safeにするために(同時にアクセスされても大丈夫)threading.Lock()を呼び出す。

ちなみにロックの実装方法は色々あって、fileをopenができたらlockがとれたみたいなことする実装もある。

補足7: 他への応用

ここではPIPEによるプロセス間通信でselectやpoll, epollを試したんですが、Socket通信とかをNon Blockingにするのもファイルディスクリプタに対して同じようにできる。

References

詳解UNIXプログラミング 第3版

詳解UNIXプログラミング 第3版

AWS LambdaのFunctionを開発するときのMakefile

自分用のメモ

lambda-uploaderみたいなものもありますが、LambdaだけならAWS CLIで十分簡単に操作出来るのでMakefileを用意してみました。

f:id:nwpct1:20161006204151g:plain

API Gatewayに関しては、AWS CLIでやろうとすると1つのリソースあたり3個ぐらいコマンドを叩く必要があったり、AWS CLIで操作できるものではない印象でした。 まだ管理画面触ってる方が楽だと思います。 自動化したいならAWS CLIではなく、CloudFormationやServerlessフレームワークみたいなもの(例: serverless(node), Zappa(python2))を使ったほうが良さそうです(こちらについては、今作ってるフレームワークがあるのでまた後日記事書きます)。

Help text

$ make
Description:
    Quickly deployment tool for AWS Lambda.

Requirements:
    - AWS CLI
    - jq

Commands:
    deploy               Deploy to AWS Lambda
    functions            Show the list of AWS Lambda functions
    help                 Show help text
    invoke               Run lambda function and show the result and the log.
    ls-bucket            Show the files in S3 Bucket
    roles                Show the list of IAM Roles
    undeploy             Remove deploy package from s3 and Delete lambda function
    update               Update Lambda function

Deploy

  1. deploy packageの作成
  2. deploy packageをs3にupload
  3. Lambda関数の登録

Invoke

  1. input.jsonの中身をeventとして、Lambdaを呼び出す
  2. base64 encodeでログが送られてくるので、jqでパースして、base64 decode。
  3. 実行結果とログを保存して、表示

Makefile

Makefileは、ちゃんと書いたことがなかったので変な書き方してるかも。 何か変なところあれば教えてください。

.PHONY: help roles functions ls-bucket upload invoke undeploy deploy update

LAMBDA_NAME    := image-upload
LAMBDA_HANDLER := lambda_image_upload.lambda_handler
DESCRIPTION    := "Upload image posted by multipart/form-data."
TIME_OUT       := 16

INPUT_PYFILE   := lambda_image_upload.py
ZIP_FILE_NAME  := lambda_image_upload.zip
INPUT_JSON     := input.json
OUTPUT_FILE    := result.json
LOG_FILE       := invoke.log

S3_BUCKET      := lambda-packages
S3_KEY         := lambda_image_upload.zip
IAM_ROLE_ARN   := xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
REGION         := ap-northeast-1

.DEFAULT_GOAL := help
help: ## Show help text
    @echo "Description:"
    @echo "    Quickly deployment tool for AWS Lambda."
    @echo ""
    @echo "Requirements:"
    @echo "    - AWS CLI"
    @echo "    - jq"
    @echo ""
    @echo "Commands:"
    @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "    \033[36m%-20s\033[0m %s\n", $$1, $$2}'

roles: ## Show the list of IAM Roles
    aws iam list-roles | jq '.Roles | .[] | .RoleName, .Arn'

functions: ## Show the list of AWS Lambda functions
    aws lambda list-functions | jq '.Functions | .[] | .FunctionName'

ls-bucket: ## Show the files in S3 Bucket
    aws s3 ls s3://$(S3_BUCKET)

$(ZIP_FILE_NAME): $(INPUT_PYFILE) ## Create deploy package
    zip -r $(ZIP_FILE_NAME) $(INPUT_PYFILE)

upload: # Upload deploy package to S3
    make $(ZIP_FILE_NAME)
    aws s3 cp $(ZIP_FILE_NAME) s3://$(S3_BUCKET)/

# See http://docs.aws.amazon.com/lambda/latest/dg/with-userapp-walkthrough-custom-events-invoke.html
invoke: ## Run lambda function and show the result and the log.
    @aws lambda invoke \
        --payload file://$(INPUT_JSON) \
        --function-name $(LAMBDA_NAME) \
        --log-type Tail \
        $(OUTPUT_FILE) \
        | jq -r '.LogResult' \
        | base64 --decode \
        > $(LOG_FILE)

    @echo "-- Result\n"
    @cat $(OUTPUT_FILE)
    @echo "\n\n-- Log\n"
    @cat $(LOG_FILE)
    @echo ""

undeploy: ## Remove deploy package from s3 and Delete lambda function
    aws s3 rm s3://$(S3_BUCKET)/$(S3_KEY)
    aws lambda delete-function --function-name $(LAMBDA_NAME)

# See https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-on-demand-https-example-upload-deployment-pkg.html
deploy: ## Deploy to AWS Lambda
    make upload
    aws lambda create-function \
        --function-name $(LAMBDA_NAME) \
        --region $(REGION) \
        --code S3Bucket=$(S3_BUCKET),S3Key=$(S3_KEY) \
        --handler $(LAMBDA_HANDLER) \
        --timeout $(TIME_OUT) \
        --description $(DESCRIPTION) \
        --runtime python2.7 \
        --role $(IAM_ROLE_ARN)

update: ## Update Lambda function
    make upload
    aws lambda update-function-code \
        --function-name $(LAMBDA_NAME) \
        --s3-bucket $(S3_BUCKET) \
        --s3-key $(S3_KEY)

参考資料

AWS Lambda実践ガイド

AWS Lambda実践ガイド

AWS LambdaとAPI Gatewayを触ってみたのでメモ

PyCon JP スプリントの時に id:iktakahiro さんがLambdaの話をしていたのですが、Lambda + API Gatewayは覚えとくと便利そうなので試してみた。

Lambdaを触ってみる

まずAPI Gatewayは考えずに、Lambdaを触ってみる。 調べてみたら、公式のドキュメントにも今やりたいことに似たものがあった。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-s3-example.html

事前に、下のことをしておいた。

  • Lambda実行用のIAMロールを作成
    • RoleType: AWS Lambda
    • Policy: AWSLambdaFullAccess, AmazonS3FullAccess, AWSLambdaBasicExecutionRole
  • S3のバケットを2つ用意。
    • bucketとlambdaは同じリージョンに作る
    • hoge-image-source: 直接アップロードされた画像
    • hoge-image-resized: Lambdaによってサイズを変更した画像
  • image-resizedtest.jpg という画像データを保存
    • Lambdaの動きを確認する時に、 test.jpg がcreatedされたというサンプルイベントデータを渡す。

デプロイパッケージを作成

Pillowみたいなサードパーティのライブラリを使うときは、それらを一緒にzipに固めたデプロイパッケージを作る。 デプロイパッケージに含めるものは、 id:iktakahiro さんいわく ./vendors に入れておくのがいいらしいのでとりあえず真似してみる。

$ virtualenv -p python2.7 venv  # Python3使えないので注意
$ source venv/bin/activate
$ pip install pillow -t ./vendors
$ ls vendors/
PIL                    Pillow-3.3.1.dist-info

ただ、この方法はPillowとか使う場合うまくいかなかった。

Unable to import module 'lambda_function': /var/task/PIL/_imaging.so: invalid ELF header

ではまった。どうやらMacでビルドするのが行けないらしい。 Amazon LinuxインスタンスをEC2で立ち上げてビルドした。 Dockerでうまくビルド出来るようにすると楽になれそう。

こういうビルド済みのファイルを提供しているリポジトリもあるらしい。

コードを書く

thumbnail-generator.py

from __future__ import print_function
import boto3
import os
import sys
import uuid
from PIL import Image

s3_client = boto3.client('s3')
RESIZED_BUCKET_NAME = 'hoge-image-resized'
RESIZED_IMAGE_SIZE = (200, 100)

def resize_image(image_path, resized_path):
    with Image.open(image_path) as image:
        image.thumbnail(RESIZED_IMAGE_SIZE)
        image.save(resized_path)

def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
        upload_path = '/tmp/resized-{}'.format(key)

        s3_client.download_file(bucket, key, download_path)
        resize_image(download_path, upload_path)
        s3_client.upload_file(upload_path, RESIZED_BUCKET_NAME, key)

アップロード

10MB以上なら、S3を経由してUploadする必要がある。 10MB以下なら、Lambdaの設定画面からUpload出来る。

ログ

こんな感じでグラフ化されている。 それぞれの詳細のログも見れるので、確認したい項目はprintしておけばいいみたい。

f:id:nwpct1:20161002232026p:plain

動作確認

AWS CLIを使う方法とかもあるけど、ブラウザ上のTestボタンを押す方法が一番手軽な感じ。 CIの導入とかuploadからtestまで一発でやりたくなったら、AWS CLIを使う方法を試して追記。

Testボタン

事前に test.jpg をアップロード済みなので、ブラウザからS3のPutイベントを発生させてみる。 管理画面でTestって名前がついた青いボタンがあるのでそれを押す。

awsRegionとbucketのarn, name, objectのkeyを変えて実行。

  • awsRegion: ap-northeast-1
  • s3 > object > key: test.jpg
  • s3 > bucket > arn: arn:aws:s3:::hoge-image-source
  • s3 > bucket > name: hoge-image-source

ログを見ながらデバッグ。 うまくいくと、 hoge-image-resized の方にサムネイルが生成されている。

Github

成果物

github.com

API Gateway

API Gatewayで画像のアップロード用APIを作りたい。 POSTで画像を受け取って、S3へアップロード、URLを返す。

リクエストの形式

まずリクエストのヘッダやボディがAPI Gateway経由でどのように渡されるのか確認する。 あとで気づいたのだけれど、API Gateway AWS ProxyのSample event templateを見ればわかった。

{
  "body": "{\"test\":\"body\"}",
  "resource": "/{proxy+}",
  "requestContext": {
    "resourceId": "123456",
    "apiId": "1234567890",
    "resourcePath": "/{proxy+}",
    "httpMethod": "POST",
    "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
    "accountId": "123456789012",
    "identity": {
      "apiKey": null,
      : (中略)
    },
    "stage": "prod"
  },
  "queryStringParameters": {
    "foo": "bar"
  },
  "headers": {
    "Accept-Encoding": "gzip, deflate, sdch",
    : (中略)
  },
  "pathParameters": {
    "proxy": "path/to/resource"
  },
  "httpMethod": "POST",
  "stageVariables": {
    "baz": "qux"
  },
  "path": "/path/to/resource"
}

うん、とても分かりやすい。 楽をさせるために、request bodyはunicodeで渡されるらしい。

2016/10/04 削除

ここに色々書いてたのですが、文字列についてだいぶ大きな勘違いをしてました。 unicode形式だと画像とかファイルは受け取れないと思ってたんですが、普通にencodeすればいいだけなんですね。

レスポンスの形式

知らずにはまったんだけど、レスポンスは次の形式で返す必要がある。 よくよく考えたら、Hello World みたいに文字列返すだけだとステータスコードとヘッダ渡す場所がないので当たり前だった。

from __future__ import print_function


def lambda_handler(event, context):
    return {
        "statusCode": 200,
        "headers": {"headerName": "headerValue", "Foo": "bar"},
        "body": "This is body"
    }

所感・メモ

Lambda

  • Pythonは2系のみ
  • 300秒まで使える
    • バッチ処理とかだと厳しい場合が結構ありそう
    • EC2立ち上げる人もいるらしい
  • Lambda Functionはバージョニング可能
  • VPC対応あり
  • CloudWatch EventsのScheduleで定期実行できる
  • 勝手にスケールしてくれる

API Gateway

リクエストの情報をかなりラップして渡してくれていて、手軽。

  • リクエストのpayloadサイズに10MBの上限がある。画像とかファイルアップロードするときは厳しい
  • 結構分かりやすい形式でLambdaにリクエストの情報を送ってくれる
  • レスポンスの形式も明確

References

Amazon Web Services クラウドネイティブ・アプリケーション開発技法 一番大切な知識と技術が身につく (Informatics&IDEA)

Amazon Web Services クラウドネイティブ・アプリケーション開発技法 一番大切な知識と技術が身につく (Informatics&IDEA)

  • 作者: NRIネットコム株式会社,佐々木拓郎,佐藤瞬,石川修,高柳怜士,佐藤雄也,岸本勇貴
  • 出版社/メーカー: SBクリエイティブ
  • 発売日: 2016/04/20
  • メディア: 単行本
  • この商品を含むブログ (1件) を見る