{GCP Cloud Composer} Run an Apache Airflow DAG in Cloud Composer 2

 

https://cloud.google.com/sdk/gcloud/reference/composer
https://laboratory.kiyono-co.jp/630/gcp/
https://qiita.com/nokoxxx1212/items/77131e2a730a550f0b09
https://cloud.google.com/composer/docs/composer-2/run-apache-airflow-dag?hl=ja

Airflow version: 2.6.3
Composer version: 2.4.6

 

-- 1. Preliminary setup

gcloud init
gcloud auth list

gcloud --version

gcloud projects create project01-9999999 \
--name="project01"

gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region asia-northeast1 --quiet
gcloud config set compute/zone asia-northeast1-a --quiet

gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111

gcloud services enable compute.googleapis.com --project project01-9999999

gcloud components update
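
Before creating any billable resources, it is worth confirming that the billing link actually took effect. A quick sanity check (this assumes the gcloud beta billing surface, which the commands above already use):

gcloud beta billing projects describe project01-9999999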


-- 2. Enabling the Cloud Composer API

gcloud services list --enabled


gcloud services enable composer.googleapis.com --project project01-9999999
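
To confirm the API is now enabled, a simple grep over the enabled-services list is enough:

gcloud services list --enabled | grep composer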


-- 3. Granting permissions to the service account

gcloud projects get-iam-policy project01-9999999

Grant permissions to
service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com
(the Cloud Composer service agent; 000000000000 is the project number)

 

gcloud projects add-iam-policy-binding project01-9999999 \
--member="serviceAccount:service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com" \
--role="roles/owner"
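
Note that roles/owner is far broader than the service agent needs. If you prefer least privilege, the Composer 2 setup docs grant the Cloud Composer v2 API Service Agent Extension role instead; the binding would look like this (same member, narrower role):

gcloud projects add-iam-policy-binding project01-9999999 \
--member="serviceAccount:service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com" \
--role="roles/composer.ServiceAgentV2Ext"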

 


-- 4. Creating the environment


gcloud composer environments create env01 \
--location asia-northeast1 \
--image-version composer-2.4.6-airflow-2.6.3
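
For a throwaway test environment you can also pin the environment size explicitly; if I recall correctly, Composer 2 accepts --environment-size with small, medium, or large:

gcloud composer environments create env01 \
--location asia-northeast1 \
--image-version composer-2.4.6-airflow-2.6.3 \
--environment-size small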

 

A GKE cluster and a Cloud Storage bucket are created for the environment.
This takes around 30 minutes.

 

gcloud composer environments list \
--locations asia-northeast1

gcloud composer environments describe env01 \
--location asia-northeast1
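
The describe output includes config.dagGcsPrefix, the gs:// path where DAG files are stored. A value projection pulls out just that field:

gcloud composer environments describe env01 \
--location asia-northeast1 \
--format="value(config.dagGcsPrefix)"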

 

 


-- 5. Creating the DAG file

mkdir -p test

vim ./test/dag01.py


import datetime

# Airflow 2 import paths (airflow.operators.bash_operator is the
# deprecated Airflow 1 module path)
from airflow import DAG
from airflow.operators.bash import BashOperator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with DAG(
        'dag01',
        catchup=False,
        default_args=default_args,
        # 'schedule' replaces the deprecated 'schedule_interval' (Airflow >= 2.4)
        schedule=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    task01 = BashOperator(
        task_id='task01', bash_command='echo {{ dag_run.id }}')
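
Before uploading, a cheap local sanity check is to import the file with plain Python, which surfaces syntax errors and bad imports (this assumes apache-airflow is pip-installed on the workstation):

python ./test/dag01.py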

 

-- 6. Uploading the DAG file to GCS


gcloud composer environments storage dags list \
--environment=env01 \
--location=asia-northeast1

gcloud composer environments storage dags import \
--environment=env01 \
--location=asia-northeast1 \
--source=./test
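
The scheduler can take a few minutes to pick up the new file. To confirm that dag01 is registered, the Airflow CLI can be invoked through gcloud:

gcloud composer environments run env01 dags list \
--location=asia-northeast1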

 

-- 7. Verification

In the Cloud Console, open the Composer environments page and click the Airflow web server link to open the Airflow UI.

 

gcloud composer environments run env01 dags trigger \
--location=asia-northeast1 \
-- dag01
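
The state of the triggered run can also be checked from the CLI; dags list-runs is the Airflow 2 subcommand, and arguments after -- are passed through to it:

gcloud composer environments run env01 dags list-runs \
--location=asia-northeast1 \
-- -d dag01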

 


-- 8. Cleanup

gcloud composer environments delete env01 \
--location asia-northeast1 \
--quiet

gcloud composer environments list \
--locations asia-northeast1
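
Deleting the environment does not delete the Cloud Storage bucket it created; remove it separately. A sketch with a hypothetical bucket name (the real one appears in config.dagGcsPrefix above):

gsutil -m rm -r gs://asia-northeast1-env01-00000000-bucket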

 

gcloud projects list

gcloud projects delete project01-9999999 \
--quiet