mypyc_ipython: mypycを気軽に動かせるIPythonマジックコマンドの紹介と実装解説

最近mypyc(型ヒントのついたPythonのコードからC拡張を生成するコンパイラ)のコードを読んでいたのですが、Cythonの %%cython マジックコマンドみたいに手軽に確認したいなということでマジックコマンドを実装してみました。CythonやPythonとサクッと性能を比較したいときにぜひ使ってみてください。

github.com

ここでは実装時のメモと、mypyc・cython・(pure) pythonのマイクロベンチマークの結果を残しておきます。

実装について

IPythonマジックコマンドの実装自体は↓のページに書かれています。

Defining custom magics — IPython 7.25.0 documentation

ポイントとしては、 %loadext <module name> を呼び出したときに、そのモジュールに定義されている load_ipython_extension() 関数が呼び出されます。 そこに次のように関数を定義しておくと読み込まれます。

def load_ipython_extension(ip):
    """Load the extension in IPython via %load_ext mypyc_ipython."""
    from ._magic import MypycMagics

    ip.register_magics(MypycMagics)

MypycMagics の実装のポイントとして、次のような手順でコンパイルから読み込みが実行されます。

  1. %%mypyc コードセルの中身をファイルに書き出す。
    1. 書き出し先は、 $IPYTHONDIR/mypyc にしています。
    2. Cythonにも似たようなマジックコマンドがあるのですが、そちらは $IPYTHONDIR/cython に書き出していたのでそれを真似しました。
    3. コード保存時のファイル名は、 こんな感じ のhashを計算して使用していて、全く同じコードが再度定義された場合はコンパイルが省略されます。それだと困る場合には --force オプションを指定します。このあたりもCythonのマジックコマンドと完全に同じです。
  2. mypycが提供する mypyc.build.mypycify(paths: List[str]) -> Extension 関数を使ってsetuptoolsのExtensionオブジェクトを取得。C拡張のコードを生成・コンパイル
  3. 生成された .so モジュールを読み込む。
    1. CythonだとPython2対応もあるため、 imp.load_dynamic() が使われていますが、 mypyc_ipython はPython3以降のみをサポートするため、importlibを使っています。
    2. imp.load_dynamic() に相当する処理は こんな感じ で実装できます。
  4. module内のattributesをすべて読み込み
    1. https://github.com/c-bata/mypyc_ipython/blob/cefc1ce28559194ea4de6d2f686385d84b0a970e/mypyc_ipython/_magic.py#L38-L50
    2. これまでgunicornなどに送ったpatch などで似たような処理を実装したことはあったのですが、 __all__ をチェックするのを省いていたことに気づきました。たしかに見ない理由はあまりないので、今からgunicornの方にも修正patch送ってもいいかも。

あとは通常のPythonの関数と同じようにIPython上で実行できます。

マイクロベンチマーク

fibonacci数を計算するコードを2種類用意してみました。

再帰で実装

In [1]: %load_ext mypyc_ipython

In [2]: %%mypyc
   ...: def my_fibonacci(n: int) -> int:
   ...:     if n <= 2:
   ...:         return 1
   ...:     else:
   ...:         return my_fibonacci(n-1) + my_fibonacci(n-2)
   ...:

In [3]: my_fibonacci(10)
Out[3]: 55

In [4]: def py_fibonacci(n: int) -> int:
   ...:     if n <= 2:
   ...:         return 1
   ...:     else:
   ...:         return py_fibonacci(n-1) + py_fibonacci(n-2)
   ...:

In [5]: py_fibonacci(10)
Out[5]: 55

In [6]: %load_ext cython

In [7]: %%cython
   ...: cpdef int cy_fibonacci(int n):
   ...:     if n <= 2:
   ...:         return 1
   ...:     else:
   ...:         return cy_fibonacci(n-1) + cy_fibonacci(n-2)
   ...:

In [8]: cy_fibonacci(10)
Out[8]: 55

