「なるクラ」とは
ローカルPCでもなるべくクラウド環境らしい使い心地で データパイプライン構築や機械学習の実験管理ができるデータ分析環境を 構築してみました。
Gemini曰く、
この環境は、クラウド環境で利用されるS3、オーケストレーター、 MLトラッキングサービスなどの主要コンポーネントをローカルの Dockerコンテナで代替することで、「クラウドネイティブな開発 スタイル」を学ぶためのサンドボックスとして機能します。
ソースコード(docker-compose.yaml等)はGitHubに公開しています。
システム構成
- データストア: Versity S3 Gateway(S3互換のゲートウェイ)
- ソースコードリポジトリ: Gitデーモン
- パイプライン: Prefect(ワークフローオーケストレータ)
- 実験管理: MLflow(モデル・パラメタ・実験結果のトラッキング)
- ノートブック: Marimo(おまけ。対話的な開発用)
追記: S3代替として当初はMinIOを利用していましたが、最近 バイナリやDockerイメージの配布をやめてしまった ようなので、代わりにVersityを採用しました。 ファイルシステムへS3プロトコルでアクセスするために使います。
構成図は以下のようになります。 (Marimoはシステム全体と疎結合なので、見やすさのため省略)
graph TD
classDef host fill:#f5f5f5,stroke:#333,stroke-width:2px;
classDef app fill:#e1f5fe,stroke:#0277bd,stroke-width:2px;
classDef infra fill:#fff9c4,stroke:#fbc02d,stroke-width:2px;
subgraph host_net [Host Environment]
host[PC]:::host
end
subgraph docker_net [Docker Network]
prefect_server(Prefect Server):::app
prefect_worker(Prefect Worker):::app
mlflow(MLflow Server):::app
git[(Git Daemon)]:::infra
versitygw[(Versity S3)]:::infra
postgres[(PostgreSQL)]:::infra
host -->|Code| git -->|Code| prefect_worker
host -->|Web UI / CLI| prefect_server
host -->|Web UI| mlflow
host <-->|Data| versitygw
prefect_server -.-|Backend| postgres
prefect_server -->|Run| prefect_worker
prefect_worker -->|API| mlflow
prefect_worker <-->|Data| versitygw
mlflow -.-|Backend| postgres
mlflow -->|Artifacts| versitygw
end
使い方の流れ
以下のような流れで実験を回していくことになります:
- (必要なら)分析対象データをS3(
http://localhost:7070)にアップロード - 実験を行うワークフローを実装(
experimemnts/*.py) - Gitリポジトリ(
http:localhost:9010/repo)にコードをgit push - Prefectにデプロイ(
prefect deploy) - Prefectでワークフローを実行(
prefect deployment run) - MLflow(
http://localhost:5001)で実験結果を確認
環境変数の設定など、細かい事前準備はGitHubのREADMEに書いています。
Prefectワークフローの実装
Prefectのデコレータ(@flow, @task)で実験のワークフローや
タスクを定義します。mlflow.log_params()など仕込んで、MLflowで
トラッキング!
@flow
def example(n_tests: int = 1) -> None:
"""An example experiment as a Prefect flow with MLflow tracking."""
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(f"example-experiment-{datetime.now()}")
df = pd.read_csv("s3://data/iris.csv")
x = df.drop(columns=["target", "species"])
y = df["target"]
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3)
test_data = x_test.copy()
test_data["target"] = y_test
test_dataset = mlflow.data.from_pandas(test_data, targets="target")
@task
def run(n_estimators, criterion) -> None:
"""A run in the experiment."""
with mlflow.start_run():
params = {"n_estimators": n_estimators, "criterion": criterion}
mlflow.log_params(params)
model = RandomForestClassifier(**params)
model.fit(x_train, y_train)
model_info = mlflow.sklearn.log_model(model, input_example=x_train)
mlflow.models.evaluate(
model=model_info.model_uri,
data=test_dataset,
model_type="classifier",
)
for n_estimators in [2**i for i in range(5, 8)]:
for criterion in ["gini", "entropy", "log_loss"]:
for _ in range(n_tests):
run(n_estimators, criterion)
Prefectワークフローのデプロイと実行
prefect.yamlにデプロイ内容を宣言し、prefect deployを実行することで、
ワークフローをPrefectサーバに登録します。
prefect.yamlの記述例:
deployments:
- name: experiment
tags: [test]
parameters:
n_tests: 5
entrypoint: experiments/example.py:example
work_pool:
name: sandbox
job_variables: {}
デプロイ後に、以下のコマンドで実行:
$ prefect deployment run example/experiment
PrefectのウェブUI(http://localhost:4200/runs)にアクセスすると、 実行の様子をリアルタイムで確認できます。
MLflowで実験結果の確認と分析
MLflowのウェブUI(http://localhost:5001)にアクセスして、トラッキング された実験結果を確認したり、パラメタごとの比較分析ができます。
結語
なるべくクラウド環境っぽいローカル環境が作れました。 これにデータレイク等(DuckLakeとか)を足せば更にクラウドっぽくなる ことでしょう。