mypyc_ipython: mypyc用のIPythonマジックコマンド

最近mypyc(型ヒントのついたPythonのコードからC拡張を生成するコンパイラ)のコードを読んでいたのですが、Cythonの %%cython マジックコマンドみたいに手軽に確認したいなということでマジックコマンドを実装してみました。CythonやPythonとサクッと性能を比較したいときにぜひ使ってみてください。

github.com

ここでは実装時のメモと、mypyc・cython・(pure) pythonのマイクロベンチマークの結果を残しておきます。

実装について

IPythonマジックコマンドの実装自体は↓のページに書かれています。

Defining custom magics — IPython 7.18.1 documentation

ポイントとしては、 %loadext <module name> を呼び出したときに、そのモジュールに定義されている load_ipython_extension() 関数が呼び出されます。 そこに次のように関数を定義しておくと読み込まれます。

def load_ipython_extension(ip):
    """Load the extension in IPython via %load_ext mypyc_ipython."""
    from ._magic import MypycMagics

    ip.register_magics(MypycMagics)

MypycMagics の実装のポイントとして、次のような手順でコンパイルから読み込みが実行されます。

  1. %%mypyc コードセルの中身をファイルに書き出す。
    1. 書き出し先は、 $IPYTHONDIR/mypyc にしています。
    2. Cythonにも似たようなマジックコマンドがあるのですが、そちらは $IPYTHONDIR/cython に書き出していたのでそれを真似しました。
    3. コード保存時のファイル名は、 こんな感じ のhashを計算して使用していて、全く同じコードが再度定義された場合はコンパイルが省略されます。それだと困る場合には --force オプションを指定します。このあたりもCythonのマジックコマンドと完全に同じです。
  2. mypycが提供する mypyc.build.mypycify(paths: List[str]) -> Extension 関数を使ってsetuptoolsのExtensionオブジェクトを取得。C拡張のコードを生成・コンパイル
  3. 生成された .so モジュールを読み込む。
    1. CythonだとPython2対応もあるため、 imp.load_dynamic() が使われていますが、 mypyc_ipython はPython3以降のみをサポートするため、importlibを使っています。
    2. imp.load_dynamic() に相当する処理は こんな感じ で実装できます。
  4. module内のattributesをすべて読み込み
    1. https://github.com/c-bata/mypyc_ipython/blob/cefc1ce28559194ea4de6d2f686385d84b0a970e/mypyc_ipython/_magic.py#L38-L50
    2. これまでgunicornなどに送ったpatch などで似たような処理を実装したことはあったのですが、 __all__ をチェックするのを省いていたことに気づきました。たしかに見ない理由はあまりないので、今からgunicornの方にも修正patch送ってもいいかも。

あとは通常のPythonの関数と同じようにIPython上で実行できます。

マイクロベンチマーク

fibonacci数を計算するコードを2種類用意してみました。

再帰で実装

In [1]: %load_ext mypyc_ipython

In [2]: %%mypyc
   ...: def my_fibonacci(n: int) -> int:
   ...:     if n <= 2:
   ...:         return 1
   ...:     else:
   ...:         return my_fibonacci(n-1) + my_fibonacci(n-2)
   ...:

In [3]: my_fibonacci(10)
Out[3]: 55

In [4]: def py_fibonacci(n: int) -> int:
   ...:     if n <= 2:
   ...:         return 1
   ...:     else:
   ...:         return py_fibonacci(n-1) + py_fibonacci(n-2)
   ...:

In [5]: py_fibonacci(10)
Out[5]: 55

In [6]: %load_ext cython

In [7]: %%cython
   ...: cpdef int cy_fibonacci(int n):
   ...:     if n <= 2:
   ...:         return 1
   ...:     else:
   ...:         return cy_fibonacci(n-1) + cy_fibonacci(n-2)
   ...:

In [8]: cy_fibonacci(10)
Out[8]: 55

In [9]: %timeit py_fibonacci(10)
10.3 µs ± 30.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [10]: %timeit my_fibonacci(10)
848 ns ± 5.82 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [11]: %timeit cy_fibonacci(10)
142 ns ± 1.18 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [12]:

今回のマイクロベンチマークではPureなPythonに比べ、mypycはおよそ1桁速くなっています。Cythonはさらに1/8の実行時間になりました。 (cythonのほうで int を使いましたが、Pythonのintとは意味が違うので、long long とかで比較するほうがよかったかもしれません。)

forループで実装

In [1]: %load_ext mypyc_ipython

In [2]: %load_ext cython

In [3]: %%mypyc
   ...: 
   ...: def mypyc_fib(n: int) -> float:
   ...:     i: int
   ...:     a: float = 0.0
   ...:     b: float = 1.0
   ...:     for i in range(n):
   ...:         a, b = a + b, a
   ...:     return a
   ...: 

