https://blog.serverworks.co.jp/aws/swx/kinesis-analytics
https://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/getting-started.html
入力: Kinesis データストリーム input_stream → アプリケーション内入力ストリーム SOURCE_SQL_STREAM_001
出力: アプリケーション内出力ストリーム DESTINATION_SQL_STREAM → Kinesis データストリーム output_stream
-- 1. コマンド等のインストール
-- 1.1 aws cli version 2 インストール
# Download the AWS CLI v2 bundle for Linux x86_64, unpack it, and run the
# bundled installer with root privileges.
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
# Print the installed CLI version to confirm the install succeeded.
aws --version
-- 1.2 jqインストール
# Install jq through yum, answering yes to all prompts (-y).
sudo yum -y install jq
-- 2. IAMポリシー作成
# Save the policy document below as policy01.json. Its two statements allow
# reading from input_stream (DescribeStream, GetShardIterator, GetRecords)
# and writing to output_stream (DescribeStream, PutRecord, PutRecords).
vim policy01.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadInputKinesis",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": [
"arn:aws:kinesis:ap-northeast-1:999999999999:stream/input_stream"
]
},
{
"Sid": "WriteOutputKinesis",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": [
"arn:aws:kinesis:ap-northeast-1:999999999999:stream/output_stream"
]
}
]
}
# Create the IAM policy named policy01 from the saved document.
aws iam create-policy \
--policy-name policy01 \
--policy-document file://policy01.json
-- 3. IAMロール作成
# Save the trust policy below as role01.json. It lets the
# kinesisanalytics.amazonaws.com service principal assume the role through
# sts:AssumeRole. create-role reads this file, so it must exist first.
vim role01.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "kinesisanalytics.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
# Create role01 using role01.json as its assume-role (trust) policy.
aws iam create-role \
--role-name role01 \
--assume-role-policy-document file://role01.json
-- 4. ポリシーをロールにアタッチ
# Attach the policy01 policy to role01.
aws iam attach-role-policy --role-name role01 \
  --policy-arn arn:aws:iam::999999999999:policy/policy01
