(20)
https://qiita.com/Shmwa2/items/425c9be4e3a3a07db81e
https://qiita.com/yuuman/items/a449bbe36710ad837df7
https://qiita.com/hankehly/items/1f02a34740276d1b8f0f
https://airflow.apache.org/docs/apache-airflow/stable/start.html
https://dk521123.hatenablog.com/entry/2021/07/18/004531
https://qiita.com/mochocho/items/0af5662f63ca0c24918c
https://airflow.apache.org/docs/apache-airflow/stable/index.html
https://zenn.dev/ymasaoka/articles/register-dag-with-apache-airflow
https://dk521123.hatenablog.com/entry/2021/10/21/130702
https://airflow.apache.org/docs/apache-airflow/stable/installation/prerequisites.html
Prerequisites:
Python: 3.8, 3.9, 3.10, 3.11
Recommended minimum memory for running Airflow: 4 GB
Environment used here:
CPU = 1
Memory = 4G
Python: 3.8.10
Airflow: 2.7.2
-- 1. Installation
sudo apt update
pip install pyopenssl==23.2.0
pip install --upgrade pip
pip install testresources
pip uninstall cryptography
pip install cryptography==41.0.4
mkdir -p ~/airflow
echo "export AIRFLOW_HOME=~/airflow" >> ~/.bash_profile
echo "PATH=$HOME/.local/bin:$PATH" >> ~/.bash_profile
source ~/.bash_profile
echo $AIRFLOW_HOME
AIRFLOW_VERSION=2.7.2
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For this environment the URL resolves to .../constraints-2.7.2/constraints-3.8.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow version
-- 2. Initialization
airflow db init
airflow users create \
--username admin \
--password admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com
airflow webserver --port 8080
Run in a separate terminal:
airflow scheduler
-- 3. GUI access
sudo apt -y install firefox
Open localhost:8080 in a browser from the console.
-- 4. Verification
-- 4.1 Example 1
airflow dags unpause example_bash_operator
airflow tasks test example_bash_operator runme_0 2015-01-01
airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02
-- 4.2 Example 2
# Create the DAG file directory (the default DAG location is set by dags_folder in airflow.cfg)
mkdir -p ~/airflow/dags
echo "echo Hello Airflow" > ~/hello.sh
chmod +x ~/hello.sh
vim ~/airflow/dags/dag01.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "dag01",
    default_args={
        "depends_on_past": False,
        "email": ["admin@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    description="dag01",
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["dag01"],
) as dag:
    task01 = BashOperator(
        task_id="task01",
        bash_command="echo hello",
    )
    # Note: the trailing space after the script path is required; without it,
    # Jinja treats a command ending in ".sh" as a template file to render.
    task02 = BashOperator(
        task_id="task02",
        bash_command="~/hello.sh ",
    )
    # Per-task arguments override default_args: task03 retries 3 times.
    task03 = BashOperator(
        task_id="task03",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
    )
    task01 >> task02 >> task03
After about five minutes the DAG appears in the GUI.
airflow dags test dag01 2023-10-01
airflow tasks test dag01 task01 2023-10-01
airflow tasks test dag01 task02 2023-10-01
airflow tasks test dag01 task03 2023-10-01
airflow dags state dag01 2023-10-01
airflow tasks state dag01 task01 2023-10-01
airflow tasks state dag01 task02 2023-10-01
airflow tasks state dag01 task03 2023-10-01
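Since bash_command is a Jinja template, runtime macros can be interpolated directly; below is a minimal sketch using the built-in {{ ds }} macro (the DAG id dag01_templated is hypothetical, not part of the steps above):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical DAG id, for illustration only.
with DAG("dag01_templated", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # {{ ds }} expands to the logical date as YYYY-MM-DD at run time.
    print_date = BashOperator(
        task_id="print_date",
        bash_command="echo logical date is {{ ds }}",
    )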
-- 4.3 Example 3
vim ~/airflow/dags/dag02.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="dag02", start_date=datetime(2023, 8, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
task01 = BashOperator(task_id="task01", bash_command="echo hello")
@task()
def task02():
print("task02")
# Set dependencies between tasks
task01 >> task02()
airflow dags test dag02 2023-10-01
airflow tasks test dag02 task01 2023-10-01
airflow tasks test dag02 task02 2023-10-01
airflow dags state dag02 2023-10-01
airflow tasks state dag02 task01 2023-10-01
airflow tasks state dag02 task02 2023-10-01
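A return value from a @task function is stored as an XCom and can be handed to another task as an ordinary argument; a minimal sketch (the DAG id dag02_xcom and the task names are hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

# Hypothetical DAG id, for illustration only.
with DAG(dag_id="dag02_xcom", start_date=datetime(2023, 8, 1), schedule=None) as dag:
    @task()
    def produce():
        # The return value is pushed to XCom automatically.
        return "hello"

    @task()
    def consume(value):
        # The XCom value arrives as a plain Python argument.
        print(value)

    consume(produce())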
-- 4.4 Inspecting DAGs and related objects
airflow dags list
airflow dags details dag01
airflow dags details dag02
airflow dags show dag01
airflow dags show dag02
airflow dags unpause dag01
airflow dags pause dag01
airflow tasks list dag01
airflow tasks list dag02
airflow users list
airflow variables set key1 val1
airflow variables get key1
airflow variables list
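A Variable set from the CLI can also be read from DAG code; a minimal sketch reading the key1 variable set above (the DAG id var_demo is hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

# Hypothetical DAG id, for illustration only.
with DAG("var_demo", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # Variable.get here runs at DAG parse time; default_var avoids an
    # error when the key does not exist yet.
    val = Variable.get("key1", default_var="none")
    echo_var = BashOperator(task_id="echo_var", bash_command=f"echo {val}")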
(11)
Prerequisites:
Python: 3.8, 3.9, 3.10, 3.11
Recommended minimum memory for running Airflow: 4 GB
Environment used here:
CPU = 1
Memory = 4G
Python: 3.9.2
Airflow: 2.7.2
-- 1. Installation
sudo apt update
sudo apt install python3-pip
pip install pyopenssl==23.2.0
pip install --upgrade pip
pip install testresources
pip uninstall cryptography
pip install cryptography==41.0.4
mkdir -p ~/airflow
echo "export AIRFLOW_HOME=~/airflow" >> ~/.bash_profile
echo "PATH=$HOME/.local/bin:$PATH" >> ~/.bash_profile
source ~/.bash_profile
echo $AIRFLOW_HOME
AIRFLOW_VERSION=2.7.2
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow version
-- 2. Initialization
airflow db init
airflow users create \
--username admin \
--password admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com
airflow webserver --port 8080
Run in a separate terminal:
airflow scheduler
-- 3. GUI access
Open localhost:8080 in a browser from the console.
-- 4. Verification
-- 4.1 Example 1
airflow dags unpause example_bash_operator
airflow tasks test example_bash_operator runme_0 2015-01-01
airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02
-- 4.2 Example 2
# Create the DAG file directory (the default DAG location is set by dags_folder in airflow.cfg)
mkdir -p ~/airflow/dags
echo "echo Hello Airflow" > ~/hello.sh
chmod +x ~/hello.sh
sudo apt -y install vim
vim ~/airflow/dags/dag01.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "dag01",
    default_args={
        "depends_on_past": False,
        "email": ["admin@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    description="dag01",
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["dag01"],
) as dag:
    task01 = BashOperator(
        task_id="task01",
        bash_command="echo hello",
    )
    # Note: the trailing space after the script path is required; without it,
    # Jinja treats a command ending in ".sh" as a template file to render.
    task02 = BashOperator(
        task_id="task02",
        bash_command="~/hello.sh ",
    )
    # Per-task arguments override default_args: task03 retries 3 times.
    task03 = BashOperator(
        task_id="task03",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
    )
    task01 >> task02 >> task03
After about five minutes the DAG appears in the GUI.
airflow dags test dag01 2023-10-01
airflow tasks test dag01 task01 2023-10-01
airflow tasks test dag01 task02 2023-10-01
airflow tasks test dag01 task03 2023-10-01
airflow dags state dag01 2023-10-01
airflow tasks state dag01 task01 2023-10-01
airflow tasks state dag01 task02 2023-10-01
airflow tasks state dag01 task03 2023-10-01
-- 4.3 Example 3
vim ~/airflow/dags/dag02.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="dag02", start_date=datetime(2023, 8, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
task01 = BashOperator(task_id="task01", bash_command="echo hello")
@task()
def task02():
print("task02")
# Set dependencies between tasks
task01 >> task02()
airflow dags test dag02 2023-10-01
airflow tasks test dag02 task01 2023-10-01
airflow tasks test dag02 task02 2023-10-01
airflow dags state dag02 2023-10-01
airflow tasks state dag02 task01 2023-10-01
airflow tasks state dag02 task02 2023-10-01
-- 4.4 Inspecting DAGs and related objects
airflow dags list
airflow dags details dag01
airflow dags details dag02
airflow dags show dag01
airflow dags show dag02
airflow dags unpause dag01
airflow dags pause dag01
airflow tasks list dag01
airflow tasks list dag02
airflow users list
airflow variables set key1 val1
airflow variables get key1
airflow variables list
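Variables can also be referenced from Jinja templates without any Python-side lookup; a minimal sketch using the key1 variable set above (the DAG id var_jinja is hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical DAG id, for illustration only.
with DAG("var_jinja", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # {{ var.value.key1 }} is resolved when the task runs, not at parse time.
    echo_var = BashOperator(
        task_id="echo_var",
        bash_command="echo {{ var.value.key1 }}",
    )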
(RL8)
https://orcacore.com/set-up-python-3-11-rocky-linux-8/
https://zerofromlight.com/blogs/detail/127/
Prerequisites:
Python: 3.8, 3.9, 3.10, 3.11
Recommended minimum memory for running Airflow: 4 GB
Environment used here:
CPU = 1
Memory = 4G
Python: 3.11.2
Airflow: 2.7.2
-- 1. Installation
-- 1.1 Install Python
sudo dnf update -y
sudo dnf install gcc openssl-devel bzip2-devel libffi-devel zlib-devel wget make tar -y
sudo dnf install -y sqlite-devel
sudo wget https://www.python.org/ftp/python/3.11.2/Python-3.11.2.tgz
sudo tar -xf Python-3.11.2.tgz
cd Python-3.11.2
./configure --enable-optimizations
nproc   # check the core count to choose the make -j value (1 core here)
make -j 1
sudo make altinstall
python3.11 --version
-- 1.2 Install Airflow
mkdir ~/test
cd ~/test
python3.11 -m venv venv01
source venv01/bin/activate
pip install apache-airflow
(Unlike the sections above, no constraint file is pinned here, so dependency versions may differ between runs.)
airflow version
mkdir -p ~/airflow
echo "export AIRFLOW_HOME=~/airflow" >> ~/.bash_profile
source ~/.bash_profile
echo $AIRFLOW_HOME
-- 2. Initialization
airflow db init
airflow users create \
--username admin \
--password admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com
airflow webserver --port 8080
Run in a separate terminal:
airflow scheduler
-- 3. GUI access
Open localhost:8080 in a browser from the console.
-- 4. Verification
-- 4.1 Example 1
airflow dags unpause example_bash_operator
airflow tasks test example_bash_operator runme_0 2015-01-01
airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02
-- 4.2 Example 2
# Create the DAG file directory (the default DAG location is set by dags_folder in airflow.cfg)
mkdir -p ~/airflow/dags
echo "echo Hello Airflow" > ~/hello.sh
chmod +x ~/hello.sh
vim ~/airflow/dags/dag01.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "dag01",
    default_args={
        "depends_on_past": False,
        "email": ["admin@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    description="dag01",
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["dag01"],
) as dag:
    task01 = BashOperator(
        task_id="task01",
        bash_command="echo hello",
    )
    # Note: the trailing space after the script path is required; without it,
    # Jinja treats a command ending in ".sh" as a template file to render.
    task02 = BashOperator(
        task_id="task02",
        bash_command="~/hello.sh ",
    )
    # Per-task arguments override default_args: task03 retries 3 times.
    task03 = BashOperator(
        task_id="task03",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
    )
    task01 >> task02 >> task03
After about five minutes the DAG appears in the GUI.
airflow dags test dag01 2023-10-01
airflow tasks test dag01 task01 2023-10-01
airflow dags state dag01 2023-10-01
airflow tasks state dag01 task01 2023-10-01
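The task01 >> task02 >> task03 chaining above also has a helper form that scales better for long pipelines; a minimal sketch (the DAG id chain_demo and the task ids are hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator

# Hypothetical DAG and task ids, for illustration only.
with DAG("chain_demo", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    t1 = BashOperator(task_id="t1", bash_command="true")
    t2 = BashOperator(task_id="t2", bash_command="true")
    t3 = BashOperator(task_id="t3", bash_command="true")
    # Equivalent to t1 >> t2 >> t3
    chain(t1, t2, t3)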
-- 4.3 Example 3
vim ~/airflow/dags/dag02.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="dag02", start_date=datetime(2023, 8, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
task01 = BashOperator(task_id="task01", bash_command="echo hello")
@task()
def task02():
print("task02")
# Set dependencies between tasks
task01 >> task02()
airflow dags test dag02 2023-10-01
airflow tasks test dag02 task01 2023-10-01
airflow dags state dag02 2023-10-01
airflow tasks state dag02 task01 2023-10-01
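dag01 (timedelta) and dag02 (cron string) show two of the accepted schedule forms; a minimal sketch of the equivalent options (all three DAG ids are hypothetical):

from datetime import datetime, timedelta
from airflow import DAG

# Each of these runs once per day: the cron string and the "@daily" preset
# fire at midnight; the timedelta form fires every 24 h from start_date.
with DAG("daily_cron", start_date=datetime(2023, 1, 1), schedule="0 0 * * *"):
    pass
with DAG("daily_delta", start_date=datetime(2023, 1, 1), schedule=timedelta(days=1)):
    pass
with DAG("daily_preset", start_date=datetime(2023, 1, 1), schedule="@daily"):
    pass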
-- 4.4 Inspecting DAGs and related objects
airflow dags list
airflow dags details dag01
airflow dags details dag02
airflow dags show dag01
airflow dags show dag02
airflow dags unpause dag01
airflow dags pause dag01
airflow tasks list dag01
airflow tasks list dag02
airflow users list
airflow variables set key1 val1
airflow variables get key1
airflow variables list
- Windows Server
(2022)
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Docker or WSL appears to be required; direct installation does not seem possible.
Trying it anyway, just to confirm.
CPU = 2
Memory = 4G
Python: 3.11.6
Airflow: 2.7.2
airflow webserver --port 8080
Running this fails with the following error:
webserver | ModuleNotFoundError: No module named 'pwd'
(pwd is a Unix-only standard-library module, so this confirms Airflow cannot run natively on Windows.)