はじめに
この記事では、Airflowとは何か概要を知りたい方向けの記事です。
Airflow UIの詳細な解説や、Airflowのコードの詳細に関しては解説いたしませんのであらかじめご了承ください。
Airflowとは
Airflow(Apache Airflow)とは、ワークフロー管理ツールです。
ワークフローをプログラムで作成することで、その実行をスケジューリングしたり、モニタリングを出来るプラットフォームになります。
Airflowの中でよく登場する用語として、「DAG」や「ワークフロー」と呼ばれる用語があります。
DAG (Directed Acyclic Graph)とは、依存関係のある処理同士をつなぎこんだものをDAG(有向非巡回グラフ)と呼び、DAGを用いてワークフローを作成します。 DAGの詳細には触れませんが、一方向のみ前方向に情報が流れるといった意味を持つグラフです。ワークフローというのは、データの取り組み・データの変換・分析といったデータ分析における一連のタスクを意味します。
Airflowのインストール
Airflowをインストールする方法として、ローカル環境にインストールする方法とDockerを用いてインストールする方法の2通りがあります。
ローカルの場合、pipを用いて以下のようにインストールが可能です。
pip install apache-airflow
インストール後、Pythonプログラムから以下のように読み込めればインストール成功です。
import airflow
Airflow Web UIを立ち上げる
Airflow Web UIを立ち上げる方法を簡単に解説します。
先ほどのセクションで、Airflowがインストールされている状態で進めます。
以下の手順に従ってWeb UIを立ち上げることができます。
①Airflowデータベースを初期化
Mac環境の場合、ターミナルを開いて、以下のコマンドを実行します。以下のコマンドでは、Airflowデータベースを初期化します。
airflow db init
②Airflowスケジューラを起動
続けてターミナルで以下のコマンドを実行します。
以下のコマンドではAirflowスケジューラを起動します。
airflow scheduler
③新規にターミナルを開きAirflow Webサーバーを起動します。
②でスケジューラを起動したターミナルをそのまま開いた状態で別のターミナルを開いてください。
なお②のターミナルは閉じないように注意してください。
新しいターミナルを開いて、以下のコマンドを実行してAirflow Webサーバーを起動します。
airflow webserver --port 8080
上記のコマンドにより、ポート番号8080
を使用してWebサーバーが起動されます。
なお指定したポート番号に変更も出来ます。ポート番号を変更したい場合は、--port
オプションの後に任意のポート番号を指定してください。
上記の手順後、Webブラウザより、http://localhost:8080
にアクセスすると以下の画面が立ち上がります。
Airflow Web UIより、DAGの一覧やタスクのステータス、ログなどを確認することができます。
Airflow Web UI にログインする
UsernameとPasswordを発行するには以下の手順を行います。
①以下のコマンドで初期ユーザーを作成します。以下のYOUR_USERNAME
とYOUR_EMAIL
を任意のユーザー名とメールアドレスに置き換えてください。
airflow users create --username YOUR_USERNAME --firstname YOUR_FIRSTNAME --lastname YOUR_LASTNAME --role Admin --email YOUR_EMAIL
コマンドを実行すると、パスワードの入力を求められますので、パスワードを入力して、ユーザーの作成を完了します。入力後、もう一度パスワードの入力を求められますので、2回入力後アカウントが作成されます。
Airflow Web UIのログイン画面で作成したユーザー名とパスワードを入力することでログインできます。
Airflowのサンプルコード
Airflowを使用したデータパイプラインを作成するためのPythonサンプルコードを以下に提示します。
このサンプルコードは、task1
とtask2
という2つのタスクを定義し、task1
が完了した後にtask2
が実行されるように設定されているコードです。
# このサンプルコードを実行する場合は以下の手順です。
# ①Airflowのdagsフォルダに保存
# ②Airflow Web UIでこのDAGを確認・実行
# 必要なライブラリを読み込む
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# タスクで実行されるPython関数を定義
def execute_task1():
print("タスク1が実行")
def execute_task2():
print("タスク2が実行")
# DAGの設定
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'simple_pipeline',
default_args=default_args,
description='Airflowを使ったシンプルなパイプラインサンプル',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 16),
catchup=False
)
# タスクの定義
task1 = PythonOperator(
task_id='task1',
python_callable=execute_task1,
dag=dag
)
task2 = PythonOperator(
task_id='task2',
python_callable=execute_task2,
dag=dag
)
# タスク間の依存関係を設定
task1 >> task2
おわりに
この記事では、Airflowとは何か、DAGとは何か、インストール方法等に関して簡単に紹介しました。
Airflowを学ぶには実際に手を動かすことが近道です。
一番最初にAirflowの公式チュートリアルを元に読み進めながら手を動かしながら学ぶのが良いでしょう。そのほかにもインターネット上の記事を参考にしたり、書籍などを参考にAirflowを学んでみてください。
しかし独学で学んでいると、実務でどこまでの知識が必要なのか、わからないことがあった時にすぐに聞けないと困るという方もいらっしゃるかもしれません。そういった方はAI Academy Bootcampのオンラインチャットサポートプランをご受講ください。6ヶ月間チャットでいつでも無制限で質問し放題の環境で学べます。
実際に機械学習エンジニアやデータサイエンティストとして活躍している講師に回答が得られます。