https://www.youtube.com/watch?v=pGZ5v7OMqhM
https://learn.microsoft.com/ja-jp/cli/azure/datafactory?view=azure-cli-latest
https://learn.microsoft.com/ja-jp/azure/data-factory/tutorial-run-existing-pipeline-with-airflow
https://learn.microsoft.com/ja-jp/azure/role-based-access-control/role-assignments-list-cli
-- 1. 前作業
az login --use-device-code
az account show
az version
az configure --list-defaults
az configure --defaults location=eastus
az configure --list-defaults
az group create \
--name rg9999999 \
--location eastus
az group list
az upgrade
-- 2. Azure Data Factory パイプラインを作成する
az datafactory create \
--location "East US" \
--name df123 \
--resource-group rg9999999
az datafactory list \
--resource-group rg9999999
az datafactory show \
--name df123 \
--resource-group rg9999999
az datafactory pipeline create \
--factory-name df123 \
--name pp123 \
--resource-group rg9999999 \
--pipeline "{\"activities\": [{\"additionalProperties\": null,\"dependsOn\": ,\"description\": null,\"name\": \"Wait1\",\"type\": \"Wait\",\"userProperties\": ,\"waitTimeInSeconds\": 1}]}"
az datafactory pipeline list \
--factory-name df123 \
--resource-group rg9999999
az datafactory pipeline show \
--factory-name df123 \
--name pp123 \
--resource-group rg9999999
-- 3. サービス プリンシパルを作成する
※Azure Active Directoryは下記に名前変更になった
Microsoft Entra ID
az account show \
--query id \
--output tsv
az ad sp create-for-rbac \
--name sp123 \
--role Contributor \
--scopes /subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/rg9999999/providers/Microsoft.DataFactory/factories/df123
出力されるappId、password、tenantをメモする
az ad sp list --spn 22222222-2222-2222-2222-222222222222
※ spnはappIdを指定
az ad sp show --id 44444444-4444-4444-4444-444444444444
サービスプリンシパルのロール割り当て確認
az role assignment list --all --assignee 44444444-4444-4444-4444-444444444444 \
--output json --query '[].{principalName:principalName, roleDefinitionName:roleDefinitionName, scope:scope}'
-- 4. Azure ストレージ アカウント を作成する
az storage account create \
--resource-group rg9999999 \
--name st123 \
--access-tier Hot \
--kind StorageV2 \
--sku Standard_LRS
az storage account list \
--resource-group rg9999999
az storage account show \
--resource-group rg9999999 \
--name st123
key=$(az storage account keys list \
--account-name st123 \
--resource-group rg9999999 \
--output json \
--query [0].value | tr -d '"')
echo $key
az storage container create \
--name dags \
--account-name st123 \
--public-access off \
--account-key $key
-- 5. Airflowコンソール起動
Data Factory Studioを起動
左ペインの管理
中ペインの一番下にあるApache Airflowを選択
通気環境の作成
名前: airflow01
Airflow認証の種類: 基本
Airflow ユーザー名: airflow
Airflow のパスワード: password
コンピューティング サイズ: 小
作成
状態が実行中になるまで待つ
実行中になったら風車の右のモニターアイコンを選択してログイン
-- 6. コネクション作成
Admin
Connections
+アイコン押下
Connection ID: con01
Connection Type: Azure Data Factory
Client ID: サービスプリンシパルのappId
Secret: サービスプリンシパルのpassword
Tenant ID: サービスプリンシパルのtenant
Subsription ID: 11111111-1111-1111-1111-111111111111
Resource Group Name: rg9999999
Factory Name: df123
TEST
SAVE
-- 7. ダグファイルのアップロード
vim a.py
----
from datetime import datetime, timedelta
from airflow.models import DAG, BaseOperator
try:
from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from airflow.utils.edgemodifier import Label
with DAG(
dag_id="example_adf_run_pipeline",
start_date=datetime(2023, 10, 14),
schedule_interval="@daily",
catchup=False,
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=3),
"azure_data_factory_conn_id": "con01", #This is a connection created on Airflow UI
"factory_name": "df123", # This can also be specified in the ADF connection.
"resource_group_name": "rg9999999", # This can also be specified in the ADF connection.
},
default_view="graph",
) as dag:
begin = EmptyOperator(task_id="begin")
end = EmptyOperator(task_id="end")
# [START howto_operator_adf_run_pipeline]
run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
task_id="run_pipeline1",
pipeline_name="pp123",
parameters={"myParam": "value"},
)
# [END howto_operator_adf_run_pipeline]
# [START howto_operator_adf_run_pipeline_async]
run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
task_id="run_pipeline2",
pipeline_name="pp123",
wait_for_termination=False,
)
pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
task_id="pipeline_run_sensor",
run_id=run_pipeline2.output["run_id"],
)
# [END howto_operator_adf_run_pipeline_async]
begin >> Label("No async wait") >> run_pipeline1
begin >> Label("Do async wait with sensor") >> run_pipeline2
[run_pipeline1, pipeline_run_sensor] >> end
# Task dependency created via `XComArgs`:
# run_pipeline2 >> pipeline_run_sensor
----
az storage blob upload \
--account-name st123 \
--container-name dags \
--name dags/a.py \
--file a.py \
--account-key $key
az storage blob list \
--account-name st123 \
--container-name dags \
--output table \
--account-key $key
-- 8. ジョブ実行
風車の二つ右のファイルのインポートアイコンを選択
リンクサービス
新規
名前: AzureBlobStorage01
Azure サブスクリプション: 11111111-1111-1111-1111-111111111111
ストレージ アカウント名: st123
テスト接続
作成
フォルダーのパス: dags/
インポート
Airflowコンソールでジョブ実行確認
-- 9. クリーンアップ
サービスプリンシパルを画面から削除
パイプラインを画面から削除
統合ランタイム : airflow01 を画面から削除
データファクトリを画面から削除
az group list
az group delete \
--name rg9999999 \
--yes