読者です 読者をやめる 読者になる 読者になる

Airbnbの「Airflow」を試す

こんにちは。最近pythonと戯れているmatsです。

 

6月の頭にこんなリリースがありました。

http://japan.cnet.com/news/service/35065498/

今回はこの「Airflow」を試してみようかと思います。

 

Airflowとは

「Airflow」はAirbnbオープンソースとして公開したワークフロー管理プラットフォームです。

データパイプラインを規模に応じてタイムリーにオーサリング、スケジューリング、モニタリングすることができ、過去数年間で爆発的に成長したAirbnbにとっては不可欠な存在とのこと。

 

ソースコード: https://github.com/airbnb/airflow/

ドキュメント: http://pythonhosted.org/airflow/

 

Airflowのインストール

Airflowはpython製で2.7系のみの対応です。今回はAmazon Linuxにインストールを行いました。

 

準備

コンパイルに必要なパッケージをインストールします。

$ sudo yum -y groupinstall "Development Tools"

 

Airflow本体のインストール

pythonのライブラリとして公開されているので、pipコマンドで導入可能です。(結構時間がかかります)

$ pip install airflow

 

データベースの初期化

次のコマンドでデータベースの初期化を行います。

デフォルトがSQLiteになっていてサクッとはじめられるのはいいですね(^^)

$ airflow initdb

DB: sqlite:////home/ec2-user/airflow/airflow.db
Done.

 

Airflowの起動

webserverを起動します。

$ airflow webserver

 ____________ _____________
 ____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
 _/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/

2015-06-28 16:34:20,376 - root - INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
2015-06-28 16:34:20,376 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_python_operator.py
2015-06-28 16:34:20,377 - root - INFO - Loaded DAG <DAG: example_python_operator>
2015-06-28 16:34:20,378 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_bash_operator.py
2015-06-28 16:34:20,378 - root - INFO - Loaded DAG <DAG: example_bash_operator>
2015-06-28 16:34:20,378 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/tutorial.py
2015-06-28 16:34:20,379 - root - INFO - Loaded DAG <DAG: tutorial>
Running Tornado server on host 0.0.0.0 and port 8080...

 

worker等を起動する場合は別途MySQL等が必要です。(初期設定ではceleryのバックエンドにMySQLを使用することになっています)

$ airflow worker

 -------------- celery@ip-192-168-4-67 v3.1.18 (Cipater)
---- **** -----
--- * *** * -- Linux-3.14.44-32.39.amzn1.x86_64-x86_64-with-glibc2.2.5
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7f270bd6dd90
- ** ---------- .> transport: sqla+mysql://airflow:airflow@localhost:3306/airflow
- ** ---------- .> results: db+mysql://airflow:airflow@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> default exchange=celery(direct) key=celery


[2015-06-28 15:09:19,033: WARNING/MainProcess] celery@ip-192-168-4-67 ready.
Starting flask
 * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)

 

$ airflow flower

[I 150628 15:09:45 command:114] Visit me at http://localhost:5555
[I 150628 15:09:45 command:116] Broker: sqla+mysql://airflow:airflow@localhost:3306/airflow
[I 150628 15:09:45 command:119] Registered tasks:
 ['celery.backend_cleanup',
 'celery.chain',
 'celery.chord',
 'celery.chord_unlock',
 'celery.chunks',
 'celery.group',
 'celery.map',
 'celery.starmap']
[I 150628 15:09:45 mixins:225] Connected to sqla+mysql://airflow:airflow@localhost:3306/airflow

 

ブラウザから http://:8080 にアクセスすると・・・

Airflow_-_DAGs

お、でた!(使い方がwからない。。。)

 

所感

Q. で、結局のところ使えるの?

所定のルールでpythonスクリプト書き、登録するとこんな感じで表示されるようです。(for文でクルクル回している処理が、小ジョブとしてそれぞれ表示されていたりするのはおもしろいですね)

Airflow_DAGs

ワークフロー全体で1つのコードのようなので、動的にジョブを組み合わせるとかは難しいのかもしれません。

IMには、用意されたジョブを動的に連結させて動かす仕組みが既にあるので、同様のことをする場合はジョブを選択していくとAirflow登録用のpythonコードが生成出来る仕組みが別途必要そうです。

ある程度固まっているワークフローをガンガン回すのには向いている感じがします。

 

Q. アーキテクチャー的には?

キューの管理やデータストアやワーカーは独立させる事ができるみたいなので、比較的障害等には強い気がします。

 

Q. オートスケールとかAWSとの相性は?

まだわかりません。ワーカーとしてスポットインスタンスを使えると最高なのですが。。

キュー管理に使われているceleryはバックエンドにSQSを使えたりするみたいです。

 

 

似たような仕組みを既に社内に持っているので実際に使うかは微妙ですが、引き続き動向は追っていこうかと思います。(試すとか言って起動しかしていないのはご愛嬌)

 

おわり