Airflow

 

(20)


https://qiita.com/Shmwa2/items/425c9be4e3a3a07db81e
https://qiita.com/yuuman/items/a449bbe36710ad837df7
https://qiita.com/hankehly/items/1f02a34740276d1b8f0f
https://airflow.apache.org/docs/apache-airflow/stable/start.html
https://dk521123.hatenablog.com/entry/2021/07/18/004531
https://qiita.com/mochocho/items/0af5662f63ca0c24918c
https://airflow.apache.org/docs/apache-airflow/stable/index.html
https://zenn.dev/ymasaoka/articles/register-dag-with-apache-airflow
https://dk521123.hatenablog.com/entry/2021/10/21/130702
https://airflow.apache.org/docs/apache-airflow/stable/installation/prerequisites.html

Prerequisites:
  Python: 3.8, 3.9, 3.10, 3.11
  Minimum recommended memory for running Airflow: 4GB


CPU = 1
Memory = 4G
Python: 3.8.10
Airflow: 2.7.2

 

-- 1. Installation

sudo apt update

pip install pyopenssl==23.2.0
pip install --upgrade pip

pip install testresources

pip uninstall cryptography
pip install cryptography==41.0.4

 

mkdir -p ~/airflow
echo "export AIRFLOW_HOME=~/airflow" >> ~/.bash_profile
echo 'export PATH=$HOME/.local/bin:$PATH' >> ~/.bash_profile
source ~/.bash_profile

echo $AIRFLOW_HOME


AIRFLOW_VERSION=2.7.2
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

airflow version
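
As a cross-check, the version can also be read from Python; a minimal sketch (run in the same environment that pip installed into):

import airflow

# The package exposes its version string
print(airflow.__version__)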

 

-- 2. Initialization


airflow db init

airflow users create \
--username admin \
--password admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com

airflow webserver --port 8080


Run in a separate terminal:
airflow scheduler


-- 3. GUI access

sudo apt -y install firefox

Open localhost:8080 in a browser from the console.

 


-- 4. Verification

-- 4.1 Example run 1

airflow dags unpause example_bash_operator

airflow tasks test example_bash_operator runme_0 2015-01-01

airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02


-- 4.2 Example run 2

# Create the DAG file directory (the default DAG location is dags_folder in airflow.cfg)

mkdir -p ~/airflow/dags
echo "echo Hello Airflow" > ~/hello.sh
chmod +x ~/hello.sh
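
To confirm which directory the scheduler actually scans, the effective dags_folder can be read from the loaded configuration. A minimal sketch (run under the same AIRFLOW_HOME as the scheduler):

from airflow.configuration import conf

# Resolved from airflow.cfg (or AIRFLOW__CORE__DAGS_FOLDER if overridden)
print(conf.get("core", "dags_folder"))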

vim ~/airflow/dags/dag01.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag01",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag01",
        schedule=timedelta(days=1),
        start_date=datetime(2023, 1, 1),
        catchup=False,
        tags=["dag01"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="echo hello",
    )
    # Note: the trailing space at the end of bash_command below is required;
    # Airflow treats a bash_command ending in ".sh" as a Jinja template file
    # path, and the trailing space makes it run as a literal command instead.
    task02 = BashOperator(
        task_id="task02",
        bash_command="~/hello.sh ",
    )

    task03 = BashOperator(
        task_id="task03",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
    )

    task01 >> task02 >> task03


It appears in the GUI after about 5 minutes (by default the scheduler rescans dags_folder every 300 seconds; dag_dir_list_interval in the [scheduler] section of airflow.cfg).

airflow dags test dag01 2023-10-01
airflow tasks test dag01 task01 2023-10-01
airflow tasks test dag01 task02 2023-10-01
airflow tasks test dag01 task03 2023-10-01

airflow dags state dag01 2023-10-01
airflow tasks state dag01 task01 2023-10-01
airflow tasks state dag01 task02 2023-10-01
airflow tasks state dag01 task03 2023-10-01
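
The same states can also be queried programmatically from the metadata DB (note: "tasks test" runs are not recorded there). A minimal sketch:

from airflow.models import DagRun

# Counterpart of "airflow dags state": list the recorded runs of dag01
for run in DagRun.find(dag_id="dag01"):
    print(run.run_id, run.state)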


-- 4.3 Example run 3


vim ~/airflow/dags/dag02.py


from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="dag02", start_date=datetime(2023, 8, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    task01 = BashOperator(task_id="task01", bash_command="echo hello")

    @task()
    def task02():
        print("task02")

    # Set dependencies between tasks
    task01 >> task02()


airflow dags test dag02 2023-10-01
airflow tasks test dag02 task01 2023-10-01
airflow tasks test dag02 task02 2023-10-01

airflow dags state dag02 2023-10-01
airflow tasks state dag02 task01 2023-10-01
airflow tasks state dag02 task02 2023-10-01
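
Extending example 3: @task (TaskFlow API) return values are passed between tasks via XCom, and bash_command is a Jinja template, so built-in macros such as {{ ds }} (the logical date) can be used. A minimal sketch; dag03 is hypothetical and not part of the setup above:

vim ~/airflow/dags/dag03.py

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(dag_id="dag03", start_date=datetime(2023, 8, 1), schedule=None) as dag:

    # bash_command is rendered with Jinja; {{ ds }} is the logical date (YYYY-MM-DD)
    task01 = BashOperator(
        task_id="task01",
        bash_command="echo 'logical date: {{ ds }}'",
    )

    @task()
    def extract():
        # The return value is stored as an XCom entry
        return {"value": 42}

    @task()
    def report(payload):
        # The XCom value from extract() arrives here as a plain argument
        print(f"received: {payload['value']}")

    task01 >> report(extract())


airflow dags test dag03 2023-10-01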

-- 4.4 Inspecting DAGs and other objects

airflow dags list
airflow dags details dag01
airflow dags details dag02

airflow dags show dag01
airflow dags show dag02

airflow dags unpause dag01
airflow dags pause dag01


airflow tasks list dag01
airflow tasks list dag02

airflow users list

airflow variables set key1 val1
airflow variables get key1
airflow variables list
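
Variables set this way can be read from DAG or task code. A minimal sketch, assuming key1 was set as above:

from airflow.models import Variable

# default_var avoids an error when the variable has not been set
val = Variable.get("key1", default_var=None)
print(val)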

 

(11)


Prerequisites:
  Python: 3.8, 3.9, 3.10, 3.11
  Minimum recommended memory for running Airflow: 4GB


CPU = 1
Memory = 4G
Python: 3.9.2
Airflow: 2.7.2

 

-- 1. Installation

sudo apt update

sudo apt install python3-pip


pip install pyopenssl==23.2.0
pip install --upgrade pip

pip install testresources

pip uninstall cryptography
pip install cryptography==41.0.4

 

mkdir -p ~/airflow
echo "export AIRFLOW_HOME=~/airflow" >> ~/.bash_profile
echo 'export PATH=$HOME/.local/bin:$PATH' >> ~/.bash_profile
source ~/.bash_profile

echo $AIRFLOW_HOME


AIRFLOW_VERSION=2.7.2
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

airflow version

 

-- 2. Initialization


airflow db init

airflow users create \
--username admin \
--password admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com

airflow webserver --port 8080


Run in a separate terminal:
airflow scheduler


-- 3. GUI access


Open localhost:8080 in a browser from the console.

 


-- 4. Verification

-- 4.1 Example run 1

airflow dags unpause example_bash_operator

airflow tasks test example_bash_operator runme_0 2015-01-01

airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02


-- 4.2 Example run 2

# Create the DAG file directory (the default DAG location is dags_folder in airflow.cfg)

mkdir -p ~/airflow/dags
echo "echo Hello Airflow" > ~/hello.sh
chmod +x ~/hello.sh

sudo apt -y install vim

vim ~/airflow/dags/dag01.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag01",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag01",
        schedule=timedelta(days=1),
        start_date=datetime(2023, 1, 1),
        catchup=False,
        tags=["dag01"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="echo hello",
    )
    # Note: a trailing space is required at the end of bash_command (see the note in the first section)
    task02 = BashOperator(
        task_id="task02",
        bash_command="~/hello.sh ",
    )

    task03 = BashOperator(
        task_id="task03",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
    )

    task01 >> task02 >> task03


It appears in the GUI after about 5 minutes.

airflow dags test dag01 2023-10-01
airflow tasks test dag01 task01 2023-10-01
airflow tasks test dag01 task02 2023-10-01
airflow tasks test dag01 task03 2023-10-01

airflow dags state dag01 2023-10-01
airflow tasks state dag01 task01 2023-10-01
airflow tasks state dag01 task02 2023-10-01
airflow tasks state dag01 task03 2023-10-01


-- 4.3 Example run 3


vim ~/airflow/dags/dag02.py


from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="dag02", start_date=datetime(2023, 8, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    task01 = BashOperator(task_id="task01", bash_command="echo hello")

    @task()
    def task02():
        print("task02")

    # Set dependencies between tasks
    task01 >> task02()


airflow dags test dag02 2023-10-01
airflow tasks test dag02 task01 2023-10-01
airflow tasks test dag02 task02 2023-10-01

airflow dags state dag02 2023-10-01
airflow tasks state dag02 task01 2023-10-01
airflow tasks state dag02 task02 2023-10-01

-- 4.4 Inspecting DAGs and other objects

airflow dags list
airflow dags details dag01
airflow dags details dag02

airflow dags show dag01
airflow dags show dag02

airflow dags unpause dag01
airflow dags pause dag01


airflow tasks list dag01
airflow tasks list dag02

airflow users list

airflow variables set key1 val1
airflow variables get key1
airflow variables list

 

 

(RL8)
https://orcacore.com/set-up-python-3-11-rocky-linux-8/
https://zerofromlight.com/blogs/detail/127/

 

Prerequisites:
  Python: 3.8, 3.9, 3.10, 3.11
  Minimum recommended memory for running Airflow: 4GB


CPU = 1
Memory = 4G
Python: 3.11.2
Airflow: 2.7.2


-- 1. Installation

-- 1.1 Python installation

sudo dnf update -y
sudo dnf install gcc openssl-devel bzip2-devel libffi-devel zlib-devel wget make tar -y

sudo dnf install -y sqlite-devel

sudo wget https://www.python.org/ftp/python/3.11.2/Python-3.11.2.tgz
sudo tar -xf Python-3.11.2.tgz
cd Python-3.11.2
./configure --enable-optimizations
nproc
make -j 1


sudo make altinstall
python3.11 --version


-- 1.2 Airflow installation

mkdir ~/test
cd ~/test

python3.11 -m venv venv01
source venv01/bin/activate


pip install apache-airflow

airflow version

 

mkdir -p ~/airflow
echo "export AIRFLOW_HOME=~/airflow" >> ~/.bash_profile
source ~/.bash_profile

echo $AIRFLOW_HOME

 

-- 2. Initialization


airflow db init

airflow users create \
--username admin \
--password admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com

airflow webserver --port 8080


Run in a separate terminal:
airflow scheduler


-- 3. GUI access


Open localhost:8080 in a browser from the console.


-- 4. Verification

-- 4.1 Example run 1

airflow dags unpause example_bash_operator

airflow tasks test example_bash_operator runme_0 2015-01-01

airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02


-- 4.2 Example run 2

# Create the DAG file directory (the default DAG location is dags_folder in airflow.cfg)

mkdir -p ~/airflow/dags
echo "echo Hello Airflow" > ~/hello.sh
chmod +x ~/hello.sh


vim ~/airflow/dags/dag01.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "dag01",
        default_args={
            "depends_on_past": False,
            "email": ["admin@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=1)
        },
        description="dag01",
        schedule=timedelta(days=1),
        start_date=datetime(2023, 1, 1),
        catchup=False,
        tags=["dag01"],
) as dag:

    task01 = BashOperator(
        task_id="task01",
        bash_command="echo hello",
    )
    # Note: a trailing space is required at the end of bash_command (see the note in the first section)
    task02 = BashOperator(
        task_id="task02",
        bash_command="~/hello.sh ",
    )

    task03 = BashOperator(
        task_id="task03",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
    )

    task01 >> task02 >> task03


It appears in the GUI after about 5 minutes.

airflow dags test dag01 2023-10-01
airflow tasks test dag01 task01 2023-10-01

airflow dags state dag01 2023-10-01
airflow tasks state dag01 task01 2023-10-01


-- 4.3 Example run 3


vim ~/airflow/dags/dag02.py


from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="dag02", start_date=datetime(2023, 8, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    task01 = BashOperator(task_id="task01", bash_command="echo hello")

    @task()
    def task02():
        print("task02")

    # Set dependencies between tasks
    task01 >> task02()


airflow dags test dag02 2023-10-01
airflow tasks test dag02 task01 2023-10-01

airflow dags state dag02 2023-10-01
airflow tasks state dag02 task01 2023-10-01

-- 4.4 Inspecting DAGs and other objects

airflow dags list
airflow dags details dag01
airflow dags details dag02

airflow dags show dag01
airflow dags show dag02

airflow dags unpause dag01
airflow dags pause dag01


airflow tasks list dag01
airflow tasks list dag02

airflow users list

airflow variables set key1 val1
airflow variables get key1
airflow variables list

 

 

(2022)
https://dk521123.hatenablog.com/entry/2021/07/18/004531

Docker or WSL appears to be required; a direct install on Windows does not seem possible.

Trying it anyway, just to confirm.

CPU = 2
Memory = 4G
Python: 3.11.6
Airflow: 2.7.2


airflow webserver --port 8080
fails at startup with the following error (the pwd module is Unix-only, which is why Airflow does not run natively on Windows):
webserver  | ModuleNotFoundError: No module named 'pwd'