In [4]: def py_fib(n: int) -> float:
   ...:     i: int
   ...:     a: float = 0.0
   ...:     b: float = 1.0
   ...:     for i in range(n):
   ...:         a, b = a + b, a
   ...:     return a
   ...: 

In [5]: %%cython
   ...: cpdef cython_fib(int n):
   ...:     cdef int i
   ...:     cdef double a = 0.0, b=1.0
   ...:     for i in range(n):
   ...:         a, b = a + b, a
   ...:     return a
   ...: 
   ...: 


In [6]: timeit py_fib(10)
627 ns ± 3.81 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [7]: timeit mypyc_fib(10)
891 ns ± 26.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [8]: timeit cython_fib(10)
44.5 ns ± 0.092 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

mypycのコードはpure pythonよりも悪化しました。 Cythonは速いですね。ループ以下の処理がすべてPython APIを使わないコードに落ちている雰囲気があります。 ちなみにfloat(cythonではdouble)に変えたのは特に意味はありません。

まとめ

mypycの高速化はまだまだこれからだと思いますが、型ヒントのついたコードがとりあえずmypyc挟むだけで速くなるなら嬉しいですね。 実はmypyc互換で大幅に効率的なコードを生成するツールを今実装しているので、それも出来上がってきたらまたここで紹介したいなと思います。

Kubeflow/KatibがGoptunaを使った最適化に対応しました。

KubeflowのKatibというハイパーパラメーター最適化等を担当するコンポーネントにGoptunaを使ったSuggestion serviceを実装しました。

github.com

Goptunaはハイパーパラメーター最適化ライブラリとして、機能面でも実装品質の面でもPythonで人気のライブラリに劣らないものになってきたかなと思う一方で、Goの機械学習ユーザーはまだまだ少なく、その中でも学習までGoでこなしている人はさらに絞られるので、せめてこういった方面で使われていくと嬉しいなと思います。ブラックボックス最適化フレームワークとしていろんな用途に利用できるソフトウェアなのでみなさんもぜひ触ってみてください。

github.com

Katibの基本的な使い方

