https://cloud.google.com/composer/docs/composer-2/run-data-analytics-dag-googlecloud?hl=ja
Airflow バージョン: 2.6.3
Composer バージョン: 2.4.6
-- 1. 前作業
gcloud init
gcloud auth list
gcloud --version
gcloud projects create project01-9999999 \
--name="project01"
gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region us-central1 --quiet
gcloud config set compute/zone us-central1-a --quiet
gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111
gcloud services enable compute.googleapis.com --project project01-9999999
gcloud components update
-- 2. Dataproc, Cloud Composer, BigQuery, Cloud Storage API の有効化
gcloud services list --enabled
gcloud services enable \
dataproc.googleapis.com \
composer.googleapis.com \
bigquery.googleapis.com \
storage.googleapis.com \
--project project01-9999999
-- 3. サービス アカウントへ権限付与
gcloud projects get-iam-policy project01-9999999
Cloud Composer サービス エージェント service-00000000000@cloudcomposer-accounts.iam.gserviceaccount.com へ権限付与
# Grant the Cloud Composer 2 service agent the role it actually needs:
# "Cloud Composer v2 API Service Agent Extension".
# roles/owner is far broader than required and should not be given to a
# service agent.
gcloud projects add-iam-policy-binding project01-9999999 \
--member="serviceAccount:service-00000000000@cloudcomposer-accounts.iam.gserviceaccount.com" \
--role="roles/composer.ServiceAgentV2Ext"
-- 4. Cloud Composer 環境の作成
gcloud composer environments create env01 \
--location us-central1 \
--image-version composer-2.4.6-airflow-2.6.3
GKEやバケットが作成される
30分程度かかる
gcloud composer environments list \
--locations us-central1
gcloud composer environments describe env01 \
--location us-central1
-- 5. BigQuery データセットを作成
bq \
--location=US mk \
--dataset \
project01-9999999:holiday_weather
bq ls
-- 6. バケットの作成
リクエストで指定されていない限り、
バケットは、US マルチリージョンで作成され、デフォルトのストレージ クラスは Standard Storage になります。
gcloud storage buckets create gs://bucket123 \
--default-storage-class=Standard \
--no-enable-autoclass \
--location=us \
--public-access-prevention \
--uniform-bucket-level-access
gcloud storage ls
-- 7. 限定公開の Google アクセス有効化
gcloud compute networks subnets update default \
--region us-central1 \
--enable-private-ip-google-access
-- 8. サポート ファイルを Cloud Storage にアップロードする
vim data_analytics_process.py
import sys
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
if __name__ == "__main__":
BUCKET_NAME = sys.argv[1]
READ_TABLE = sys.argv[2]
WRITE_TABLE = sys.argv[3]
# Create a SparkSession, viewable via the Spark UI
spark = SparkSession.builder.appName("data_processing").getOrCreate()
# Load data into dataframe if READ_TABLE exists
try:
df = spark.read.format("bigquery").load(READ_TABLE)
except Py4JJavaError as e:
raise Exception(f"Error reading {READ_TABLE}") from e
# Convert temperature from tenths of a degree in celsius to degrees celsius
df = df.withColumn("value", col("value") / 10)
# Display sample of rows
df.show(n=20)
# Write results to GCS
if "--dry-run" in sys.argv:
print("Data will not be uploaded to BigQuery")
else:
# Set GCS temp location
temp_path = BUCKET_NAME
# Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
# Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
# See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
# for other save mode options
df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
"overwrite"
).save(WRITE_TABLE)
print("Data written to BigQuery")
vim holidays.csv
Date,Holiday
2016-10-10,Columbus Day
2017-2-20,Washington’s Birthday
2006-2-20,Washington’s Birthday
2001-5-28,Memorial Day
2014-1-20,"Birthday of Martin Luther King, Jr."
2008-10-13,Columbus Day
2002-9-2,Labor Day
2019-7-4,Independence Day
2012-11-12,Veterans Day
2008-1-1,New Year’s Day
2000-2-21,Washington’s Birthday
1998-12-25,Christmas Day
1998-1-1,New Year’s Day
2019-12-25,Christmas Day
2004-1-1,New Year's Day
2014-5-26,Memorial Day
2008-12-25,Christmas Day
2007-7-4,Independence Day
2021-7-5,Independence Day
2005-7-4,Independence Day
2012-10-8,Columbus Day
1999-12-31,New Year’s Day
1999-9-6,Labor Day
1999-12-24,Christmas Day
2004-11-11,Veterans Day
2016-2-15,Washington’s Birthday
2015-10-12,Columbus Day
2015-7-3,Independence Day
2008-1-21,"Birthday of Martin Luther King, Jr."
2007-10-8,Columbus Day
2007-2-19,Washington’s Birthday
2005-11-24,Thanksgiving Day
2000-12-25,Christmas Day
2004-1-19,"Birthday of Martin Luther King, Jr."
2011-11-24,Thanksgiving Day
2020-1-20,"Birthday of Martin Luther King, Jr."
2016-9-5,Labor Day
1997-10-13,Columbus Day
2003-11-27,Thanksgiving Day
2018-5-28,Memorial Day
2003-10-13,Columbus Day
2018-1-1,New Year’s Day
2006-10-9,Columbus Day
2005-12-26,Christmas Day
2017-1-2,New Year’s Day
1997-5-26,Memorial Day
2009-10-12,Columbus Day
2008-2-18,Washington’s Birthday
2008-11-11,Veterans Day
2001-7-4,Independence Day
2009-1-19,"Birthday of Martin Luther King, Jr."
2008-9-1,Labor Day
2010-11-11,Veterans Day
2015-1-19,"Birthday of Martin Luther King, Jr."
2004-5-31,Memorial Day
1998-1-19,"Birthday of Martin Luther King, Jr."
2016-1-18,"Birthday of Martin Luther King, Jr."
2013-1-1,New Year’s Day
2010-12-31,New Year’s Day
2008-5-26,Memorial Day
2011-10-10,Columbus Day
2005-1-17,"Birthday of Martin Luther King, Jr."
2004-2-16,Washington's Birthday
2013-12-25,Christmas Day
2012-12-25,Christmas Day
2003-1-20,"Birthday of Martin Luther King, Jr."
2002-2-18,Washington’s Birthday
1999-7-5,Independence Day
2019-11-28,Thanksgiving Day
2015-1-1,New Year’s Day
2010-9-6,Labor Day
2016-7-4,Independence Day
2009-1-1,New Year’s Day
2000-9-4,Labor Day
2000-5-29,Memorial Day
2004-10-11,Columbus Day
2020-11-26,Thanksgiving Day
2006-5-29,Memorial Day
2005-9-5,Labor Day
2001-9-3,Labor Day
2019-11-11,Veterans Day
1998-10-12,Columbus Day
2013-10-14,Columbus Day
2007-1-15,"Birthday of Martin Luther King, Jr."
2007-12-25,Christmas Day
2016-1-1,New Year’s Day
2017-11-23,Thanksgiving Day
2011-7-4,Independence Day
2005-5-30,Memorial Day
2004-11-25,Thanksgiving Day
2016-11-24,Thanksgiving Day
2003-11-11,Veterans Day
2011-11-11,Veterans Day
2000-7-4,Independence Day
2017-12-25,Christmas Day
2020-9-7,Labor Day
2013-2-18,Washington’s Birthday
1998-2-16,Washington’s Birthday
2013-7-4,Independence Day
2005-2-21,Washington’s Birthday
2009-2-16,Washington’s Birthday
2016-11-11,Veterans Day
1998-5-25,Memorial Day
2001-11-12,Veterans Day
2000-1-17,"Birthday of Martin Luther King, Jr."
1997-11-27,Thanksgiving Day
2018-11-12,Veterans Day
2010-12-24,Christmas Day
2014-11-11,Veterans Day
2001-1-15,"Birthday of Martin Luther King, Jr."
1997-1-1,New Year’s Day
2015-12-25,Christmas Day
2020-1-1,New Year’s Day
2021-1-20,Inauguration Day
2020-2-17,Washington’s Birthday
2004-7-5,Independence Day
2019-9-2,Labor Day
2004-12-31,New Year’s Day
2005-11-11,Veterans Day
1998-9-7,Labor Day
2015-5-25,Memorial Day
1999-11-11,Veterans Day
2009-11-26,Thanksgiving Day
2018-1-15,"Birthday of Martin Luther King, Jr."
2010-10-11,Columbus Day
2006-7-4,Independence Day
2006-1-2,New Year’s Day
2010-11-25,Thanksgiving Day
2007-11-22,Thanksgiving Day
2001-10-8,Columbus Day
1997-7-4,Independence Day
1997-2-17,Washington’s Birthday
2000-11-10,Veterans Day
1999-1-1,New Year’s Day
2015-9-7,Labor Day
2002-1-21,"Birthday of Martin Luther King, Jr."
2019-10-14,Columbus Day
2014-1-1,New Year’s Day
2019-1-21,"Birthday of Martin Luther King, Jr."
2002-11-28,Thanksgiving Day
1998-11-26,Thanksgiving Day
2010-7-5,Independence Day
1997-1-20,"Birthday of Martin Luther King, Jr."
1997-12-25,Christmas Day
2021-2-15,Washington’s Birthday
2017-7-4,Independence Day
2003-1-1,New Year’s Day
2006-11-10,Veterans Day
2012-11-22,Thanksgiving Day
2009-11-11,Veterans Day
2004-12-24,Christmas Day
2003-2-17,Washington’s Birthday
2000-10-9,Columbus Day
2009-5-25,Memorial Day
2021-10-11,Columbus Day
2008-7-4,Independence Day
2021-11-25,Thanksgiving Day
2013-1-20,Inauguration Day
2020-7-3,Independence Day
2019-1-1,New Year’s Day
2009-7-3,Independence Day
2010-5-31,Memorial Day
2006-1-16,"Birthday of Martin Luther King, Jr."
2002-1-1,New Year’s Day
2007-1-1,New Year’s Day
2021-11-11,Veterans Day
2008-11-27,Thanksgiving Day
2014-11-27,Thanksgiving Day
2017-10-9,Columbus Day
2003-7-4,Independence Day
1998-7-3,Independence Day
2009-9-7,Labor Day
2011-12-26,Christmas Day
2018-12-25,Christmas Day
2006-12-25,Christmas Day
2014-12-25,Christmas Day
2017-1-16,"Birthday of Martin Luther King, Jr."
2017-9-4,Labor Day
2016-12-26,Christmas Day
2011-5-30,Memorial Day
2017-5-29,Memorial Day
2002-10-14,Columbus Day
2014-7-4,Independence Day
2015-2-16,Washington’s Birthday
2005-10-10,Columbus Day
2011-9-5,Labor Day
2012-7-4,Independence Day
2005-1-20,Inauguration Day
1999-2-15,Washington’s Birthday
2002-11-11,Veterans Day
2020-10-12,Columbus Day
2004-9-6,Labor Day
1999-5-31,Memorial Day
2007-5-28,Memorial Day
2020-5-25,Memorial Day
2013-1-21,"Birthday of Martin Luther King, Jr."
1999-11-25,Thanksgiving Day
2001-1-20,Inauguration Day
2014-2-17,Washington’s Birthday
2003-9-1,Labor Day
2003-5-26,Memorial Day
2021-12-25,Christmas Day
2003-12-25,Christmas Day
2011-1-17,"Birthday of Martin Luther King, Jr."
2014-9-1,Labor Day
2021-9-6,Labor Day
1997-9-1,Labor Day
2006-11-23,Thanksgiving Day
2012-1-16,"Birthday of Martin Luther King, Jr."
2013-9-2,Labor Day
2009-1-20,Inauguration Day
1998-11-11,Veterans Day
2002-5-27,Memorial Day
2013-11-11,Veterans Day
2015-11-26,Thanksgiving Day
2010-1-1,New Year’s Day
2018-9-3,Labor Day
2009-12-25,Christmas Day
2018-7-4,Independence Day
2012-1-2,New Year's Day
2002-7-4,Independence Day
2020-12-25,Christmas Day
2020-11-11,Veterans Day
2012-5-28,Memorial Day
2021-1-1,New Year’s Day
2002-12-25,Christmas Day
2014-10-13,Columbus Day
2007-11-12,Veterans Day
2018-11-22,Thanksgiving Day
2006-9-4,Labor Day
1999-10-11,Columbus Day
2021-5-31,Memorial Day
2001-2-19,Washington’s Birthday
1999-1-18,"Birthday of Martin Luther King, Jr."
2021-1-18,"Birthday of Martin Luther King, Jr."
2017-1-20,Inauguration Day
2013-11-28,Thanksgiving Day
2007-9-3,Labor Day
2012-2-20,Washington's Birthday
2018-2-19,Washington’s Birthday
2012-9-3,Labor Day
2017-11-10,Veterans Day
2016-5-30,Memorial Day
2010-2-15,Washington’s Birthday
2001-12-25,Christmas Day
2001-1-1,New Year’s Day
2000-11-23,Thanksgiving Day
1997-11-11,Veterans Day
2018-10-8,Columbus Day
2010-1-18,"Birthday of Martin Luther King, Jr."
2019-5-27,Memorial Day
2019-2-18,Washington’s Birthday
2001-11-22,Thanksgiving Day
2011-2-21,Washington’s Birthday
2015-11-11,Veterans Day
2013-5-27,Memorial Day
gcloud storage cp data_analytics_process.py gs://bucket123
gcloud storage cp holidays.csv gs://bucket123
gcloud storage ls gs://bucket123/*
-- 9. Airflow 変数を追加する（Airflow UI の代わりに gcloud CLI で設定）
gcloud composer environments run env01 variables set \
--location=us-central1 \
-- gcp_project project01-9999999
gcloud composer environments run env01 variables set \
--location=us-central1 \
-- gcs_bucket bucket123
gcloud composer environments run env01 variables set \
--location=us-central1 \
-- gce_region us-central1
gcloud composer environments run env01 variables set \
--location=us-central1 \
-- dataproc_service_account 00000000000-compute@developer.gserviceaccount.com
gcloud composer environments run env01 variables list \
--location=us-central1
-- 10. DAG を環境のバケットにアップロードする
vim data_analytics_dag.py
import datetime
from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup
# Jinja-templated Airflow variable; rendered at task run time.
PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"
# Table that holidays.csv is loaded into and that the join queries read from.
BQ_HOLIDAYS_TABLE_NAME = "holidays"

