{GCP Dataflow} Streaming from Pub/Sub to BigQuery


https://cloud.google.com/dataflow/docs/tutorials/dataflow-stream-to-bigquery?hl=ja

https://marketingengineercareer.com/google-cloud-dataflow

https://cloud.google.com/bigquery/docs/creating-partitioned-tables?hl=ja#bq

https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/run


Pub/Sub -> Dataflow -> BigQuery
(Cloud Storage holds the Dataflow job's staging/temp files)


When this was run in asia-northeast1, the job failed with a zonal resource exhaustion error, so the steps below use us-central1 instead.

-- 1. Preliminary setup

gcloud init
gcloud auth list

gcloud --version

gcloud projects create project01-9999999 \
--name="project01"

gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region us-central1 --quiet
gcloud config set compute/zone us-central1-a --quiet

gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111

gcloud services enable compute.googleapis.com --project project01-9999999

gcloud components update

gcloud auth application-default login
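
To confirm that Application Default Credentials were saved, printing a token is a quick optional sanity check (not part of the original steps):

gcloud auth application-default print-access-token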


-- 2. Enable the APIs


gcloud services list --enabled
gcloud services list --available

gcloud services enable dataflow.googleapis.com
gcloud services enable logging.googleapis.com
gcloud services enable storage-api.googleapis.com
gcloud services enable pubsub.googleapis.com
gcloud services enable cloudresourcemanager.googleapis.com
gcloud services enable cloudscheduler.googleapis.com
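
Since gcloud services enable accepts multiple service names, the calls above can also be issued as a single command:

gcloud services enable dataflow.googleapis.com \
logging.googleapis.com \
storage-api.googleapis.com \
pubsub.googleapis.com \
cloudresourcemanager.googleapis.com \
cloudscheduler.googleapis.com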



-- 3. Create the topic

gcloud pubsub topics create topic01

gcloud pubsub topics list


-- 4. Create the subscription

gcloud pubsub subscriptions create ss01 \
--topic=topic01

gcloud pubsub subscriptions list
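
Note: the Pub/Sub-to-BigQuery template launched in step 8 reads from the topic and manages its own subscription, so ss01 is mainly useful for spot-checking published messages by hand:

gcloud pubsub subscriptions pull ss01 \
--limit=1 \
--auto-ack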



-- 5. Create the Cloud Storage bucket

※ Bucket names are globally unique, so replace bucket123 with a name of your own.

gcloud storage buckets create gs://bucket123 \
--default-storage-class=Standard \
--no-enable-autoclass \
--location=us-central1 \
--public-access-prevention \
--uniform-bucket-level-access

gcloud storage ls


-- 6. Create the BigQuery dataset

bq mk ds01

bq ls


-- 7. Create a table in the BigQuery dataset


bq mk \
--table \
--schema 'col1:DATE,col2:STRING' \
--time_partitioning_field col1 \
--time_partitioning_type DAY \
ds01.tab1

bq ls ds01
bq show ds01.tab1
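
The partitioning configuration can be inspected in detail via the JSON view, which includes the timePartitioning block:

bq show --format=prettyjson ds01.tab1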


-- 8. Launch the Dataflow job

※ staging-location: path and filename prefix for writing temporary files. Example: gs://your-bucket/temp


gcloud dataflow jobs run job01 \
--gcs-location gs://dataflow-templates-us-central1/latest/PubSub_to_BigQuery \
--region us-central1 \
--staging-location gs://bucket123/temp \
--parameters inputTopic=projects/project01-9999999/topics/topic01,outputTableSpec=project01-9999999:ds01.tab1
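
Per the template documentation, an optional outputDeadletterTable parameter can also be passed to capture messages that fail to convert into rows; a sketch (the deadletter table name here is illustrative):

gcloud dataflow jobs run job01 \
--gcs-location gs://dataflow-templates-us-central1/latest/PubSub_to_BigQuery \
--region us-central1 \
--staging-location gs://bucket123/temp \
--parameters inputTopic=projects/project01-9999999/topics/topic01,outputTableSpec=project01-9999999:ds01.tab1,outputDeadletterTable=project01-9999999:ds01.tab1_error_records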


gcloud dataflow jobs list \
--region us-central1
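
To show only running jobs, the list can be filtered with --status (active, terminated, or all):

gcloud dataflow jobs list \
--region us-central1 \
--status=active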


gcloud dataflow jobs describe 2023-01-02_19_03_42-111111111111111111 \
--region us-central1



-- 9. Manually publish data

gcloud pubsub topics publish topic01 \
--message='{"col1":"2022-01-01","col2":"val2"}'
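
To spread a few test rows across different daily partitions, a small loop works (assuming a bash-compatible shell):

for d in 2022-01-01 2022-01-02 2022-01-03; do
  gcloud pubsub topics publish topic01 \
  --message="{\"col1\":\"${d}\",\"col2\":\"val2\"}"
done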


Wait a while for the pipeline to process the message.


-- 10. Check that the data has arrived in BigQuery

bq query --use_legacy_sql=false \
'select * from ds01.tab1;'
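
Because tab1 is partitioned on col1, filtering on the partition column prunes the scan to a single partition:

bq query --use_legacy_sql=false \
'select * from ds01.tab1 where col1 = date "2022-01-01";'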


-- 11. Clean up

gcloud dataflow jobs list \
--region us-central1


gcloud dataflow jobs cancel 2023-01-02_19_03_42-111111111111111111 \
--region us-central1
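
For a streaming job, drain is the gentler alternative to cancel: the job stops reading new messages and finishes processing in-flight data before shutting down:

gcloud dataflow jobs drain 2023-01-02_19_03_42-111111111111111111 \
--region us-central1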


bq rm --recursive=true --force ds01
bq ls


gcloud storage rm gs://bucket123 --recursive
gcloud storage ls

gcloud pubsub subscriptions delete ss01

gcloud pubsub topics delete topic01


gcloud projects list
gcloud projects delete project01-9999999 \
--quiet