Katibは基本的に特定の言語やフレームワークに依存しないように設計されています。パラメーターや評価値の受け渡しをどうやっているかというと、コマンドライン引数などからパラメーターを受け取り、目的関数の評価値を予め決めておいたフォーマットで標準出力に出したり、ログファイルとして保存したりします。例として f(x1, x2) = (x1-5)^2 + (x2+5)^2 を目的関数として用意しました (GitHub: https://github.com/c-bata/katib-goptuna-example)。

import argparse
import logging

logging.basicConfig(filename='/var/log/katib.log', level=logging.DEBUG)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--x1", dest='x1', type=float)
    parser.add_argument("--x2", dest='x2', type=float)
    args = parser.parse_args()

    evaluation = (args.x1 - 5) ** 2 + (args.x2 + 5) ** 2

    logging.info(f'{{metricName: evaluation, metricValue: {evaluation:.4f}}};')


if __name__ == '__main__':
    main()
FROM python:3-alpine

ADD main.py /usr/src/main.py
WORKDIR /usr/src

この関数を最適化する際にExperiment CRを作成します。OptunaでいうところのStudyに対応するものです (昔はStudyと呼ばれていたのか、コード読んでるとStudyって書かれてるところもありますが、詳細はまだよく分かっていません)。

Experiment CRが作成されると、パラメーターを変えながらKubernetes Jobを何個も作成し先程のPythonファイルを実行します。各パラメータによる試行はTrialと呼ばれます。Experiment CRで指定するのはざっくり次のような項目です。

  • 目的関数の評価回数や並列数など
  • 目的関数に与えるパラメーターの探索範囲
  • 探索アルゴリズム (文字列で tpecmaes を指定)
    • アルゴリズムがどのSuggestion serviceに対応するかは、 katib-config.yaml で指定します。
    • デフォルトは https://github.com/kubeflow/katib/blob/master/manifests/v1alpha3/katib-controller/katib-controller.yaml です。
    • 質的変数を探索空間に含む場合には tpe を利用する方が多いかと思いますが、デフォルトでは hyperopt ベースのsuggestion serviceが利用されます。Goptunaにも tpe が実装されているので、対応するイメージをGoptuna suggestion serviceのイメージに書き換えればCMA-ESだけでなくTPEでもGoptunaを利用します。
  • metricscollectorがどのファイルからどういう形式で評価値を取り出すか
  • Kubernetes Jobのmanifest template。どういうふうにコマンドライン引数からパラメーターを与えるかはここで指定できます。

具体例は次のとおりです。

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: kubeflow
  labels:
    controller-tools.k8s.io: "1.0"
  name: example
spec:
  objective:
    type: minimize
    goal: 0.001
    objectiveMetricName: evaluation
  algorithm:
    algorithmName: cmaes
  metricsCollectorSpec:
    source:
      filter:
        metricsFormat:
          - "{metricName: ([\\w|-]+), metricValue: ((-?\\d+)(\\.\\d+)?)}"
      fileSystemPath:
        path: "/var/log/katib.log"
        kind: File
    collector:
      kind: File
  parallelTrialCount: 2
  maxTrialCount: 250
  maxFailedTrialCount: 3
  parameters:
    - name: x1
      parameterType: double
      feasibleSpace:
        min: "-10"
        max: "10"
    - name: x2
      parameterType: double
      feasibleSpace:
        min: "-10"
        max: "10"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: docker.io/cbata/hello-katib-quadratic-function
                  command:
                  - "python3"
                  - "main.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "--{{.Name}} {{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

準備に必要なファイルはこれだけです。あとは Katib リポジトリにある、 scripts/v1alpha3/deploy.sh を実行してKatib controller等を立ち上げ、上記のExperiment CRDを適用すれば最適化が実行されます。

試行結果は kubectl port-forward svc/katib-ui -n kubeflow 8080:80 を実行してKatib UI https://localhost:8080/katib をブラウザで開いて確認できます。

f:id:nwpct1:20200411235014p:plain
Katib dashboard

Suggestion serviceの実装

Suggestion serviceは、過去のtrialsを受け取り次に探索すべきパラメーターを返すgRPCのサーバーです。 基本的にKatibを利用する上では知る必要はそんなにないので、適宜読み飛ばしてください。 Katibの動作の流れは次の画像にまとまっています。

katib workflow

katib/suggestion.md at master · kubeflow/katib · GitHub

Experimentの作成後、Experimentで指定されたalgorithmNameをもとに katib-config から対応するSuggestion serviceのImageを特定し、Katib controllerがSuggestion CRをapply、Suggestion CRがSuggestion serviceのdeploymentをapplyしPodが生成されます。

Suggestion serviceがreadyになったあと、Katib controllerは過去の試行結果を入力にして次に探索するパラメーターを取得する GetSuggestions() gRPC エンドポイントを何度も叩いてきます。Suggestion controllerはsuggestion serviceを1台しか立ち上げないため、各workerが勝手に解をサンプルしていくOptunaやGoptunaと比べアルゴリズムは実装しやすいなと思いました。Suggestion serviceを実装する際に問題になりやすいのは、Katib controllerがExperimentに紐づけて保持する過去の試行結果と、Goptunaの内部状態をどうやって同期するかです。

Katibの設計ではSuggestion serviceはただパラメーターをサンプルし、Katib controllerに伝えます。その後Katib controllerがtrialを生成し、一意なIDであるtrial nameを生成します。Suggestion serviceからみたときに、自分が生成したパラメーターにどのtrial nameが紐づくかを特定する方法は基本的にありません。TPEやRandom search、Gaussian Processなどこれまでサポートされてきた多くの最適化手法ではこれは問題になりませんが、CMA-ESは別です。

CMA-ESは、多変量正規分布を用意しその分布から解を生成、その評価値をもとに多変量正規分布のパラメーターをよりよい解をサンプルする分布へと更新します。分布が更新されるごとに世代番号がインクリメントされ、分布の更新に利用する解はかならず同じ世代番号の分布からサンプルされている必要があります。分布の世代番号はGoptunaのTrialのメタデータとして紐付けているため、Goptuna TrialとKatib Trialを適切に紐付ける必要があります。

現状ではGoptunaのStudyとは別に trialMapping map[str]int という変数を用意して、Katibのtrial name (str)からGoptunaのtrial ID (int)へのmappingを用意し、 trialMapping にまだ紐付けされていないtrialの中からパラメーターが完全に一致したものをtrialMappingへ追加しています。初期の実装ではKatib-controllerとのやりとりの際に生じる桁落ちを懸念して、パラメーターどうしのマンハッタン距離から最も似ているtrialを紐付けていましたが、Katibはprotobufの定義も(おそらくetcdかなにかに保存するときも)内部表現は全て文字列です。そのため桁落ちなどの心配がなく完全一致でチェックすることにしました。あらためて考えるとTrialのパラメーター表現を文字列で統一するのはかなりわかりやすくリーズナブルだなと思います。

あと実装してから気づいたのですが、よく見たらSuggestion serviceを実装するためのドキュメントがありました。普段あまりドキュメントに期待していないのですが、ParameterAssignmentsが requestNumber で指定された個数だけパラメーターを返さないといけないこととか気づくのに1時間ぐらい時間溶かしたりしてたので、最初に読んでおくことがおすすめです。

katib/new-algorithm-service.md at master · kubeflow/katib · GitHub

所感と今後の展望

設計的にも面白いソフトウェアだなと思いました。また今はOptunaのcommitterもやっていますが、前の部署にいたときにGoでgRPCのサーバー書いたり、OSSでkube-promptを公開していたりしていたので、スキルセット的にも結構マッチしている気がして開発が楽しかったです。

一方で、運用できるチームはかなり限られるかもしれないなというのが正直な感想です。少し動かしただけでもhyperoptのsuggestion serviceが完全に動かなくなってしまっているデグレがあって、原因探って修正するまでに自分も1晩中頑張ったりして苦労しました。Suggestion serviceがおかしいのか、metricscontrollerがおかしいのか、Katib controllerがおかしいのかを正しく状況に応じて切り分ける必要があり、Katibのコード読んでる人でもない限りなかなか難しいかもしれません。

個人的にはGoptunaをベースにすればKatibとの互換性を保ったままシンプルで運用のしやすいツールができる気もしているので、kubebuilder触りつつまた気が向いたときにでもやってみたいなと思います (NASの対応は、言語やフレームワークに依存せず汎用的に使える便利なツールを設計するのが現状では難しい気がしてるので諦める予定です)。

optuna-memorydump で InMemoryStorage の結果を定期的にSQLite3に書き出す。

CPUたくさん積んだマシン1台で、Optunaを回すときにstorage backendの選択は悩ましい点があります。 結果をDashboardやJupyterで詳しく確認したり、あとから最適化を再開するためにRDB storage backendを使って永続化しておきたいのですが、マシン1台しか使わないときにわざわざMySQLとかPostgreSQL立てるのは結構面倒なので、自然とSQLite3を選びたくなります。

ただSQLite3はマルチスレッドやマルチプロセスによる最適化時に、あまりおすすめできません。 そもそもLock errorを完全に防ぐことが難しいという話もありますが、 enqueue_trial など一部の機能は 行ロックのできないSQLite3 では並列最適化時に正しく動作しません。またRDB storage backendは性能面の課題もあり、RandomSamplerやCmaEsSamplerで実行時間の短い目的関数を数千回評価したいときにはほぼ確実にRDB storage backedがボトルネックになるかと思います。

そこで今回は高速に動作するIn-memory storage backendで最適化を回しつつ、評価回数が500回や1000回など指定したintervalに達したときにSQLite3に書き出すツールを用意してみました。冪等性を保証していて何回書き出しても差分だけを適用します。思いつきでサクッと作ってみたツールですが、思ったよりも便利そうなのでよければ使ってみてください。

optuna-memorydump の使い方

GitHubリポジトリはこちらです。まだPyPIには上げていないのでGitHubからインストールしてください。

github.com

$ pip install git+https://github.com/c-bata/optuna-memorydump.git

使い方はこんな感じでCallback関数としてOptunaに登録します。

import optuna
from optuna_memorydump import MemoryDumpCallback


def objective(trial):
    x1 = trial.suggest_uniform("x1", -100, 100)
    x2 = trial.suggest_uniform("x2", -100, 100)
    return 100 * (x2 - x1 ** 2) ** 2 + (x1 - 1) ** 2


if __name__ == "__main__":
    study = optuna.create_study(
        study_name="dumped", sampler=optuna.samplers.CmaEsSampler(),
    )
    study.optimize(
        objective, timeout=30, n_jobs=4, gc_after_trial=False,
        callbacks=[MemoryDumpCallback("sqlite:///db.sqlite3", interval=100)],
    )
    print("Best value: {} (params: {})\n".format(study.best_value, study.best_params))

intervalを100にすると、100回評価が終わるたびにcallbackが反応して指定したstorageに書き出します。intervalを指定できるようにしたのは、仮に1日中回し続けたいといったケースで12時間後にエラーがでて最初からやり直しとかを避けるためです。

$ python examples/rosenbrock.py
[I 2020-04-12 18:33:39,200] Finished trial#0 with value: ...
[I 2020-04-12 18:33:39,204] Finished trial#2 with value: ...
[I 2020-04-12 18:33:39,205] Finished trial#1 with value: ...
:
[I 2020-04-12 18:33:39,924] Finished trial#100 with value: ...
[I 2020-04-12 18:33:39,931] memorydump is triggered at trial 100 (thread=123145615126528).
[I 2020-04-12 23:40:20,013] memorydump of trial 100 is finished in 1.938s (thread=123145458638848).
:
[I 2020-04-12 18:33:39,924] Finished trial#200 with value: ...
[I 2020-04-12 23:40:21,505] memorydump of trial 200 is finished in 0.105s (thread=123145458638848).
[I 2020-04-12 23:40:24,061] memorydump is triggered at trial 300 (thread=123145425059840).
: