https://cloud.google.com/sdk/gcloud/reference/composer
https://laboratory.kiyono-co.jp/630/gcp/
https://qiita.com/nokoxxx1212/items/77131e2a730a550f0b09
https://cloud.google.com/composer/docs/composer-2/run-apache-airflow-dag?hl=ja
Airflow バージョン: 2.6.3
Composer バージョン: 2.4.6
-- 1. 前作業
gcloud init
gcloud auth list
gcloud --version
gcloud projects create project01-9999999 \
--name="project01"
gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region asia-northeast1 --quiet
gcloud config set compute/zone asia-northeast1-a --quiet
gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111
gcloud services enable compute.googleapis.com --project project01-9999999
gcloud components update
-- 2. Cloud Composer の有効化
gcloud services list --enabled
gcloud services enable composer.googleapis.com --project project01-9999999
-- 3. サービス アカウントへ権限付与
gcloud projects get-iam-policy project01-9999999
service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com
へ権限付与
gcloud projects add-iam-policy-binding project01-9999999 \
--member="serviceAccount:service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com" \
--role="roles/owner"
-- 4. 環境の作成
gcloud composer environments create env01 \
--location asia-northeast1 \
--image-version composer-2.4.6-airflow-2.6.3
GKEやバケットが作成される
30分程度かかる
gcloud composer environments list \
--locations asia-northeast1
gcloud composer environments describe env01 \
--location asia-northeast1
-- 5. DAGファイルの作成
mkdir -p test
vim ./test/dag01.py
import datetime
import airflow
from airflow.operators import bash_operator
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with airflow.DAG(
'dag01',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
# Print the dag_run id from the Airflow logs
task01 = bash_operator.BashOperator(
task_id='task01', bash_command='echo {{ dag_run.id }}')
-- 6. DAGファイルをGCSにアップロードする
gcloud composer environments storage dags list \
--environment=env01 \
--location=asia-northeast1
gcloud composer environments storage dags import \
--environment=env01 \
--location=asia-northeast1 \
--source=./test
-- 7. 動作確認
Airflow webサーバーをクリック
gcloud composer environments run env01 dags trigger \
--location=asia-northeast1 \
-- dag01
-- 8. クリーンアップ
gcloud composer environments delete env01 \
--location asia-northeast1 \
--quiet
gcloud composer environments list \
--locations asia-northeast1
gcloud projects list
gcloud projects delete project01-9999999 \
--quiet