{GCP Cloud Composer} Run a data analytics DAG in Google Cloud

 

https://cloud.google.com/composer/docs/composer-2/run-data-analytics-dag-googlecloud?hl=ja


Airflow version: 2.6.3
Composer version: 2.4.6

 

-- 1. Preparation

gcloud init
gcloud auth list

gcloud --version

gcloud projects create project01-9999999 \
--name="project01"

gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region us-central1 --quiet
gcloud config set compute/zone us-central1-a --quiet

gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111

gcloud services enable compute.googleapis.com --project project01-9999999

gcloud components update


-- 2. Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs

gcloud services list --enabled


gcloud services enable \
dataproc.googleapis.com \
composer.googleapis.com \
bigquery.googleapis.com \
storage.googleapis.com \
--project project01-9999999


-- 3. Grant a role to the service account

gcloud projects get-iam-policy project01-9999999

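Grant a role to the Cloud Composer service agent,
service-00000000000@cloudcomposer-accounts.iam.gserviceaccount.com.
roles/owner is used here for simplicity; it is much broader than the environment needs.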

gcloud projects add-iam-policy-binding project01-9999999 \
--member="serviceAccount:service-00000000000@cloudcomposer-accounts.iam.gserviceaccount.com" \
--role="roles/owner"
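
As a rough illustration of what the binding command above changes, the sketch below adds a member to a policy shaped like the output of get-iam-policy. The helper name add_iam_policy_binding and the starting policy are hypothetical, conditions on bindings are ignored, and this is not how gcloud itself is implemented.

```python
import copy


def add_iam_policy_binding(policy: dict, member: str, role: str) -> dict:
    """Return a copy of `policy` with `member` added to the binding for `role`."""
    updated = copy.deepcopy(policy)
    bindings = updated.setdefault("bindings", [])
    for binding in bindings:
        if binding["role"] == role:
            # The role already has a binding: add the member only once.
            if member not in binding["members"]:
                binding["members"].append(member)
            break
    else:
        # No binding for this role yet: create one.
        bindings.append({"role": role, "members": [member]})
    return updated


# Hypothetical starting policy; the real one comes from get-iam-policy.
policy = {
    "bindings": [{"role": "roles/viewer", "members": ["user:someone@example.com"]}]
}
agent = "serviceAccount:service-00000000000@cloudcomposer-accounts.iam.gserviceaccount.com"
updated = add_iam_policy_binding(policy, agent, "roles/owner")
for binding in updated["bindings"]:
    print(binding["role"], binding["members"])
```

Running the helper a second time with the same arguments leaves the policy unchanged, which matches the idempotent behavior one would expect when re-running the gcloud command.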


-- 4. Create the Cloud Composer environment


gcloud composer environments create env01 \
--location us-central1 \
--image-version composer-2.4.6-airflow-2.6.3

 

A GKE cluster and a bucket are created.
This takes about 30 minutes.

 

gcloud composer environments list \
--locations us-central1

gcloud composer environments describe env01 \
--location us-central1

 

 

-- 5. Create a BigQuery dataset


bq \
--location=US mk \
--dataset \
project01-9999999:holiday_weather

bq ls


-- 6. Create a bucket

Unless the request specifies otherwise,
the bucket is created in the US multi-region and its default storage class is Standard Storage.

gcloud storage buckets create gs://bucket123 \
--default-storage-class=Standard \
--no-enable-autoclass \
--location=us \
--public-access-prevention \
--uniform-bucket-level-access

gcloud storage ls

 

-- 7. Enable Private Google Access

gcloud compute networks subnets update default \
--region us-central1 \
--enable-private-ip-google-access

 

 


-- 8. Upload supporting files to Cloud Storage


vim data_analytics_process.py

import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to BigQuery (staged through a GCS bucket)
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")
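
The core of the script above is the division of "value" by 10. The sketch below reproduces just that step in plain Python so it can be tried without Spark. The sample rows and their temperature values are made up for illustration; only the column names and the station id come from the DAG's join query.

```python
# Hypothetical rows shaped like holidays_weather_joined
# (Date, Holiday, id, element, value); the values are invented.
rows = [
    {"Date": "2019-07-04", "Holiday": "Independence Day",
     "id": "USW00094846", "element": "TMAX", "value": 283},
    {"Date": "2019-12-25", "Holiday": "Christmas Day",
     "id": "USW00094846", "element": "TMAX", "value": -11},
]


def normalize(row: dict) -> dict:
    """Convert `value` from tenths of a degree Celsius to degrees Celsius."""
    return {**row, "value": row["value"] / 10}


normalized = [normalize(r) for r in rows]
for r in normalized:
    print(r["Date"], r["Holiday"], r["value"])
```

The input rows are left untouched, in the same way that withColumn returns a new DataFrame rather than modifying the one it is called on.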


vim holidays.csv

Date,Holiday
2016-10-10,Columbus Day
2017-1-20,Washington’s Birthday
2006-2-20,Washington’s Birthday
2001-5-28,Memorial Day
2014-1-20,"Birthday of Martin Luther King, Jr."
2008-10-13,Columbus Day
2002-9-2,Labor Day
2019-7-4,Independence Day
2012-11-12,Veterans Day
2008-1-1,New Year’s Day
2000-2-21,Washington’s Birthday
1998-12-25,Christmas Day
1998-1-1,New Year’s Day
2019-12-25,Christmas Day
2004-1-1,New Year's Day
2014-5-26,Memorial Day
2008-12-25,Christmas Day
2007-7-4,Independence Day
2021-7-5,Independence Day
2005-7-4,Independence Day
2012-10-8,Columbus Day
1999-12-31,New Year’s Day
1999-9-6,Labor Day
1999-12-24,Christmas Day
2004-11-11,Veterans Day
2016-2-15,Washington’s Birthday
2015-10-12,Columbus Day
2015-7-3,Independence Day
2008-1-21,"Birthday of Martin Luther King, Jr."
2007-10-8,Columbus Day
2007-2-19,Washington’s Birthday
2005-11-24,Thanksgiving Day
2000-12-25,Christmas Day
2004-1-19,"Birthday of Martin Luther King, Jr."
2011-11-14,Thanksgiving Day
2020-1-20,"Birthday of Martin Luther King, Jr."
2016-9-5,Labor Day
1997-10-13,Columbus Day
2003-11-27,Thanksgiving Day
2018-5-28,Memorial Day
2003-10-13,Columbus Day
2018-1-1,New Year’s Day
2006-10-9,Columbus Day
2005-12-26,Christmas Day
2017-1-2,New Year’s Day
1997-5-26,Memorial Day
2009-10-12,Columbus Day
2008-2-18,Washington’s Birthday
2008-11-11,Veterans Day
2001-7-4,Independence Day
2009-1-19,"Birthday of Martin Luther King, Jr."
2008-9-1,Labor Day
2010-11-11,Veterans Day
2015-1-19,"Birthday of Martin Luther King, Jr."
2004-5-31,Memorial Day
1998-1-19,"Birthday of Martin Luther King, Jr."
2016-1-18,"Birthday of Martin Luther King, Jr."
2013-1-1,New Year’s Day
2010-12-31,New Year’s Day
2008-5-26,Memorial Day
2011-10-10,Columbus Day
2005-1-17,"Birthday of Martin Luther King, Jr."
2004-2-16,Washington's Birthday
2013-12-25,Christmas Day
2012-12-25,Christmas Day
2003-1-20,"Birthday of Martin Luther King, Jr."
2002-2-18,Washington’s Birthday
1999-7-5,Independence Day
2019-11-28,Thanksgiving Day
2015-1-1,New Year’s Day
2010-9-6,Labor Day
2016-7-4,Independence Day
2009-1-1,New Year’s Day
2000-9-4,Labor Day
2000-5-29,Memorial Day
2004-10-11,Columbus Day
2020-11-26,Thanksgiving Day
2006-5-29,Memorial Day
2005-9-5,Labor Day
2001-9-3,Labor Day
2019-11-11,Veterans Day
1998-10-12,Columbus Day
2013-10-14,Columbus Day
2007-1-15,"Birthday of Martin Luther King, Jr."
2007-12-25,Christmas Day
2016-1-1,New Year’s Day
2017-12-23,Thanksgiving Day
2011-7-4,Independence Day
2005-5-30,Memorial Day
2004-11-25,Thanksgiving Day
2016-11-24,Thanksgiving Day
2003-11-11,Veterans Day
2011-11-11,Veterans Day
2000-7-4,Independence Day
2017-12-25,Christmas Day
2020-9-7,Labor Day
2013-2-18,Washington’s Birthday
1998-2-16,Washington’s Birthday
2013-7-4,Independence Day
2005-2-21,Washington’s Birthday
2009-2-16,Washington’s Birthday
2016-11-11,Veterans Day
1998-5-25,Memorial Day
2001-11-12,Veterans Day
2000-1-17,"Birthday of Martin Luther King, Jr."
1997-11-27,Thanksgiving Day
2018-11-12,Veterans Day
2010-12-24,Christmas Day
2014-11-11,Veterans Day
2001-1-15,"Birthday of Martin Luther King, Jr."
1997-1-1,New Year’s Day
2015-12-25,Christmas Day
2020-1-1,New Year’s Day
2021-1-20,Inauguration Day
2020-2-17,Washington’s Birthday
2004-7-5,Independence Day
2019-9-2,Labor Day
2004-12-31,New Year’s Day
2005-11-11,Veterans Day
1998-9-7,Labor Day
2015-5-25,Memorial Day
1999-11-11,Veterans Day
2009-11-26,Thanksgiving Day
2018-1-15,"Birthday of Martin Luther King, Jr."
2010-10-11,Columbus Day
2006-7-4,Independence Day
2006-1-2,New Year’s Day
2010-11-25,Thanksgiving Day
2007-11-22,Thanksgiving Day
2001-10-8,Columbus Day
1997-7-4,Independence Day
1997-2-17,Washington’s Birthday
2000-11-10,Veterans Day
1999-1-1,New Year’s Day
2015-9-7,Labor Day
2002-1-21,"Birthday of Martin Luther King, Jr."
2019-10-14,Columbus Day
2014-1-1,New Year’s Day
2019-1-21,"Birthday of Martin Luther King, Jr."
2002-11-28,Thanksgiving Day
1998-11-26,Thanksgiving Day
2010-7-5,Independence Day
1997-1-20,"Birthday of Martin Luther King, Jr."
1997-12-25,Christmas Day
2021-2-15,Washington’s Birthday
2017-7-4,Independence Day
2003-1-1,New Year’s Day
2006-11-10,Veterans Day
2012-11-22,Thanksgiving Day
2009-11-11,Veterans Day
2004-12-24,Christmas Day
2003-2-17,Washington’s Birthday
2000-10-9,Columbus Day
2009-5-25,Memorial Day
2021-10-11,Columbus Day
2008-7-4,Independence Day
2021-11-25,Thanksgiving Day
2013-1-20,Inauguration Day
2020-7-3,Independence Day
2019-1-1,New Year’s Day
2009-7-3,Independence Day
2010-5-31,Memorial Day
2006-1-16,"Birthday of Martin Luther King, Jr."
2002-1-1,New Year’s Day
2007-1-1,New Year’s Day
2021-11-11,Veterans Day
2008-11-27,Thanksgiving Day
2014-11-27,Thanksgiving Day
2017-10-9,Columbus Day
2003-7-4,Independence Day
1998-7-3,Independence Day
2009-9-7,Labor Day
2011-12-26,Christmas Day
2018-12-25,Christmas Day
2006-12-25,Christmas Day
2014-12-25,Christmas Day
2017-1-16,"Birthday of Martin Luther King, Jr."
2017-9-4,Labor Day
2016-12-26,Christmas Day
2011-5-30,Memorial Day
2017-5-29,Memorial Day
2002-10-14,Columbus Day
2014-7-4,Independence Day
2015-2-16,Washington’s Birthday
2005-10-10,Columbus Day
2011-9-5,Labor Day
2012-7-4,Independence Day
2005-1-20,Inauguration Day
1999-2-15,Washington’s Birthday
2002-11-11,Veterans Day
2020-10-12,Columbus Day
2004-9-6,Labor Day
1999-5-31,Memorial Day
2007-5-28,Memorial Day
2020-5-25,Memorial Day
2013-1-21,"Birthday of Martin Luther King, Jr."
1999-11-25,Thanksgiving Day
2001-1-20,Inauguration Day
2014-2-17,Washington’s Birthday
2003-9-1,Labor Day
2003-5-26,Memorial Day
2021-12-25,Christmas Day
2003-12-25,Christmas Day
2011-1-17,"Birthday of Martin Luther King, Jr."
2014-9-1,Labor Day
2021-9-6,Labor Day
1997-9-1,Labor Day
2006-11-23,Thanksgiving Day
2012-1-16,"Birthday of Martin Luther King, Jr."
2013-9-2,Labor Day
2009-1-20,Inauguration Day
1998-11-11,Veterans Day
2002-5-27,Memorial Day
2013-11-11,Veterans Day
2015-11-26,Thanksgiving Day
2010-1-1,New Year’s Day
2018-9-3,Labor Day
2009-12-25,Christmas Day
2018-7-4,Independence Day
2012-1-2,New Year's Day
2002-7-4,Independence Day
2020-12-25,Christmas Day
2020-11-11,Veterans Day
2012-5-28,Memorial Day
2021-1-1,New Year’s Day
2002-12-25,Christmas Day
2014-10-13,Columbus Day
2007-11-12,Veterans Day
2018-11-22,Thanksgiving Day
2006-9-4,Labor Day
1999-10-11,Columbus Day
2021-5-31,Memorial Day
2001-2-19,Washington’s Birthday
1999-1-8,"Birthday of Martin Luther King, Jr."
2021-1-18,"Birthday of Martin Luther King, Jr."
2017-1-20,Inauguration Day
2013-11-28,Thanksgiving Day
2007-9-3,Labor Day
2012-2-20,Washington's Birthday
2018-2-19,Washington’s Birthday
2012-9-3,Labor Day
2017-11-10,Veterans Day
2016-5-30,Memorial Day
2010-2-15,Washington’s Birthday
2001-12-25,Christmas Day
2001-1-1,New Year’s Day
2000-11-23,Thanksgiving Day
1997-11-11,Veterans Day
2018-10-8,Columbus Day
2010-1-18,"Birthday of Martin Luther King, Jr."
2019-5-27,Memorial Day
2019-2-18,Washington’s Birthday
2001-11-22,Thanksgiving Day
2011-2-21,Washington’s Birthday
2015-11-11,Veterans Day
2013-5-27,Memorial Day
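
Two details of this file are easy to trip over when reading it outside BigQuery: the dates are not zero-padded, and some holiday names contain a comma inside quotes. The sketch below, which uses only the Python standard library, parses a few rows copied from the file. The function name parse_holidays is hypothetical, and this says nothing about how BigQuery itself parses the file.

```python
import csv
import io
from datetime import date, datetime

# A few rows copied from holidays.csv.
SAMPLE = '''Date,Holiday
2019-7-4,Independence Day
2014-1-20,"Birthday of Martin Luther King, Jr."
2019-12-25,Christmas Day
'''


def parse_holidays(text: str) -> list[tuple[date, str]]:
    """Parse CSV text with a Date,Holiday header into (date, name) pairs."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        # strptime accepts month and day without a leading zero.
        (datetime.strptime(row["Date"], "%Y-%m-%d").date(), row["Holiday"])
        for row in reader
    ]


holidays = parse_holidays(SAMPLE)
for day, name in holidays:
    print(day.isoformat(), name)
```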

 


gcloud storage cp data_analytics_process.py gs://bucket123
gcloud storage cp holidays.csv gs://bucket123

gcloud storage ls gs://bucket123/*


-- 9. Add Airflow variables (with gcloud here, instead of the Airflow UI)


gcloud composer environments run env01 variables set \
--location=us-central1 \
-- gcp_project project01-9999999

gcloud composer environments run env01 variables set \
--location=us-central1 \
-- gcs_bucket bucket123

gcloud composer environments run env01 variables set \
--location=us-central1 \
-- gce_region us-central1

gcloud composer environments run env01 variables set \
--location=us-central1 \
-- dataproc_service_account 00000000000-compute@developer.gserviceaccount.com

gcloud composer environments run env01 variables list \
--location=us-central1
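
The DAG in the next step reads these four variables through templates such as {{var.value.gcp_project}}. As a quick sanity check, the sketch below extracts the variable names from the templated strings and compares them with the variables set above. The regular expression and the function name referenced_variables are assumptions made for this illustration and are not part of Airflow.

```python
import re

# Variables set with `variables set` in this step.
airflow_variables = {
    "gcp_project": "project01-9999999",
    "gcs_bucket": "bucket123",
    "gce_region": "us-central1",
    "dataproc_service_account": "00000000000-compute@developer.gserviceaccount.com",
}

# Templated strings as they appear in data_analytics_dag.py.
templates = [
    "{{var.value.gcp_project}}",
    "{{var.value.gcs_bucket}}",
    "{{ var.value.gce_region }}",
    "{{var.value.dataproc_service_account}}",
]

VAR_REF = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def referenced_variables(strings):
    """Collect the variable names referenced as var.value.<name>."""
    names = set()
    for s in strings:
        names.update(VAR_REF.findall(s))
    return names


missing = referenced_variables(templates) - set(airflow_variables)
print("missing variables:", sorted(missing))
```

An empty result means every variable the DAG references has been set. A non-empty one names the variables that still need a `variables set` call.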

 

-- 10. Upload the DAG to the environment's bucket

vim data_analytics_dag.py

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(2019, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch
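
BATCH_ID in the DAG above is a template, "data-processing-{{ ts_nodash | lower}}", so each run gets its own lowercase batch ID. The sketch below mimics that rendering in plain Python for one logical date. The function name batch_id_for and the example date are hypothetical, and Airflow itself does the real rendering through Jinja.

```python
from datetime import datetime, timezone


def batch_id_for(logical_date: datetime) -> str:
    """Mimic "data-processing-{{ ts_nodash | lower }}" for a logical date."""
    # ts_nodash has the form YYYYMMDDTHHMMSS; lowering it turns "T" into "t".
    ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
    return f"data-processing-{ts_nodash.lower()}"


# Hypothetical logical date, for illustration only.
example = datetime(2024, 1, 15, 0, 0, 0, tzinfo=timezone.utc)
print(batch_id_for(example))  # data-processing-20240115t000000
```

Because the ID is derived from the run's logical date, triggering the DAG twice for the same logical date would produce the same batch ID.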

 

gcloud composer environments storage dags list \
--environment=env01 \
--location=us-central1

gcloud composer environments storage dags import \
--environment=env01 \
--location=us-central1 \
--source=data_analytics_dag.py

 

 

-- 11. Trigger the DAG


Click the Airflow web server link.

Wait until the uploaded DAG appears.

 

gcloud composer environments run env01 dags trigger \
--location=us-central1 \
-- data_analytics_dag


-- 12. Verify that the DAG succeeded

bq query \
--use_legacy_sql=false \
'SELECT * from holiday_weather.holidays_weather_joined limit 10;'


bq query \
--use_legacy_sql=false \
'SELECT * from holiday_weather.holidays_weather_normalized limit 10;'

 

-- 13. Clean up


gcloud projects list

gcloud projects delete project01-9999999 \
--quiet