https://docs.aws.amazon.com/mwaa/latest/userguide/quick-start.html
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-mwaa-environment.html
https://docs.aws.amazon.com/cli/latest/reference/mwaa/
https://bunsugi.com/mwaa-beginner/
-- 1. コマンド等のインストール
-- 1.1 aws cli version 2 インストール (mac)
# Download the AWS CLI v2 macOS installer package into the current directory.
curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"
# Install the downloaded package onto the root volume (requires sudo).
sudo installer -pkg AWSCLIV2.pkg -target /
# Confirm which aws binary is found on PATH and print its version.
which aws
aws --version
-- 2. VPC、S3、Airflow環境の作成
----
AWSTemplateFormatVersion: "2010-09-09"

# Stack inputs. Every parameter has a default, so the stack can be created
# without passing any --parameters.
Parameters:
  # Prefix used in the Name tags of the subnets and route tables.
  EnvironmentName:
    Description: An environment name that is prefixed to resource names
    Type: String
    Default: MWAAEnvironment

  # Address ranges: one VPC range and four subnet ranges inside it
  # (two public, two private).
  VpcCIDR:
    Description: The IP range (CIDR notation) for this VPC
    Type: String
    Default: 10.192.0.0/16
  PublicSubnet1CIDR:
    Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone
    Type: String
    Default: 10.192.10.0/24
  PublicSubnet2CIDR:
    Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone
    Type: String
    Default: 10.192.11.0/24
  PrivateSubnet1CIDR:
    Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone
    Type: String
    Default: 10.192.20.0/24
  PrivateSubnet2CIDR:
    Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone
    Type: String
    Default: 10.192.21.0/24

  # Name of the S3 bucket the template creates for the environment.
  # S3 bucket names are globally unique; override the default if the stack
  # fails with a bucket-name conflict.
  BucketName:
    Description: BucketName
    Type: String
    Default: bucket123

  # Upper bound on Airflow workers; MinValue rejects zero/negative input at
  # stack validation instead of during environment creation.
  MaxWorkerNodes:
    Description: The maximum number of workers that can run in the environment
    Type: Number
    Default: 2
    MinValue: 1

  # Per-component log levels. AllowedValues restricts each one to the levels
  # accepted by the MWAA LoggingConfiguration, so a typo is caught up front.
  DagProcessingLogs:
    Description: Log level for DagProcessing
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
  SchedulerLogsLevel:
    Description: Log level for SchedulerLogs
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
  TaskLogsLevel:
    Description: Log level for TaskLogs
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
  WorkerLogsLevel:
    Description: Log level for WorkerLogs
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
  WebserverLogsLevel:
    Description: Log level for WebserverLogs
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
Resources:
  ##############################################################################
  # CREATE VPC
  ##############################################################################

  # VPC for the environment, with DNS support and DNS hostnames enabled.
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VpcCIDR
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          # Uses the EnvironmentName parameter like every other Name tag in
          # this template (same value as before with the default parameter).
          Value: !Ref EnvironmentName

  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      InternetGatewayId: !Ref InternetGateway
      VpcId: !Ref VPC

  # Two public subnets (instances get public IPs) and two private subnets,
  # one of each in the first and second Availability Zone of the region.
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [0, !GetAZs ""]
      CidrBlock: !Ref PublicSubnet1CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Public Subnet (AZ1)"

  PublicSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [1, !GetAZs ""]
      CidrBlock: !Ref PublicSubnet2CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Public Subnet (AZ2)"

  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [0, !GetAZs ""]
      CidrBlock: !Ref PrivateSubnet1CIDR
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Subnet (AZ1)"

  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [1, !GetAZs ""]
      CidrBlock: !Ref PrivateSubnet2CIDR
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Subnet (AZ2)"

  # One Elastic IP + NAT gateway per public subnet. The EIPs wait for the
  # internet gateway attachment before being allocated.
  NatGateway1EIP:
    Type: AWS::EC2::EIP
    DependsOn: InternetGatewayAttachment
    Properties:
      Domain: vpc

  NatGateway2EIP:
    Type: AWS::EC2::EIP
    DependsOn: InternetGatewayAttachment
    Properties:
      Domain: vpc

  NatGateway1:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NatGateway1EIP.AllocationId
      SubnetId: !Ref PublicSubnet1

  NatGateway2:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NatGateway2EIP.AllocationId
      SubnetId: !Ref PublicSubnet2

  # Public routing: a single route table shared by both public subnets, with
  # a default route to the internet gateway.
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Public Routes"

  DefaultPublicRoute:
    Type: AWS::EC2::Route
    DependsOn: InternetGatewayAttachment
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  PublicSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet1

  PublicSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet2

  # Private routing: one route table per private subnet, each with a default
  # route to the NAT gateway numbered the same as the subnet.
  PrivateRouteTable1:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Routes (AZ1)"

  DefaultPrivateRoute1:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable1
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGateway1

  PrivateSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable1
      SubnetId: !Ref PrivateSubnet1

  PrivateRouteTable2:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub "${EnvironmentName} Private Routes (AZ2)"

  DefaultPrivateRoute2:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable2
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGateway2

  PrivateSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable2
      SubnetId: !Ref PrivateSubnet2

  # Source bucket for the environment: versioned, all public access blocked.
  EnvironmentBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref BucketName
      VersioningConfiguration:
        Status: Enabled
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  ##############################################################################
  # CREATE MWAA
  ##############################################################################

  # The Airflow environment itself. It runs in the two private subnets, reads
  # DAGs from the "dags" prefix of EnvironmentBucket and exposes a public
  # web server.
  MwaaEnvironment:
    Type: AWS::MWAA::Environment
    # CloudFormation orders resources only through Ref/GetAtt/DependsOn.
    # Nothing in Properties references the NAT routes, the route-table
    # associations or the self-referencing ingress rule, so they are listed
    # explicitly: the environment is then created after (and deleted before)
    # the private-subnet egress path and the security-group rule exist.
    DependsOn:
      - MwaaExecutionPolicy
      - DefaultPrivateRoute1
      - DefaultPrivateRoute2
      - PrivateSubnet1RouteTableAssociation
      - PrivateSubnet2RouteTableAssociation
      - SecurityGroupIngress
    Properties:
      Name: !Sub "${AWS::StackName}-MwaaEnvironment"
      SourceBucketArn: !GetAtt EnvironmentBucket.Arn
      ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn
      DagS3Path: dags
      EnvironmentClass: "mw1.small"
      NetworkConfiguration:
        SecurityGroupIds:
          - !GetAtt SecurityGroup.GroupId
        SubnetIds:
          - !Ref PrivateSubnet1
          - !Ref PrivateSubnet2
      WebserverAccessMode: PUBLIC_ONLY
      MaxWorkers: !Ref MaxWorkerNodes
      # Every log category is enabled; only the level is parameterised.
      LoggingConfiguration:
        DagProcessingLogs:
          LogLevel: !Ref DagProcessingLogs
          Enabled: true
        SchedulerLogs:
          LogLevel: !Ref SchedulerLogsLevel
          Enabled: true
        TaskLogs:
          LogLevel: !Ref TaskLogsLevel
          Enabled: true
        WorkerLogs:
          LogLevel: !Ref WorkerLogsLevel
          Enabled: true
        WebserverLogs:
          LogLevel: !Ref WebserverLogsLevel
          Enabled: true

  # Security group attached to the environment. Rules are separate resources:
  # all traffic from members of the same group in, all traffic out.
  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      VpcId: !Ref VPC
      GroupDescription: !Sub "Security Group for Amazon MWAA Environment ${AWS::StackName}-MwaaEnvironment"
      GroupName: !Sub "airflow-security-group-${AWS::StackName}-MwaaEnvironment"

  SecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref SecurityGroup
      IpProtocol: "-1"
      SourceSecurityGroupId: !Ref SecurityGroup

  SecurityGroupEgress:
    Type: AWS::EC2::SecurityGroupEgress
    Properties:
      GroupId: !Ref SecurityGroup
      IpProtocol: "-1"
      CidrIp: "0.0.0.0/0"

  # Role assumed by the MWAA service principals on behalf of the environment.
  MwaaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so no YAML loader can read the policy version as a date.
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - airflow-env.amazonaws.com
                - airflow.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: "/service-role/"

  # Permissions attached to MwaaExecutionRole.
  MwaaExecutionPolicy:
    DependsOn: EnvironmentBucket
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Roles:
        - !Ref MwaaExecutionRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          # Metrics publishing for this stack's environment. The ARN must use
          # the same name as MwaaEnvironment.Properties.Name; the
          # EnvironmentName parameter is only a tag prefix and would never
          # match the real environment.
          - Effect: Allow
            Action: airflow:PublishMetrics
            Resource:
              - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${AWS::StackName}-MwaaEnvironment"
          # Explicit deny for s3:ListAllMyBuckets, scoped to the environment
          # bucket ARNs.
          - Effect: Deny
            Action: s3:ListAllMyBuckets
            Resource:
              - !Sub "${EnvironmentBucket.Arn}"
              - !Sub "${EnvironmentBucket.Arn}/*"
          # Read-only access to the environment bucket and its objects.
          - Effect: Allow
            Action:
              - "s3:GetObject*"
              - "s3:GetBucket*"
              - "s3:List*"
            Resource:
              - !Sub "${EnvironmentBucket.Arn}"
              - !Sub "${EnvironmentBucket.Arn}/*"
          - Effect: Allow
            Action:
              - logs:DescribeLogGroups
            Resource: "*"
          # Log read/write limited to log groups whose name starts with
          # "airflow-<stack name>".
          - Effect: Allow
            Action:
              - logs:CreateLogStream
              - logs:CreateLogGroup
              - logs:PutLogEvents
              - logs:GetLogEvents
              - logs:GetLogRecord
              - logs:GetLogGroupFields
              - logs:GetQueryResults
              - logs:DescribeLogGroups
            Resource:
              - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*"
          - Effect: Allow
            Action: cloudwatch:PutMetricData
            Resource: "*"
          # Queue operations on "airflow-celery-*" queues in this region,
          # in any account.
          - Effect: Allow
            Action:
              - sqs:ChangeMessageVisibility
              - sqs:DeleteMessage
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:ReceiveMessage
              - sqs:SendMessage
            Resource:
              - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*"
          # KMS use for keys that are NOT in this account, and only when the
          # request comes via SQS in this region.
          - Effect: Allow
            Action:
              - kms:Decrypt
              - kms:DescribeKey
              - "kms:GenerateDataKey*"
              - kms:Encrypt
            NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*"
            Condition:
              StringLike:
                "kms:ViaService":
                  - !Sub "sqs.${AWS::Region}.amazonaws.com"
