{GCP Cloud Composer} Airflow データベースにアクセスする

https://cloud.google.com/composer/docs/composer-2/access-airflow-database?hl=ja#public-ip
https://cloud.google.com/composer/docs/composer-2/write-dags?hl=ja
https://qiita.com/notrogue/items/3b3f1de879dd77dae359


Airflow バージョン: 2.6.3
Composer バージョン: 2.4.6

 

-- 1. 前作業

gcloud init
gcloud auth list

gcloud --version

gcloud projects create project01-9999999 \
--name="project01"

gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region asia-northeast1 --quiet
gcloud config set compute/zone asia-northeast1-a --quiet

gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111

gcloud services enable compute.googleapis.com --project project01-9999999

gcloud components update


-- 2. Cloud Composer の有効化

gcloud services list --enabled


gcloud services enable composer.googleapis.com --project project01-9999999


-- 3. サービス アカウントへ権限付与

gcloud projects get-iam-policy project01-9999999

service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com
へ権限付与

 

gcloud projects add-iam-policy-binding project01-9999999 \
--member="serviceAccount:service-000000000000@cloudcomposer-accounts.iam.gserviceaccount.com" \
--role="roles/owner"

 


-- 4. 環境の作成


gcloud composer environments create env01 \
--location asia-northeast1 \
--image-version composer-2.4.6-airflow-2.6.3

 


GKEやバケットが作成される
30分程度かかる

 

gcloud composer environments list \
--locations asia-northeast1

gcloud composer environments describe env01 \
--location asia-northeast1

 

 


-- 5. DAGファイルの作成

mkdir -p test

cat <<-'EOF' > ./test/dag01.py


import datetime
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator


default_dag_args = {
    "start_date": datetime.datetime(2018, 1, 1),
}

with models.DAG(
    "dag01",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
    catchup=False,
    is_paused_upon_creation=True,
    tags=["dag01"],
) as dag:
    def greeting():
        import logging
        logging.info("Hello World!")

    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )
    hello_python >> goodbye_bash


EOF

 


-- 6. DAGファイルをGCSにアップロードする


gcloud composer environments storage dags list \
--environment=env01 \
--location=asia-northeast1

gcloud composer environments storage dags import \
--environment=env01 \
--location=asia-northeast1 \
--source=./test

 

-- 7. 動作確認

Airflow webサーバーをクリック

 

gcloud composer environments run env01 dags trigger \
--location=asia-northeast1 \
-- dag01


-- 8. データベース接続パラメータとデータベース エンドポイントのアドレスを取得する

 

Airflow ウェブ インターフェースで、[管理者] > [構成] 
sql_alchemy_conn パラメータ

 

postgresql+psycopg2://root:password@airflow-sqlproxy-service.composer-system.svc.cluster.local:3306/composer-2-4-6-airflow-2-6-3-aaaaaaaa

 

 

[サービスの詳細] ページで、
airflow-sqlproxy-service
 [処理元の Pod] の [エンドポイント] 


10.101.128.123

 

-- 9. VMインスタンスを作成

ゾーン → クラスタと同じゾーン
クラスタの gke-services IP 範囲にエイリアスIPを作成

 

gcloud compute instances create vm01 \
--zone=asia-northeast1-c \
--machine-type=e2-micro \
--network-interface=aliases=gke-asia-northeast1-env01-aaaaaaaa-gke-services-bbbbbbbb:10.102.0.0/22,network-tier=PREMIUM,stack-type=IPV4_ONLY,subnet=default
--image-family debian-11 \
--image-project debian-cloud \
--boot-disk-size 10 \
--boot-disk-type pd-standard \
--boot-disk-device-name vm01

 

 

gcloud compute instances list

 

-- 10. VM インスタンスに接続し、SQL クライアント パッケージをインストールする


gcloud compute ssh vm01 \
--zone=asia-northeast1-c

 

sudo apt-get update
sudo apt-get install postgresql-client

 

-- 11. Airflow データベースに接続する


psql --user=root --password \
--host=10.101.128.123 \
--port=3306 \
composer-2-4-6-airflow-2-6-3-aaaaaaaa


-- 12. SQL クエリを実行する
\x 1
\pset pager 0

select version();


SELECT * FROM dag LIMIT 10;

\q

-- 13. データベースダンプを取得


pg_dump --user=root --password \
--host=10.101.128.123 \
--port=3306 \
composer-2-4-6-airflow-2-6-3-aaaaaaaa > dump.sql


exit

 

gcloud compute scp vm01:~/dump.sql . \
--zone=asia-northeast1-c

 


主なテーブル
dag
dag_code
dag_run


select * from dag;
select * from dag_code;
select * from dag_run where dag_id = 'dag01';

 


-- 14. クリーンアップ

gcloud compute instances delete vm01 --zone asia-northeast1-c --quiet

 

gcloud composer environments delete env01 \
--location asia-northeast1 \
--quiet

gcloud composer environments list \
--locations asia-northeast1

 

gcloud projects list

gcloud projects delete project01-9999999 \
--quiet


gcloud beta billing projects unlink project01-9999999