In [9]: %timeit py_fibonacci(10)
10.3 µs ± 30.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [10]: %timeit my_fibonacci(10)
848 ns ± 5.82 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [11]: %timeit cy_fibonacci(10)
142 ns ± 1.18 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [12]:

今回のマイクロベンチマークではPureなPythonに比べ、mypycはおよそ1桁速くなっています。Cythonはさらに1/8の実行時間になりました。 (cythonのほうで int を使いましたが、Pythonのintとは意味が違うので、long long とかで比較するほうがよかったかもしれません。)

forループで実装

In [1]: %load_ext mypyc_ipython

In [2]: %load_ext cython

In [3]: %%mypyc
   ...: 
   ...: def mypyc_fib(n: int) -> float:
   ...:     i: int
   ...:     a: float = 0.0
   ...:     b: float = 1.0
   ...:     for i in range(n):
   ...:         a, b = a + b, a
   ...:     return a
   ...: 

In [4]: def py_fib(n: int) -> float:
   ...:     i: int
   ...:     a: float = 0.0
   ...:     b: float = 1.0
   ...:     for i in range(n):
   ...:         a, b = a + b, a
   ...:     return a
   ...: 

In [5]: %%cython
   ...: cpdef cython_fib(int n):
   ...:     cdef int i
   ...:     cdef double a = 0.0, b=1.0
   ...:     for i in range(n):
   ...:         a, b = a + b, a
   ...:     return a
   ...: 
   ...: 


In [6]: timeit py_fib(10)
627 ns ± 3.81 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [7]: timeit mypyc_fib(10)
891 ns ± 26.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [8]: timeit cython_fib(10)
44.5 ns ± 0.092 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

mypycのコードはpure pythonよりも悪化しました。 Cythonは速いですね。ループ以下の処理がすべてPython APIを使わないコードに落ちている雰囲気があります。 ちなみにfloat(cythonではdouble)に変えたのは特に意味はありません。

まとめ

mypycの高速化はまだまだこれからだと思いますが、型ヒントのついたコードがとりあえずmypyc挟むだけで速くなるなら嬉しいですね。 実はmypyc互換で大幅に効率的なコードを生成するツールを今実装しているので、それも出来上がってきたらまたここで紹介したいなと思います。

Kubeflow/KatibがGoptunaを使った最適化に対応しました。

KubeflowのKatibというハイパーパラメーター最適化等を担当するコンポーネントにGoptunaを使ったSuggestion serviceを実装しました。

github.com

Goptunaはハイパーパラメーター最適化ライブラリとして、機能面でも実装品質の面でもPythonで人気のライブラリに劣らないものになってきたかなと思う一方で、Goの機械学習ユーザーはまだまだ少なく、その中でも学習までGoでこなしている人はさらに絞られるので、せめてこういった方面で使われていくと嬉しいなと思います。ブラックボックス最適化フレームワークとしていろんな用途に利用できるソフトウェアなのでみなさんもぜひ触ってみてください。

github.com

Katibの基本的な使い方

