Airflow ジョブ実行中にDAG修正した場合の挙動

 

Python: 3.8.10
Airflow: 2.7.2

OS: Ubuntu20

結論: タスク切り替わりのタイミングで修正が反映される

 

確認1: 15分間sleepするタスクの実行中に、
sleep時間を10秒に修正およびタスクの追加と削除をしたDAGをデプロイ
(DAGID=同じ)

 

cat <<-'EOF' > ~/airflow/dags/dag01.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag01",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag01",
        schedule="0 2 * * *",
        start_date=datetime(2023, 12, 1),
        catchup=False,
        tags=["dag01"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="sleep 900",
    )

    task02 = BashOperator(
        task_id="task02",
        bash_command="sleep 900",
    )
    
    task03 = BashOperator(
        task_id="task03",
        bash_command="sleep 900",
    )

    task01 >> task02 >> task03

EOF


デプロイしてunpause 
→ start_date < 本日 かつ 現在時刻 > スケジュール時刻のため、ジョブ動作開始

 

cat <<-'EOF' > ~/airflow/dags/dag01.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag01",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag01",
        schedule="0 2 * * *",
        start_date=datetime(2023, 12, 1),
        catchup=False,
        tags=["dag01"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="sleep 10",
    )
    task03 = BashOperator(
        task_id="task03",
        bash_command="sleep 10",
    )
    
    task04 = BashOperator(
        task_id="task04",
        bash_command="sleep 10",
    )

    task01 >> task03 >> task04

EOF


タスク実行中でロックがかかっている模様。CODEは修正後のものに反映されない

task01の終了後、CODEが修正後のものに変化して、ジョブ終了した。
→タスク単位でロックされている模様


確認2: 15分間sleepするタスクの実行中に、
sleep時間を10秒に修正およびタスクの追加と削除をしたDAGをデプロイ
(DAGID=変更)

 

cat <<-'EOF' > ~/airflow/dags/dag11.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag11",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag11",
        schedule="0 2 * * *",
        start_date=datetime(2023, 12, 1),
        catchup=False,
        tags=["dag11"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="sleep 900",
    )

    task02 = BashOperator(
        task_id="task02",
        bash_command="sleep 900",
    )
    
    task03 = BashOperator(
        task_id="task03",
        bash_command="sleep 900",
    )

    task01 >> task02 >> task03

EOF


デプロイしてunpause 
→ start_date < 本日 かつ 現在時刻 > スケジュール時刻のため、ジョブ動作開始

 

cat <<-'EOF' > ~/airflow/dags/dag11.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag11_v2",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag11_v2",
        schedule="0 2 * * *",
        start_date=datetime(2023, 12, 1),
        catchup=False,
        tags=["dag11_v2"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="sleep 10",
    )
    task03 = BashOperator(
        task_id="task03",
        bash_command="sleep 10",
    )
    
    task04 = BashOperator(
        task_id="task04",
        bash_command="sleep 10",
    )

    task01 >> task03 >> task04

EOF


タスク実行中でロックがかかっている模様。修正後のDAGIDが一覧に表示されない

task01の終了後、修正後のDAGIDが一覧に表示された
修正前のDAGIDは、消失した。

→ジョブ実行中にDAGIDの変更を行うとジョブがタスクの区切りで終了する