Kubeflow/KatibがGoptunaを使った最適化に対応しました。

KubeflowのKatibというハイパーパラメーター最適化等を担当するコンポーネントにGoptunaを使ったSuggestion serviceを実装しました。

github.com

Goptunaはハイパーパラメーター最適化ライブラリとして、機能面でも実装品質の面でもPythonで人気のライブラリに劣らないものになってきたかなと思う一方で、Goの機械学習ユーザーはまだまだ少なく、その中でも学習までGoでこなしている人はさらに絞られるので、せめてこういった方面で使われていくと嬉しいなと思います。ブラックボックス最適化フレームワークとしていろんな用途に利用できるソフトウェアなのでみなさんもぜひ触ってみてください。

github.com

Katibの基本的な使い方

Katibは基本的に特定の言語やフレームワークに依存しないように設計されています。パラメーターや評価値の受け渡しをどうやっているかというと、コマンドライン引数などからパラメーターを受け取り、目的関数の評価値を予め決めておいたフォーマットで標準出力に出したり、ログファイルとして保存したりします。例として f(x1, x2) = (x1-5)^2 + (x2+5)^2 を目的関数として用意しました (GitHub: https://github.com/c-bata/katib-goptuna-example)。

import argparse
import logging

logging.basicConfig(filename='/var/log/katib.log', level=logging.DEBUG)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--x1", dest='x1', type=float)
    parser.add_argument("--x2", dest='x2', type=float)
    args = parser.parse_args()

    evaluation = (args.x1 - 5) ** 2 + (args.x2 + 5) ** 2

    logging.info(f'{{metricName: evaluation, metricValue: {evaluation:.4f}}};')


if __name__ == '__main__':
    main()
FROM python:3-alpine

ADD main.py /usr/src/main.py
WORKDIR /usr/src

この関数を最適化する際にExperiment CRを作成します。OptunaでいうところのStudyに対応するものです (昔はStudyと呼ばれていたのか、コード読んでるとStudyって書かれてるところもありますが、詳細はまだよく分かっていません)。

Experiment CRが作成されると、パラメーターを変えながらKubernetes Jobを何個も作成し先程のPythonファイルを実行します。各パラメータによる試行はTrialと呼ばれます。Experiment CRで指定するのはざっくり次のような項目です。

  • 目的関数の評価回数や並列数など
  • 目的関数に与えるパラメーターの探索範囲
  • 探索アルゴリズム (文字列で tpecmaes を指定)
    • アルゴリズムがどのSuggestion serviceに対応するかは、 katib-config.yaml で指定します。
    • デフォルトは https://github.com/kubeflow/katib/blob/master/manifests/v1alpha3/katib-controller/katib-controller.yaml です。
    • 質的変数を探索空間に含む場合には tpe を利用する方が多いかと思いますが、デフォルトでは hyperopt ベースのsuggestion serviceが利用されます。Goptunaにも tpe が実装されているので、対応するイメージをGoptuna suggestion serviceのイメージに書き換えればCMA-ESだけでなくTPEでもGoptunaを利用します。
  • metricscollectorがどのファイルからどういう形式で評価値を取り出すか
  • Kubernetes Jobのmanifest template。どういうふうにコマンドライン引数からパラメーターを与えるかはここで指定できます。

具体例は次のとおりです。

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: kubeflow
  labels:
    controller-tools.k8s.io: "1.0"
  name: example
spec:
  objective:
    type: minimize
    goal: 0.001
    objectiveMetricName: evaluation
  algorithm:
    algorithmName: cmaes
  metricsCollectorSpec:
    source:
      filter:
        metricsFormat:
          - "{metricName: ([\\w|-]+), metricValue: ((-?\\d+)(\\.\\d+)?)}"
      fileSystemPath:
        path: "/var/log/katib.log"
        kind: File
    collector:
      kind: File
  parallelTrialCount: 2
  maxTrialCount: 250
  maxFailedTrialCount: 3
  parameters:
    - name: x1
      parameterType: double
      feasibleSpace:
        min: "-10"
        max: "10"
    - name: x2
      parameterType: double
      feasibleSpace:
        min: "-10"
        max: "10"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: docker.io/cbata/hello-katib-quadratic-function
                  command:
                  - "python3"
                  - "main.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "--{{.Name}} {{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

準備に必要なファイルはこれだけです。あとは Katib リポジトリにある、 scripts/v1alpha3/deploy.sh を実行してKatib controller等を立ち上げ、上記のExperiment CRDを適用すれば最適化が実行されます。

試行結果は kubectl port-forward svc/katib-ui -n kubeflow 8080:80 を実行してKatib UI https://localhost:8080/katib をブラウザで開いて確認できます。

f:id:nwpct1:20200411235014p:plain
Katib dashboard

Suggestion serviceの実装

Suggestion serviceは、過去のtrialsを受け取り次に探索すべきパラメーターを返すgRPCのサーバーです。 基本的にKatibを利用する上では知る必要はそんなにないので、適宜読み飛ばしてください。 Katibの動作の流れは次の画像にまとまっています。

katib workflow

katib/suggestion.md at master · kubeflow/katib · GitHub

Experimentの作成後、Experimentで指定されたalgorithmNameをもとに katib-config から対応するSuggestion serviceのImageを特定し、Katib controllerがSuggestion CRをapply、Suggestion CRがSuggestion serviceのdeploymentをapplyしPodが生成されます。

Suggestion serviceがreadyになったあと、Katib controllerは過去の試行結果を入力にして次に探索するパラメーターを取得する GetSuggestions() gRPC エンドポイントを何度も叩いてきます。Suggestion controllerはsuggestion serviceを1台しか立ち上げないため、各workerが勝手に解をサンプルしていくOptunaやGoptunaと比べアルゴリズムは実装しやすいなと思いました。Suggestion serviceを実装する際に問題になりやすいのは、Katib controllerがExperimentに紐づけて保持する過去の試行結果と、Goptunaの内部状態をどうやって同期するかです。

Katibの設計ではSuggestion serviceはただパラメーターをサンプルし、Katib controllerに伝えます。その後Katib controllerがtrialを生成し、一意なIDであるtrial nameを生成します。Suggestion serviceからみたときに、自分が生成したパラメーターにどのtrial nameが紐づくかを特定する方法は基本的にありません。TPEやRandom search、Gaussian Processなどこれまでサポートされてきた多くの最適化手法ではこれは問題になりませんが、CMA-ESは別です。

CMA-ESは、多変量正規分布を用意しその分布から解を生成、その評価値をもとに多変量正規分布のパラメーターをよりよい解をサンプルする分布へと更新します。分布が更新されるごとに世代番号がインクリメントされ、分布の更新に利用する解はかならず同じ世代番号の分布からサンプルされている必要があります。分布の世代番号はGoptunaのTrialのメタデータとして紐付けているため、Goptuna TrialとKatib Trialを適切に紐付ける必要があります。

現状ではGoptunaのStudyとは別に trialMapping map[str]int という変数を用意して、Katibのtrial name (str)からGoptunaのtrial ID (int)へのmappingを用意し、 trialMapping にまだ紐付けされていないtrialの中からパラメーターが完全に一致したものをtrialMappingへ追加しています。初期の実装ではKatib-controllerとのやりとりの際に生じる桁落ちを懸念して、パラメーターどうしのマンハッタン距離から最も似ているtrialを紐付けていましたが、Katibはprotobufの定義も(おそらくetcdかなにかに保存するときも)内部表現は全て文字列です。そのため桁落ちなどの心配がなく完全一致でチェックすることにしました。あらためて考えるとTrialのパラメーター表現を文字列で統一するのはかなりわかりやすくリーズナブルだなと思います。

あと実装してから気づいたのですが、よく見たらSuggestion serviceを実装するためのドキュメントがありました。普段あまりドキュメントに期待していないのですが、ParameterAssignmentsが requestNumber で指定された個数だけパラメーターを返さないといけないこととか気づくのに1時間ぐらい時間溶かしたりしてたので、最初に読んでおくことがおすすめです。

katib/new-algorithm-service.md at master · kubeflow/katib · GitHub

所感と今後の展望

設計的にも面白いソフトウェアだなと思いました。また今はOptunaのcommitterもやっていますが、前の部署にいたときにGoでgRPCのサーバー書いたり、OSSでkube-promptを公開していたりしていたので、スキルセット的にも結構マッチしている気がして開発が楽しかったです。

一方で、運用できるチームはかなり限られるかもしれないなというのが正直な感想です。少し動かしただけでもhyperoptのsuggestion serviceが完全に動かなくなってしまっているデグレがあって、原因探って修正するまでに自分も1晩中頑張ったりして苦労しました。Suggestion serviceがおかしいのか、metricscontrollerがおかしいのか、Katib controllerがおかしいのかを正しく状況に応じて切り分ける必要があり、Katibのコード読んでる人でもない限りなかなか難しいかもしれません。

個人的にはGoptunaをベースにすればKatibとの互換性を保ったままシンプルで運用のしやすいツールができる気もしているので、kubebuilder触りつつまた気が向いたときにでもやってみたいなと思います (NASの対応は、言語やフレームワークに依存せず汎用的に使える便利なツールを設計するのが現状では難しい気がしてるので諦める予定です)。

optuna-memorydump で InMemoryStorage の結果を定期的にSQLite3に書き出す。

CPUたくさん積んだマシン1台で、Optunaを回すときにstorage backendの選択は悩ましい点があります。 結果をDashboardやJupyterで詳しく確認したり、あとから最適化を再開するためにRDB storage backendを使って永続化しておきたいのですが、マシン1台しか使わないときにわざわざMySQLとかPostgreSQL立てるのは結構面倒なので、自然とSQLite3を選びたくなります。

ただSQLite3はマルチスレッドやマルチプロセスによる最適化時に、あまりおすすめできません。 そもそもLock errorを完全に防ぐことが難しいという話もありますが、 enqueue_trial など一部の機能は 行ロックのできないSQLite3 では並列最適化時に正しく動作しません。またRDB storage backendは性能面の課題もあり、RandomSamplerやCmaEsSamplerで実行時間の短い目的関数を数千回評価したいときにはほぼ確実にRDB storage backedがボトルネックになるかと思います。

そこで今回は高速に動作するIn-memory storage backendで最適化を回しつつ、評価回数が500回や1000回など指定したintervalに達したときにSQLite3に書き出すツールを用意してみました。冪等性を保証していて何回書き出しても差分だけを適用します。思いつきでサクッと作ってみたツールですが、思ったよりも便利そうなのでよければ使ってみてください。

optuna-memorydump の使い方

GitHubリポジトリはこちらです。まだPyPIには上げていないのでGitHubからインストールしてください。

github.com

$ pip install git+https://github.com/c-bata/optuna-memorydump.git

使い方はこんな感じでCallback関数としてOptunaに登録します。

import optuna
from optuna_memorydump import MemoryDumpCallback


def objective(trial):
    x1 = trial.suggest_uniform("x1", -100, 100)
    x2 = trial.suggest_uniform("x2", -100, 100)
    return 100 * (x2 - x1 ** 2) ** 2 + (x1 - 1) ** 2


if __name__ == "__main__":
    study = optuna.create_study(
        study_name="dumped", sampler=optuna.samplers.CmaEsSampler(),
    )
    study.optimize(
        objective, timeout=30, n_jobs=4, gc_after_trial=False,
        callbacks=[MemoryDumpCallback("sqlite:///db.sqlite3", interval=100)],
    )
    print("Best value: {} (params: {})\n".format(study.best_value, study.best_params))

intervalを100にすると、100回評価が終わるたびにcallbackが反応して指定したstorageに書き出します。intervalを指定できるようにしたのは、仮に1日中回し続けたいといったケースで12時間後にエラーがでて最初からやり直しとかを避けるためです。

$ python examples/rosenbrock.py
[I 2020-04-12 18:33:39,200] Finished trial#0 with value: ...
[I 2020-04-12 18:33:39,204] Finished trial#2 with value: ...
[I 2020-04-12 18:33:39,205] Finished trial#1 with value: ...
:
[I 2020-04-12 18:33:39,924] Finished trial#100 with value: ...
[I 2020-04-12 18:33:39,931] memorydump is triggered at trial 100 (thread=123145615126528).
[I 2020-04-12 23:40:20,013] memorydump of trial 100 is finished in 1.938s (thread=123145458638848).
:
[I 2020-04-12 18:33:39,924] Finished trial#200 with value: ...
[I 2020-04-12 23:40:21,505] memorydump of trial 200 is finished in 0.105s (thread=123145458638848).
[I 2020-04-12 23:40:24,061] memorydump is triggered at trial 300 (thread=123145425059840).
:

Django 3.1 MySQL db_flush() の高速化とTransactionTestCase利用時の注意点

DjangoMySQL DatabaseOperations Backendのとある処理を最適化するためのpatchを書いていて、それがマージされたのですが、注意点があるため記事にしておこうと思います。 全部読むのが面倒な方向けに結論だけ先に書いておきます。

  • MySQLにおいて、TransactionTestCaseのteardown処理が高速になりました。
    • またほとんどのユーザーにはそれほど重要ではありませんが、 sqlflush コマンドも効率的なクエリを生成し、 flush コマンドも高速になります。
  • TransactionTestCaseを使ったテストで AUTO INCREMENT フィールドの値(デフォルトの主キーなど)に依存しているテストは、そのままだとMySQLではFAILするようになります。
    • 自分は基本的にMySQLSQLiteしか使わなくて知らなかったのですが、TransactionTestCaseにおいてAUTO INCREMENT のカウンターがテストケースごとにリセットされていたのは未定義動作です (そもそもそういうテストはアンチパターンかもしれないという話はここでは一旦置いておきます)。
    • この問題を回避するには reset_sequences オプションを明示的に True にする必要があります。

テスト時間が長く困っている方は3.1以降で改善されるかもしれません。

TransactionTestCaseのteardown処理について

厳密には flush management comandの高速化を行ったのですが、どうしてTransactionTestCaseが速くなるのかを解説します。 これにはTransactionTestCaseがtear down時に何をしているのかを理解する必要があります。

Djangoでテストを書いているという方はご存知のように、通常の TestCase では、テスト開始時にトランザクションを開始し、終了時にはロールバックすることで、テストメソッド内のDB操作を取り消します。つまり各テストケースで行った操作はロールバックにより切り戻されるため、他のテストケースの実行に影響を与えることはありません。 一方でそれだと困るケースも当然あり、そういうケースでは TransactionTestCase を使用します。 TrasactionTestCase は、各テストケースごとに全てのテーブルを初期化した状態にしておかないといけません。

どうやっているかというと、 flush management commandを call_command() 関数で発行しています。 この関数は、内部で各DatabaseOperations Backendの sql_flush() メソッドを呼び出し、そのメソッドが返すSQLを実行します。 flush management commandを普通に利用する人は、ここが少し速くなったところでそれほど嬉しくないと思いますが、何度もteardownで呼び出されるTransactionTestCase においては速度が重要になります。

sql_flush() の高速化

MySQL DatabaseOperations backendの sql_flush() メソッドは、全テーブルに対して TRUNCATE クエリを返す実装になっていました。 TRUNCATE クエリはドキュメントにも書いてあるように、 DELETE クエリや、 DROP TABLE && CREATE TABLE クエリに似ています。

TRUNCATE TABLE empties a table completely. It requires the DROP privilege. Logically, TRUNCATE TABLE is similar to a DELETE statement that deletes all rows, or a sequence of DROP TABLE and CREATE TABLE statements. https://dev.mysql.com/doc/refman/8.0/en/truncate-table.html

ただこのクエリは比較的時間がかかります。厳密にはテーブルのサイズが小さい場合に、DELETE クエリに比べ時間がかかります。 そこで DELETE クエリを使いたいのですが、 DELETE クエリで全ての情報が消えるわけではありません。 問題になるのは AUTO INCREMENT フィールドのカウンターの値です。 3つのレコードが登録され、その全てをDELETEクエリにより削除した場合、次に作成するレコードの AUTO INCREMENT フィールドは4から開始します。

そのため別途 ALTER TABLE tablename AUTO_INCREMENT = 1 などを発行してリセットする必要があります。 最初に書いたpatchではそういう処理をしていたのですが、core contributorsやmaintainerからのレビューを通して、カウンター値はリセットしないことになりました。 カウンター値がリセットされなくなることは破壊的変更のように思えますが、実はカウンター値がリセットされるかどうかは未定義動作だったようです。 Djangoのモデルの主キーは、デフォルトだとAUTO INCREMENT なINTEGERが利用されますが、この主キー値の値などをテストケースでチェックしている場合には、後述するオプションをつけていないとFailします。

TransactionTestCasereset_sequences オプション

先程の問題に対処するためには、ALTER TABLE tablename AUTO_INCREMENT = 1 を合わせて発行する必要があると説明しました。 それを有効にするのが TransactionTestCasereset_sequences オプションです。 詳細は Advanced features of TransactionTestCase  に書かれています。 Django 3.0まではMySQL利用時にこれをセットしても変わらなかったのですが、3.1からは必要に応じて reset_sequences=True を指定してください。

class TestsThatDependsOnPrimaryKeySequences(TransactionTestCase):
    reset_sequences = True

    def test_animal_pk(self):
        ...

細かく確認してないのですが、PostgreSQLのDatabase Operations backendのコードには、このreset_sequencesの処理が実装されていたので、PostgreSQLを使ってTransactionTestCase走らせていた方には常識だったのかもしれません。

ベンチマーク

ベンチマークのコードはこちらです。

GitHub - c-bata/django-fast-mysql-flush: for ticket #31275

number of records on each table before after
10 3.302 sec (+/- 0.076) 0.517 sec (+/- 0.019)
100 3.323 sec (+/- 0.047) 0.575 sec (+/- 0.025)
1000 3.577 sec (+/- 0.106) 1.046 sec (+/- 0.029)

余談 ( information_schema.tables の利用)

今回書いたpatchは、もともとヒューリスティックに1000行以下ならDELETE クエリを発行するように実装していました。 行数が多い場合には性能の改善がなく、むしろ遅くなる可能性もあるからです。 全テーブルに対して SELECT COUNT(*) で行数を調べていると余計に時間がかかる可能性があるため、 information_schema.tablestable_rows から行数を取り出して判断しました。 この値はおおよその値が返ってくるだけですが(MyISAMを除く)、今回のようにざっくり1000行以上あるかどうかを知りたいときには十分です。

TABLE_ROWS

The number of rows. Some storage engines, such as MyISAM, store the exact count. For other storage engines, such as InnoDB, this value is an approximation, and may vary from the actual value by as much as 40% to 50%. In such cases, use SELECT COUNT(*) to obtain an accurate count.

TABLE_ROWS is NULL for INFORMATION_SCHEMA tables.

For InnoDB tables, the row count is only a rough estimate used in SQL optimization. (This is also true if the InnoDB table is partitioned.) https://dev.mysql.com/doc/refman/8.0/en/tables-table.html

ただ最終的にこの方針はやめることになりました。 自分が用意したベンチマークでは2倍程度高速でしたが、 DELETE 文と ALTER TABLE tablename AUTO INCRMENT = 1 の2つのSQLを発行しているためcore contributorsの方が試したベンチマーク問題では遅くなったりもしたようです。 詳しく見てみようかとも思ったのですが、ヒューリスティックを入れるのはあまり筋がよくないのと、たかだか2倍程度の改善だったのでまぁいいかなと思い今の実装に落ち着きました。

ちなみに information_schema.tables から auto_increment を取り出して、それが1より大きい場合のみ TRUNCATE を呼ぶという実装も試してみたのですが、INSERTをしても AUTO_INCREMENT の値が更新されず1のままにいることが頻繁にありテストが落ちるため諦めました。ドキュメントを読んでもapproximationとは書かれていないので理由がよくわからないのですが、もし知ってる方いたら教えて下さい。

AUTO_INCREMENT: The next AUTO_INCREMENT value. https://dev.mysql.com/doc/refman/5.7/en/tables-table.html

自分があとから思い出すためのメモでもあったので、雑な記事でしたがこれで終わり。