Katibは基本的に特定の言語やフレームワークに依存しないように設計されています。パラメーターや評価値の受け渡しをどうやっているかというと、コマンドライン引数などからパラメーターを受け取り、目的関数の評価値を予め決めておいたフォーマットで標準出力に出したり、ログファイルとして保存したりします。例として f(x1, x2) = (x1-5)^2 + (x2+5)^2 を目的関数として用意しました (GitHub: https://github.com/c-bata/katib-goptuna-example)。

import argparse
import logging

logging.basicConfig(filename='/var/log/katib.log', level=logging.DEBUG)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--x1", dest='x1', type=float)
    parser.add_argument("--x2", dest='x2', type=float)
    args = parser.parse_args()

    evaluation = (args.x1 - 5) ** 2 + (args.x2 + 5) ** 2

    logging.info(f'{{metricName: evaluation, metricValue: {evaluation:.4f}}};')


if __name__ == '__main__':
    main()
FROM python:3-alpine

ADD main.py /usr/src/main.py
WORKDIR /usr/src

この関数を最適化する際にExperiment CRを作成します。OptunaでいうところのStudyに対応するものです (昔はStudyと呼ばれていたのか、コード読んでるとStudyって書かれてるところもありますが、詳細はまだよく分かっていません)。

Experiment CRが作成されると、パラメーターを変えながらKubernetes Jobを何個も作成し先程のPythonファイルを実行します。各パラメータによる試行はTrialと呼ばれます。Experiment CRで指定するのはざっくり次のような項目です。

  • 目的関数の評価回数や並列数など
  • 目的関数に与えるパラメーターの探索範囲
  • 探索アルゴリズム (文字列で tpecmaes を指定)
    • アルゴリズムがどのSuggestion serviceに対応するかは、 katib-config.yaml で指定します。
    • デフォルトは https://github.com/kubeflow/katib/blob/master/manifests/v1alpha3/katib-controller/katib-controller.yaml です。
    • 質的変数を探索空間に含む場合には tpe を利用する方が多いかと思いますが、デフォルトでは hyperopt ベースのsuggestion serviceが利用されます。Goptunaにも tpe が実装されているので、対応するイメージをGoptuna suggestion serviceのイメージに書き換えればCMA-ESだけでなくTPEでもGoptunaを利用します。
  • metricscollectorがどのファイルからどういう形式で評価値を取り出すか
  • Kubernetes Jobのmanifest template。どういうふうにコマンドライン引数からパラメーターを与えるかはここで指定できます。

具体例は次のとおりです。

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: kubeflow
  labels:
    controller-tools.k8s.io: "1.0"
  name: example
spec:
  objective:
    type: minimize
    goal: 0.001
    objectiveMetricName: evaluation
  algorithm:
    algorithmName: cmaes
  metricsCollectorSpec:
    source:
      filter:
        metricsFormat:
          - "{metricName: ([\\w|-]+), metricValue: ((-?\\d+)(\\.\\d+)?)}"
      fileSystemPath:
        path: "/var/log/katib.log"
        kind: File
    collector:
      kind: File
  parallelTrialCount: 2
  maxTrialCount: 250
  maxFailedTrialCount: 3
  parameters:
    - name: x1
      parameterType: double
      feasibleSpace:
        min: "-10"
        max: "10"
    - name: x2
      parameterType: double
      feasibleSpace:
        min: "-10"
        max: "10"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: docker.io/cbata/hello-katib-quadratic-function
                  command:
                  - "python3"
                  - "main.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "--{{.Name}} {{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

準備に必要なファイルはこれだけです。あとは Katib リポジトリにある、 scripts/v1alpha3/deploy.sh を実行してKatib controller等を立ち上げ、上記のExperiment CRDを適用すれば最適化が実行されます。

試行結果は kubectl port-forward svc/katib-ui -n kubeflow 8080:80 を実行してKatib UI https://localhost:8080/katib をブラウザで開いて確認できます。

f:id:nwpct1:20200411235014p:plain
Katib dashboard

Suggestion serviceの実装

Suggestion serviceは、過去のtrialsを受け取り次に探索すべきパラメーターを返すgRPCのサーバーです。 基本的にKatibを利用する上では知る必要はそんなにないので、適宜読み飛ばしてください。 Katibの動作の流れは次の画像にまとまっています。

katib workflow

katib/suggestion.md at master · kubeflow/katib · GitHub

Experimentの作成後、Experimentで指定されたalgorithmNameをもとに katib-config から対応するSuggestion serviceのImageを特定し、Katib controllerがSuggestion CRをapply、Suggestion CRがSuggestion serviceのdeploymentをapplyしPodが生成されます。

Suggestion serviceがreadyになったあと、Katib controllerは過去の試行結果を入力にして次に探索するパラメーターを取得する GetSuggestions() gRPC エンドポイントを何度も叩いてきます。Suggestion controllerはsuggestion serviceを1台しか立ち上げないため、各workerが勝手に解をサンプルしていくOptunaやGoptunaと比べアルゴリズムは実装しやすいなと思いました。Suggestion serviceを実装する際に問題になりやすいのは、Katib controllerがExperimentに紐づけて保持する過去の試行結果と、Goptunaの内部状態をどうやって同期するかです。

Katibの設計ではSuggestion serviceはただパラメーターをサンプルし、Katib controllerに伝えます。その後Katib controllerがtrialを生成し、一意なIDであるtrial nameを生成します。Suggestion serviceからみたときに、自分が生成したパラメーターにどのtrial nameが紐づくかを特定する方法は基本的にありません。TPEやRandom search、Gaussian Processなどこれまでサポートされてきた多くの最適化手法ではこれは問題になりませんが、CMA-ESは別です。

CMA-ESは、多変量正規分布を用意しその分布から解を生成、その評価値をもとに多変量正規分布のパラメーターをよりよい解をサンプルする分布へと更新します。分布が更新されるごとに世代番号がインクリメントされ、分布の更新に利用する解はかならず同じ世代番号の分布からサンプルされている必要があります。分布の世代番号はGoptunaのTrialのメタデータとして紐付けているため、Goptuna TrialとKatib Trialを適切に紐付ける必要があります。

現状ではGoptunaのStudyとは別に trialMapping map[str]int という変数を用意して、Katibのtrial name (str)からGoptunaのtrial ID (int)へのmappingを用意し、 trialMapping にまだ紐付けされていないtrialの中からパラメーターが完全に一致したものをtrialMappingへ追加しています。初期の実装ではKatib-controllerとのやりとりの際に生じる桁落ちを懸念して、パラメーターどうしのマンハッタン距離から最も似ているtrialを紐付けていましたが、Katibはprotobufの定義も(おそらくetcdかなにかに保存するときも)内部表現は全て文字列です。そのため桁落ちなどの心配がなく完全一致でチェックすることにしました。あらためて考えるとTrialのパラメーター表現を文字列で統一するのはかなりわかりやすくリーズナブルだなと思います。

あと実装してから気づいたのですが、よく見たらSuggestion serviceを実装するためのドキュメントがありました。普段あまりドキュメントに期待していないのですが、ParameterAssignmentsが requestNumber で指定された個数だけパラメーターを返さないといけないこととか気づくのに1時間ぐらい時間溶かしたりしてたので、最初に読んでおくことがおすすめです。

katib/new-algorithm-service.md at master · kubeflow/katib · GitHub

所感と今後の展望

設計的にも面白いソフトウェアだなと思いました。また今はOptunaのcommitterもやっていますが、前の部署にいたときにGoでgRPCのサーバー書いたり、OSSでkube-promptを公開していたりしていたので、スキルセット的にも結構マッチしている気がして開発が楽しかったです。

一方で、運用できるチームはかなり限られるかもしれないなというのが正直な感想です。少し動かしただけでもhyperoptのsuggestion serviceが完全に動かなくなってしまっているデグレがあって、原因探って修正するまでに自分も1晩中頑張ったりして苦労しました。Suggestion serviceがおかしいのか、metricscontrollerがおかしいのか、Katib controllerがおかしいのかを正しく状況に応じて切り分ける必要があり、Katibのコード読んでる人でもない限りなかなか難しいかもしれません。

個人的にはGoptunaをベースにすればKatibとの互換性を保ったままシンプルで運用のしやすいツールができる気もしているので、kubebuilder触りつつまた気が向いたときにでもやってみたいなと思います (NASの対応は、言語やフレームワークに依存せず汎用的に使える便利なツールを設計するのが現状では難しい気がしてるので諦める予定です)。

Django 3.1 MySQL db_flush() の高速化とTransactionTestCase利用時の注意点

追記: 翔泳社さんからDjangoの書籍を出版するので、ぜひ読んでみてください。


DjangoMySQL DatabaseOperations Backendのとある処理を最適化するためのpatchを書いていて、それがマージされたのですが、注意点があるため記事にしておこうと思います。 全部読むのが面倒な方向けに結論だけ先に書いておきます。

  • MySQLにおいて、TransactionTestCaseのteardown処理が高速になりました。
    • またほとんどのユーザーにはそれほど重要ではありませんが、 sqlflush コマンドも効率的なクエリを生成し、 flush コマンドも高速になります。
  • TransactionTestCaseを使ったテストで AUTO INCREMENT フィールドの値(デフォルトの主キーなど)に依存しているテストは、そのままだとMySQLではFAILするようになります。
    • 自分は基本的にMySQLSQLiteしか使わなくて知らなかったのですが、TransactionTestCaseにおいてAUTO INCREMENT のカウンターがテストケースごとにリセットされていたのは未定義動作です (そもそもそういうテストはアンチパターンかもしれないという話はここでは一旦置いておきます)。
    • この問題を回避するには reset_sequences オプションを明示的に True にする必要があります。

テスト時間が長く困っている方は3.1以降で改善されるかもしれません。

TransactionTestCaseのteardown処理について

厳密には flush management comandの高速化を行ったのですが、どうしてTransactionTestCaseが速くなるのかを解説します。 これにはTransactionTestCaseがtear down時に何をしているのかを理解する必要があります。

Djangoでテストを書いているという方はご存知のように、通常の TestCase では、テスト開始時にトランザクションを開始し、終了時にはロールバックすることで、テストメソッド内のDB操作を取り消します。つまり各テストケースで行った操作はロールバックにより切り戻されるため、他のテストケースの実行に影響を与えることはありません。 一方でそれだと困るケースも当然あり、そういうケースでは TransactionTestCase を使用します。 TrasactionTestCase は、各テストケースごとに全てのテーブルを初期化した状態にしておかないといけません。

どうやっているかというと、 flush management commandを call_command() 関数で発行しています。 この関数は、内部で各DatabaseOperations Backendの sql_flush() メソッドを呼び出し、そのメソッドが返すSQLを実行します。 flush management commandを普通に利用する人は、ここが少し速くなったところでそれほど嬉しくないと思いますが、何度もteardownで呼び出されるTransactionTestCase においては速度が重要になります。

sql_flush() の高速化

MySQL DatabaseOperations backendの sql_flush() メソッドは、全テーブルに対して TRUNCATE クエリを返す実装になっていました。 TRUNCATE クエリはドキュメントにも書いてあるように、 DELETE クエリや、 DROP TABLE && CREATE TABLE クエリに似ています。

TRUNCATE TABLE empties a table completely. It requires the DROP privilege. Logically, TRUNCATE TABLE is similar to a DELETE statement that deletes all rows, or a sequence of DROP TABLE and CREATE TABLE statements. https://dev.mysql.com/doc/refman/8.0/en/truncate-table.html

ただこのクエリは比較的時間がかかります。厳密にはテーブルのサイズが小さい場合に、DELETE クエリに比べ時間がかかります。 そこで DELETE クエリを使いたいのですが、 DELETE クエリで全ての情報が消えるわけではありません。 問題になるのは AUTO INCREMENT フィールドのカウンターの値です。 3つのレコードが登録され、その全てをDELETEクエリにより削除した場合、次に作成するレコードの AUTO INCREMENT フィールドは4から開始します。

そのため別途 ALTER TABLE tablename AUTO_INCREMENT = 1 などを発行してリセットする必要があります。 最初に書いたpatchではそういう処理をしていたのですが、core contributorsやmaintainerからのレビューを通して、カウンター値はリセットしないことになりました。 カウンター値がリセットされなくなることは破壊的変更のように思えますが、実はカウンター値がリセットされるかどうかは未定義動作だったようです。 Djangoのモデルの主キーは、デフォルトだとAUTO INCREMENT なINTEGERが利用されますが、この主キー値の値などをテストケースでチェックしている場合には、後述するオプションをつけていないとFailします。

TransactionTestCasereset_sequences オプション

先程の問題に対処するためには、ALTER TABLE tablename AUTO_INCREMENT = 1 を合わせて発行する必要があると説明しました。 それを有効にするのが TransactionTestCasereset_sequences オプションです。 詳細は Advanced features of TransactionTestCase  に書かれています。 Django 3.0まではMySQL利用時にこれをセットしても変わらなかったのですが、3.1からは必要に応じて reset_sequences=True を指定してください。

class TestsThatDependsOnPrimaryKeySequences(TransactionTestCase):
    reset_sequences = True

    def test_animal_pk(self):
        ...

細かく確認してないのですが、PostgreSQLのDatabase Operations backendのコードには、このreset_sequencesの処理が実装されていたので、PostgreSQLを使ってTransactionTestCase走らせていた方には常識だったのかもしれません。

ベンチマーク

ベンチマークのコードはこちらです。

GitHub - c-bata/django-fast-mysql-flush: for ticket #31275

number of records on each table before after
10 3.302 sec (+/- 0.076) 0.517 sec (+/- 0.019)
100 3.323 sec (+/- 0.047) 0.575 sec (+/- 0.025)
1000 3.577 sec (+/- 0.106) 1.046 sec (+/- 0.029)

余談 ( information_schema.tables の利用)

今回書いたpatchは、もともとヒューリスティックに1000行以下ならDELETE クエリを発行するように実装していました。 行数が多い場合には性能の改善がなく、むしろ遅くなる可能性もあるからです。 全テーブルに対して SELECT COUNT(*) で行数を調べていると余計に時間がかかる可能性があるため、 information_schema.tablestable_rows から行数を取り出して判断しました。 この値はおおよその値が返ってくるだけですが(MyISAMを除く)、今回のようにざっくり1000行以上あるかどうかを知りたいときには十分です。

TABLE_ROWS

The number of rows. Some storage engines, such as MyISAM, store the exact count. For other storage engines, such as InnoDB, this value is an approximation, and may vary from the actual value by as much as 40% to 50%. In such cases, use SELECT COUNT(*) to obtain an accurate count.

TABLE_ROWS is NULL for INFORMATION_SCHEMA tables.

For InnoDB tables, the row count is only a rough estimate used in SQL optimization. (This is also true if the InnoDB table is partitioned.) https://dev.mysql.com/doc/refman/8.0/en/tables-table.html

ただ最終的にこの方針はやめることになりました。 自分が用意したベンチマークでは2倍程度高速でしたが、 DELETE 文と ALTER TABLE tablename AUTO INCRMENT = 1 の2つのSQLを発行しているためcore contributorsの方が試したベンチマーク問題では遅くなったりもしたようです。 詳しく見てみようかとも思ったのですが、ヒューリスティックを入れるのはあまり筋がよくないのと、たかだか2倍程度の改善だったのでまぁいいかなと思い今の実装に落ち着きました。

ちなみに information_schema.tables から auto_increment を取り出して、それが1より大きい場合のみ TRUNCATE を呼ぶという実装も試してみたのですが、INSERTをしても AUTO_INCREMENT の値が更新されず1のままにいることが頻繁にありテストが落ちるため諦めました。ドキュメントを読んでもapproximationとは書かれていないので理由がよくわからないのですが、もし知ってる方いたら教えて下さい。

AUTO_INCREMENT: The next AUTO_INCREMENT value. https://dev.mysql.com/doc/refman/5.7/en/tables-table.html

自分があとから思い出すためのメモでもあったので、雑な記事でしたがこれで終わり。