# Values reported by the stack once creation finishes.
Outputs:
  VPC:
    Description: A reference to the created VPC
    Value: !Ref VPC

  # Comma-separated subnet ID lists; inside !Sub, ${LogicalId} resolves to
  # the same value as !Ref LogicalId.
  PublicSubnets:
    Description: A list of the public subnets
    Value: !Sub "${PublicSubnet1},${PublicSubnet2}"
  PrivateSubnets:
    Description: A list of the private subnets
    Value: !Sub "${PrivateSubnet1},${PrivateSubnet2}"

  # The same four subnets, one output each.
  PublicSubnet1:
    Description: A reference to the public subnet in the 1st Availability Zone
    Value: !Ref PublicSubnet1
  PublicSubnet2:
    Description: A reference to the public subnet in the 2nd Availability Zone
    Value: !Ref PublicSubnet2
  PrivateSubnet1:
    Description: A reference to the private subnet in the 1st Availability Zone
    Value: !Ref PrivateSubnet1
  PrivateSubnet2:
    Description: A reference to the private subnet in the 2nd Availability Zone
    Value: !Ref PrivateSubnet2

  # Airflow UI address: "https://" followed by the environment's WebserverUrl.
  MwaaApacheAirflowUI:
    Description: MWAA Environment
    Value: !Join
      - ""
      - - "https://"
        - !GetAtt MwaaEnvironment.WebserverUrl