-- 5. Kinesis データストリームの作成
# Create the input and output data streams with one shard each.
aws kinesis create-stream --stream-name input_stream --shard-count 1
aws kinesis create-stream --stream-name output_stream --shard-count 1
# create-stream returns while the stream is still being created. Block until
# each stream is reported as existing/active so later steps do not race it.
aws kinesis wait stream-exists --stream-name input_stream
aws kinesis wait stream-exists --stream-name output_stream
# Show the resulting streams for confirmation.
aws kinesis list-streams
aws kinesis describe-stream --stream-name input_stream
aws kinesis describe-stream --stream-name output_stream
-- 6. SQL アプリケーションの作成
# Create the SQL application "application01".
#   --inputs:  reads input_stream through role01 as CSV (rows split on "\n",
#              columns on ","), mapped to a single VARCHAR(5000) column named
#              DATA, under the in-application name prefix SOURCE_SQL_STREAM.
#              The SQL below reads "SOURCE_SQL_STREAM_001"; presumably the
#              service appends the _001 suffix to the prefix (confirm with
#              describe-application).
#   --outputs: sends the in-application stream DESTINATION_SQL_STREAM to
#              output_stream through role01, formatted as CSV.
#   --application-code: declares DESTINATION_SQL_STREAM with one column (col1)
#              and a pump that inserts DATA concatenated with itself three
#              times for every row of the source stream.
aws kinesisanalytics create-application \
--application-name application01 \
--application-description "application01" \
--inputs '[
{
"NamePrefix": "SOURCE_SQL_STREAM",
"KinesisStreamsInput": {
"ResourceARN": "arn:aws:kinesis:ap-northeast-1:999999999999:stream/input_stream",
"RoleARN": "arn:aws:iam::999999999999:role/role01"
},
"InputParallelism": {
"Count": 1
},
"InputSchema": {
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\n",
"RecordColumnDelimiter": ","
}
}
},
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"Name": "DATA",
"SqlType": "VARCHAR(5000)"
}
]
}
}
]' \
--outputs '[
{
"Name": "DESTINATION_SQL_STREAM",
"KinesisStreamsOutput": {
"ResourceARN": "arn:aws:kinesis:ap-northeast-1:999999999999:stream/output_stream",
"RoleARN": "arn:aws:iam::999999999999:role/role01"
},
"DestinationSchema": {
"RecordFormatType": "CSV"
}
}
]' \
--application-code 'CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(col1 VARCHAR(5000) );
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT DATA||DATA||DATA
FROM "SOURCE_SQL_STREAM_001";
'
# List all applications, then show the details of the one just created.
aws kinesisanalytics list-applications
aws kinesisanalytics describe-application \
--application-name application01
-- 7. SQL アプリケーションの開始
# Inspect the application before starting it.
# NOTE(review): the "Id" passed to start-application below is hard-coded to
# "1.1"; presumably it must match the input ID shown in this output — confirm.
aws kinesisanalytics describe-application \
--application-name application01
# Start application01 for input "1.1" with InputStartingPosition set to NOW.
aws kinesisanalytics start-application \
--application-name application01 \
--input-configurations '[
{
"Id": "1.1",
"InputStartingPositionConfiguration": {
"InputStartingPosition": "NOW"
}
}
]'
-- 8. 動作確認
# Send one test record ("TEST") to the input stream. raw-in-base64-out makes
# the CLI take --data as raw text instead of expecting base64.
aws kinesis put-record \
--stream-name input_stream \
--partition-key 1 \
--data "TEST" \
--cli-binary-format raw-in-base64-out
# Get an iterator positioned at TRIM_HORIZON on the output stream's shard.
# --output text yields the bare iterator string whatever output format the
# CLI is configured with (JSON would wrap it in double quotes; table or yaml
# would not be a usable value at all).
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
--shard-id shardId-000000000000 \
--shard-iterator-type TRIM_HORIZON \
--stream-name output_stream \
--query 'ShardIterator' \
--output text)
# Quote the iterator so it reaches the CLI as one unmodified argument.
aws kinesis get-records --shard-iterator "$SHARD_ITERATOR"
# Decode a sample base64 "Data" value as returned by get-records; this one
# decodes to "TESTTESTTEST" followed by a newline.
printf '%s' "VEVTVFRFU1RURVNUCg==" | base64 -d
-- 9. SQL アプリケーションの停止
# Show the application's current details, then request that it be stopped.
aws kinesisanalytics describe-application --application-name application01
aws kinesisanalytics stop-application --application-name application01
-- 10. クリーンアップ
-- SQL アプリケーションの削除
# List and inspect the application before deleting it.
aws kinesisanalytics list-applications
aws kinesisanalytics describe-application \
--application-name application01
# delete-application needs the application's own creation timestamp. Read it
# from describe-application instead of hard-coding a value that is only valid
# for one particular run of this procedure.
CREATE_TIMESTAMP=$(aws kinesisanalytics describe-application \
--application-name application01 \
--query 'ApplicationDetail.CreateTimestamp' \
--output text)
aws kinesisanalytics delete-application \
--application-name application01 \
--create-timestamp "$CREATE_TIMESTAMP"
-- Kinesis データストリームの削除
# Delete both data streams, input first and then output.
for stream_name in input_stream output_stream; do
  aws kinesis delete-stream --stream-name "$stream_name"
done
-- IAMロールの削除
# Confirm the role exists. An exact-match --query filter returns the whole
# role object; grepping the JSON would also match names that merely contain
# "role01" and would print only fragments of each object.
aws iam list-roles --query "Roles[?RoleName=='role01']"
# A role cannot be deleted while managed policies are attached: detach first.
aws iam detach-role-policy \
--role-name role01 \
--policy-arn arn:aws:iam::999999999999:policy/policy01
aws iam delete-role --role-name role01
-- IAMポリシーの削除
# Confirm the policy exists. --scope Local limits the listing to
# customer-managed policies instead of every AWS managed policy, and the
# exact-match --query filter replaces a substring grep over JSON text.
aws iam list-policies --scope Local \
--query "Policies[?PolicyName=='policy01']"
# Delete the policy by ARN.
aws iam delete-policy \
--policy-arn arn:aws:iam::999999999999:policy/policy01