Python: 3.8.10
Airflow: 2.7.2
OS: Ubuntu20
結論: タスク切り替わりのタイミングで修正が反映される
確認1: 15分間sleepするタスクの実行中に、
sleep時間を10秒に修正およびタスクの追加と削除をしたDAGをデプロイ
(DAGID=同じ)
cat <<-'EOF' > ~/airflow/dags/dag01.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
"dag01",
default_args={
"depends_on_past": False,
"email": ["admin@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1)
},
description="dag01",
schedule="0 2 * * *",
start_date=datetime(2023, 12, 1),
catchup=False,
tags=["dag01"],
) as dag:
task01 = BashOperator(
task_id="task01",
bash_command="sleep 900",
)
task02 = BashOperator(
task_id="task02",
bash_command="sleep 900",
)
task03 = BashOperator(
task_id="task03",
bash_command="sleep 900",
)
task01 >> task02 >> task03
EOF
デプロイしてunpause
→ start_date < 本日 かつ 現在時刻 > スケジュール時刻のため、ジョブ動作開始
cat <<-'EOF' > ~/airflow/dags/dag01.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
"dag01",
default_args={
"depends_on_past": False,
"email": ["admin@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1)
},
description="dag01",
schedule="0 2 * * *",
start_date=datetime(2023, 12, 1),
catchup=False,
tags=["dag01"],
) as dag:
task01 = BashOperator(
task_id="task01",
bash_command="sleep 10",
)
task03 = BashOperator(
task_id="task03",
bash_command="sleep 10",
)
task04 = BashOperator(
task_id="task04",
bash_command="sleep 10",
)
task01 >> task03 >> task04
EOF
タスク実行中でロックがかかっている模様。CODEは修正後のものに反映されない
task01の終了後、CODEが修正後のものに変化して、ジョブ終了した。
→タスク単位でロックされている模様
確認2: 15分間sleepするタスクの実行中に、
sleep時間を10秒に修正およびタスクの追加と削除をしたDAGをデプロイ
(DAGID=変更)
cat <<-'EOF' > ~/airflow/dags/dag11.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
"dag11",
default_args={
"depends_on_past": False,
"email": ["admin@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1)
},
description="dag11",
schedule="0 2 * * *",
start_date=datetime(2023, 12, 1),
catchup=False,
tags=["dag11"],
) as dag:
task01 = BashOperator(
task_id="task01",
bash_command="sleep 900",
)
task02 = BashOperator(
task_id="task02",
bash_command="sleep 900",
)
task03 = BashOperator(
task_id="task03",
bash_command="sleep 900",
)
task01 >> task02 >> task03
EOF
デプロイしてunpause
→ start_date < 本日 かつ 現在時刻 > スケジュール時刻のため、ジョブ動作開始
cat <<-'EOF' > ~/airflow/dags/dag11.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
"dag11_v2",
default_args={
"depends_on_past": False,
"email": ["admin@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1)
},
description="dag11_v2",
schedule="0 2 * * *",
start_date=datetime(2023, 12, 1),
catchup=False,
tags=["dag11_v2"],
) as dag:
task01 = BashOperator(
task_id="task01",
bash_command="sleep 10",
)
task03 = BashOperator(
task_id="task03",
bash_command="sleep 10",
)
task04 = BashOperator(
task_id="task04",
bash_command="sleep 10",
)
task01 >> task03 >> task04
EOF
タスク実行中でロックがかかっている模様。修正後のDAGIDが一覧に表示されない
task01の終了後、修正後のDAGIDが一覧に表示された
修正前のDAGIDは、消失した。
→ジョブ実行中にDAGIDの変更を行うとジョブがタスクの区切りで終了する