Airflow DAG単体テスト

 

https://takemikami.com/2021/12/05/AirflowDAG.html

前提: Airflowインストール済


pip install pytest


mkdir -p dags tests

cat -<<'EOF' > dags/dag01.py

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator


with DAG(
        'dag01',
        start_date=datetime(2023, 11, 11),
        schedule_interval=None,
) as dag:
    dummy = DummyOperator(task_id='dummy')
    dummy >> dummy

EOF


cat -<<'EOF' > tests/test_dags.py

import unittest
from airflow.models import DagBag


class TestDags(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.dagbag = DagBag()

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )

EOF

cat -<<'EOF' > tests/conftest.py

import os
import pytest
import pathlib

os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = str(pathlib.Path("{}/../dags".format(os.path.dirname(__file__))).resolve())


@pytest.fixture(scope='session', autouse=True)
def reset_db() -> None:
    from airflow.utils import db

    db.resetdb()

EOF


pytest tests