# Weather source: one public GHCN-D table per year, inclusive year range.
WEATHER_FIRST_YEAR = 2019
WEATHER_LAST_YEAR = 2021
# Specifically query a Chicago weather station
WEATHER_STATION_ID = "USW00094846"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"
# Dataproc serverless only allows lowercase characters in batch ids
BATCH_ID = "data-processing-{{ ts_nodash | lower}}"
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        # Positional args read by data_analytics_process.py:
        # temporary GCS bucket, table to read, table to write.
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

# Midnight of the previous day, computed when the DAG file is parsed.
# NOTE(review): Airflow recommends a static start_date; this dynamic value is
# kept from the tutorial so the DAG runs as soon as it is uploaded.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    # Run data_analytics_process.py on Dataproc Serverless.
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors.
    # The destination is fully qualified with PROJECT_NAME so the load target
    # is the same table the join queries below read from.
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=(
            f"{PROJECT_NAME}.{BQ_DESTINATION_DATASET_NAME}.{BQ_HOLIDAYS_TABLE_NAME}"
        ),
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(WEATHER_FIRST_YEAR, WEATHER_LAST_YEAR + 1):
            weather_table = f"bigquery-public-data.ghcn_d.ghcnd_{year}"
            # Join the holidays with the daily max temperature (TMAX) of one
            # station. Table paths are backtick-quoted because the project
            # names contain hyphens.
            weather_holidays_join_query = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.{BQ_DESTINATION_DATASET_NAME}.{BQ_HOLIDAYS_TABLE_NAME}` AS Holidays
            JOIN (SELECT id, date, element, value FROM `{weather_table}` AS Table WHERE Table.element="TMAX" AND Table.id="{WEATHER_STATION_ID}") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{year}",
                configuration={
                    "query": {
                        "query": weather_holidays_join_query,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    # Load holidays -> join with each year's weather -> normalize on Dataproc.
    load_external_dataset >> bq_join_group >> create_batch
gcloud composer environments storage dags list \
--environment=env01 \
--location=us-central1
gcloud composer environments storage dags import \
--environment=env01 \
--location=us-central1 \
--source=data_analytics_dag.py
-- 11. DAG をトリガーする
Composer 環境の「Airflow ウェブサーバー」リンクを開き、
アップロードした DAG (data_analytics_dag) が一覧に表示されるまで待つ
gcloud composer environments run env01 dags trigger \
--location=us-central1 \
-- data_analytics_dag
-- 12. DAG の成功を検証する
bq query \
--use_legacy_sql=false \
'SELECT * from holiday_weather.holidays_weather_joined limit 10;'
bq query \
--use_legacy_sql=false \
'SELECT * from holiday_weather.holidays_weather_normalized limit 10;'
-- 13. クリーンアップ
gcloud projects list
gcloud projects delete project01-9999999 \
--quiet