{Azure Data Factory}マネージド エアフローを使用して既存のパイプラインを実行する

https://www.youtube.com/watch?v=pGZ5v7OMqhM
https://learn.microsoft.com/ja-jp/cli/azure/datafactory?view=azure-cli-latest
https://learn.microsoft.com/ja-jp/azure/data-factory/tutorial-run-existing-pipeline-with-airflow
https://learn.microsoft.com/ja-jp/azure/role-based-access-control/role-assignments-list-cli


-- 1. 前作業

az login --use-device-code
az account show

az version

az configure --list-defaults
az configure --defaults location=eastus
az configure --list-defaults

az group create \
--name rg9999999 \
--location eastus


az group list
az upgrade

 

 

-- 2. Azure Data Factory パイプラインを作成する


az datafactory create \
--location "East US" \
--name df123 \
--resource-group rg9999999


az datafactory list \
--resource-group rg9999999

az datafactory show \
--name df123 \
--resource-group rg9999999


az datafactory pipeline create \
--factory-name df123 \
--name pp123 \
--resource-group rg9999999 \
--pipeline "{\"activities\": [{\"additionalProperties\": null,\"dependsOn\": ,\"description\": null,\"name\": \"Wait1\",\"type\": \"Wait\",\"userProperties\": ,\"waitTimeInSeconds\": 1}]}"

 


az datafactory pipeline list \
--factory-name df123 \
--resource-group rg9999999


az datafactory pipeline show \
--factory-name df123 \
--name pp123 \
--resource-group rg9999999

 

-- 3. サービス プリンシパルを作成する

※Azure Active Directoryは下記に名前変更になった
Microsoft Entra ID


az account show \
--query id \
--output tsv

 

az ad sp create-for-rbac \
--name sp123 \
--role Contributor \
--scopes /subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/rg9999999/providers/Microsoft.DataFactory/factories/df123


出力されるappId、password、tenantをメモする

 


az ad sp list --spn 22222222-2222-2222-2222-222222222222

※ spnはappIdを指定

az ad sp show --id 44444444-4444-4444-4444-444444444444

サービスプリンシパルのロール割り当て確認

az role assignment list --all --assignee 44444444-4444-4444-4444-444444444444 \
--output json --query '[].{principalName:principalName, roleDefinitionName:roleDefinitionName, scope:scope}'

 

-- 4. Azure ストレージ アカウント を作成する

az storage account create \
--resource-group rg9999999 \
--name st123 \
--access-tier Hot \
--kind StorageV2 \
--sku Standard_LRS


az storage account list \
--resource-group rg9999999

az storage account show \
--resource-group rg9999999 \
--name st123


key=$(az storage account keys list \
--account-name st123 \
--resource-group rg9999999 \
--output json \
--query [0].value | tr -d '"')

echo $key

 

az storage container create \
--name dags \
--account-name st123 \
--public-access off \
--account-key $key

 


-- 5. Airflowコンソール起動

Data Factory Studioを起動

左ペインの管理
中ペインの一番下にあるApache Airflowを選択
通気環境の作成

名前: airflow01
Airflow認証の種類: 基本
Airflow ユーザー名: airflow
Airflow のパスワード: password
コンピューティング サイズ: 小

作成

状態が実行中になるまで待つ
実行中になったら風車の右のモニターアイコンを選択してログイン


-- 6. コネクション作成

Admin
Connections
+アイコン押下


Connection ID: con01
Connection Type: Azure Data Factory
Client ID: サービスプリンシパルのappId
Secret: サービスプリンシパルのpassword
Tenant ID: サービスプリンシパルのtenant
Subsription ID: 11111111-1111-1111-1111-111111111111
Resource Group Name: rg9999999
Factory Name: df123

TEST
SAVE

 

 

-- 7. ダグファイルのアップロード

vim a.py
----

from datetime import datetime, timedelta

from airflow.models import DAG, BaseOperator

try:
    from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
    from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from airflow.utils.edgemodifier import Label

with DAG(
    dag_id="example_adf_run_pipeline",
    start_date=datetime(2023, 10, 14),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
        "azure_data_factory_conn_id": "con01", #This is a connection created on Airflow UI
        "factory_name": "df123",  # This can also be specified in the ADF connection.
        "resource_group_name": "rg9999999",  # This can also be specified in the ADF connection.
    },
    default_view="graph",
) as dag:
    begin = EmptyOperator(task_id="begin")
    end = EmptyOperator(task_id="end")

    # [START howto_operator_adf_run_pipeline]
    run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline1",
        pipeline_name="pp123", 
        parameters={"myParam": "value"},
    )
    # [END howto_operator_adf_run_pipeline]

    # [START howto_operator_adf_run_pipeline_async]
    run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline2",
        pipeline_name="pp123",
        wait_for_termination=False,
    )

    pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=run_pipeline2.output["run_id"],
    )
    # [END howto_operator_adf_run_pipeline_async]

    begin >> Label("No async wait") >> run_pipeline1
    begin >> Label("Do async wait with sensor") >> run_pipeline2
    [run_pipeline1, pipeline_run_sensor] >> end

    # Task dependency created via `XComArgs`:
    #   run_pipeline2 >> pipeline_run_sensor

----


az storage blob upload \
--account-name st123 \
--container-name dags \
--name dags/a.py \
--file a.py \
--account-key $key


az storage blob list \
--account-name st123 \
--container-name dags \
--output table \
--account-key $key

 

-- 8. ジョブ実行


風車の二つ右のファイルのインポートアイコンを選択

リンクサービス
新規


名前: AzureBlobStorage01
Azure サブスクリプション: 11111111-1111-1111-1111-111111111111
ストレージ アカウント名: st123

テスト接続
作成

フォルダーのパス: dags/
インポート

Airflowコンソールでジョブ実行確認

 

-- 9. クリーンアップ

サービスプリンシパルを画面から削除
パイプラインを画面から削除
統合ランタイム : airflow01 を画面から削除
データファクトリを画面から削除

az group list
az group delete \
--name rg9999999 \
--yes