----
# Check the template file a.yaml for syntax errors before deploying it.
aws cloudformation validate-template \
--template-body file://a.yaml
# Create stack01 from a.yaml; CAPABILITY_IAM acknowledges that the template
# creates IAM resources (the execution role and its managed policy).
aws cloudformation create-stack \
--stack-name stack01 \
--template-body file://a.yaml \
--capabilities CAPABILITY_IAM
# Inspect progress: all stacks, then stack01's status and its resources.
aws cloudformation list-stacks
aws cloudformation describe-stacks \
--stack-name stack01
aws cloudformation describe-stack-resources \
--stack-name stack01
-- 3. DAGのアップロード
vim dag01.py
----
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
# Baseline keyword arguments handed to every operator of the DAG.
# Any task can replace an entry by passing the same keyword when the
# operator is constructed.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Further entries that are left disabled:
    #   queue='bash_queue',
    #   pool='backfill',
    #   priority_weight=10,
    #   end_date=datetime(2016, 1, 1),
    #   wait_for_downstream=False,
    #   dag=dag,
    #   sla=timedelta(hours=2),
    #   execution_timeout=timedelta(seconds=300),
    #   on_failure_callback=some_function,
    #   on_success_callback=some_other_function,
    #   on_retry_callback=another_function,
    #   sla_miss_callback=yet_another_function,
    #   trigger_rule='all_success',
)
# DAG "dag01": runs once a day; print_date runs first, then sleep and
# templated both run downstream of it.
with DAG(
    'dag01',
    default_args=default_args,
    description='A simple tutorial DAG',
    # NOTE(review): `schedule_interval` is deprecated since Airflow 2.4 in
    # favour of `schedule` - confirm against the environment's Airflow version.
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Overrides the default retry count (1) with 3 for this task only.
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    # Markdown attached to t1. dedent() strips the common leading
    # whitespace, so the text can be indented with the surrounding code.
    t1.doc_md = dedent(
        """\
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
    )

    # DAG-level documentation. The earlier `dag.doc_md = __doc__` assignment
    # was dropped: it was overwritten by this statement immediately, so it
    # never had any effect.
    dag.doc_md = "\nThis is a documentation placed anywhere\n"

    # Jinja template rendered by Airflow before the command runs: the three
    # echo lines are repeated five times.
    templated_command = dedent(
        """
        {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
        {% endfor %}
        """
    )

    # `params` supplies the value read as {{ params.my_param }} above.
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    # t2 and t3 both depend on t1.
    t1 >> [t2, t3]
----
# List all buckets, then the current contents of bucket123.
aws s3 ls
aws s3 ls s3://bucket123 --recursive
# Upload the DAG file under the dags/ prefix of the bucket.
aws s3 cp dag01.py s3://bucket123/dags/
# List the bucket again to confirm the upload.
aws s3 ls s3://bucket123 --recursive
-- 4. 動作確認
Amazon MWAA の Airflow UIから実行
AWS CLI には DAG を直接実行するコマンドはない模様 (aws mwaa create-cli-token で取得したトークンを使い、Airflow CLI エンドポイント経由でなら実行可能)
アップロードしたダグが表示されるまで待つ
-- 5. クリーンアップ
S3バケットを空にする (バージョニングが有効なため、全オブジェクトバージョンと削除マーカーも削除する)
-- スタック削除
# Delete stack01 together with the resources it created.
aws cloudformation delete-stack \
--stack-name stack01