{Kinesis}Amazon Kinesis Data Analytics for SQL Applications の開始方法

 

https://blog.serverworks.co.jp/aws/swx/kinesis-analytics

https://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/getting-started.html

 

入力: input_stream (Kinesis データストリーム) → SOURCE_SQL_STREAM_001 (アプリケーション内ストリーム)
出力: DESTINATION_SQL_STREAM (アプリケーション内ストリーム) → output_stream (Kinesis データストリーム)


-- 1. コマンド等のインストール

-- 1.1 aws cli version 2 インストール

# Download the AWS CLI v2 bundle for x86_64 Linux, unpack it, run the bundled
# installer, then print the version to confirm the install.
curl --output "awscliv2.zip" "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip"
unzip awscliv2.zip
sudo ./aws/install
aws --version

-- 1.2 jqインストール
# Install jq without prompting for confirmation.
sudo yum install --assumeyes jq


-- 2. IAMポリシー作成
# Write the permissions policy document with a heredoc instead of pasting it
# into an editor, so this step can be copied and run as-is.
#   ReadInputKinesis   : DescribeStream / GetShardIterator / GetRecords on input_stream
#   WriteOutputKinesis : DescribeStream / PutRecord / PutRecords on output_stream
# NOTE: 999999999999 looks like a placeholder account ID; substitute your own.
cat > policy01.json <<'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadInputKinesis",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:ap-northeast-1:999999999999:stream/input_stream"
            ]
        },
        {
            "Sid": "WriteOutputKinesis",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:ap-northeast-1:999999999999:stream/output_stream"
            ]
        }
    ]
}
EOF

# Create the managed policy policy01 from that document.
aws iam create-policy \
  --policy-name policy01 \
  --policy-document file://policy01.json

-- 3. IAMロール作成
# Write the trust policy with a heredoc instead of pasting it into an editor,
# so this step can be copied and run as-is. The document allows the
# kinesisanalytics.amazonaws.com service principal to call sts:AssumeRole.
cat > role01.json <<'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF

# Create role01 with that trust policy.
aws iam create-role \
  --role-name role01 \
  --assume-role-policy-document file://role01.json

-- 4. ポリシーをロールにアタッチ
# Attach the managed policy policy01 to role01.
aws iam attach-role-policy --role-name role01 --policy-arn arn:aws:iam::999999999999:policy/policy01


-- 5. Kinesis データストリームの作成

# Create the source and destination streams with one shard each.
aws kinesis create-stream --stream-name input_stream --shard-count 1
aws kinesis create-stream --stream-name output_stream --shard-count 1

# create-stream returns while the stream is still being created. Block until
# each stream reports ACTIVE so the following steps do not race the creation.
aws kinesis wait stream-exists --stream-name input_stream
aws kinesis wait stream-exists --stream-name output_stream

# Show the stream list and the details of both streams for confirmation.
aws kinesis list-streams

aws kinesis describe-stream --stream-name input_stream
aws kinesis describe-stream --stream-name output_stream


-- 6. SQL アプリケーションの作成


# Create the SQL application application01.
#   inputs_json  : reads Kinesis stream input_stream (via role01) as CSV with a
#                  single VARCHAR(5000) column DATA, under the in-application
#                  name prefix SOURCE_SQL_STREAM.
#   outputs_json : writes in-application stream DESTINATION_SQL_STREAM to
#                  Kinesis stream output_stream (via role01) as CSV.
#   app_code     : SQL that pumps DATA||DATA||DATA from SOURCE_SQL_STREAM_001
#                  into DESTINATION_SQL_STREAM.
inputs_json='[
            {
                "NamePrefix": "SOURCE_SQL_STREAM",
                "KinesisStreamsInput": {
                    "ResourceARN": "arn:aws:kinesis:ap-northeast-1:999999999999:stream/input_stream",
                    "RoleARN": "arn:aws:iam::999999999999:role/role01"
                },
                "InputParallelism": {
                    "Count": 1
                },
                "InputSchema": {
                    "RecordFormat": {
                        "RecordFormatType": "CSV",
                        "MappingParameters": {
                            "CSVMappingParameters": {
                                "RecordRowDelimiter": "\n",
                                "RecordColumnDelimiter": ","
                            }
                        }
                    },
                    "RecordEncoding": "UTF-8",
                    "RecordColumns": [
                        {
                            "Name": "DATA",
                            "SqlType": "VARCHAR(5000)"
                        }
                    ]
                }
            }
        ]'

outputs_json='[
            {
                "Name": "DESTINATION_SQL_STREAM",
                "KinesisStreamsOutput": {
                    "ResourceARN": "arn:aws:kinesis:ap-northeast-1:999999999999:stream/output_stream",
                    "RoleARN": "arn:aws:iam::999999999999:role/role01"
                },
                "DestinationSchema": {
                    "RecordFormatType": "CSV"
                }
            }
        ]'

app_code='CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(col1 VARCHAR(5000) );

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT DATA||DATA||DATA
FROM   "SOURCE_SQL_STREAM_001";
'

aws kinesisanalytics create-application \
  --application-name application01 \
  --application-description "application01" \
  --inputs "$inputs_json" \
  --outputs "$outputs_json" \
  --application-code "$app_code"

 

# List all applications, then print the description of application01.
aws kinesisanalytics list-applications
aws kinesisanalytics describe-application --application-name application01


-- 7. SQL アプリケーションの開始


# Print the application description so its state can be checked before starting.
aws kinesisanalytics describe-application \
  --application-name application01

# Read the ID of the application's first input from describe-application
# instead of hard-coding "1.1", so the step still works when the ID differs.
INPUT_ID=$(aws kinesisanalytics describe-application \
  --application-name application01 \
  --query 'ApplicationDetail.InputDescriptions[0].InputId' \
  --output text)

# Start the application, reading that input from starting position NOW.
aws kinesisanalytics start-application \
  --application-name application01 \
  --input-configurations "[
  {
    \"Id\": \"${INPUT_ID}\",
    \"InputStartingPositionConfiguration\": {
      \"InputStartingPosition\": \"NOW\"
    }
  }
]"

 


-- 8. 動作確認


# Put one record with payload "TEST" and partition key 1 onto input_stream.
# --cli-binary-format raw-in-base64-out makes the CLI take --data as given
# rather than expecting it to be Base64-encoded already.
aws kinesis put-record --cli-binary-format raw-in-base64-out \
  --stream-name input_stream --partition-key 1 --data "TEST"

 


# Get an iterator for shard shardId-000000000000 of output_stream, starting at
# TRIM_HORIZON. --output text yields the bare iterator string regardless of
# the configured default output format (JSON output would wrap it in quotes).
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --stream-name output_stream \
  --query 'ShardIterator' \
  --output text)

# Quote the expansion so the iterator is always passed as a single argument.
aws kinesis get-records --shard-iterator "$SHARD_ITERATOR"

 


-- get-records で取得したレコードの Data は Base64 エンコードされているため、デコードして内容を確認する

# Decode the Base64 payload to check the record content.
printf '%s' "VEVTVFRFU1RURVNUCg==" | base64 --decode

 

 

-- 9. SQL アプリケーションの停止

# Print the application description, then stop application01.
aws kinesisanalytics describe-application --application-name application01
aws kinesisanalytics stop-application --application-name application01

 

-- 10. クリーンアップ

 

-- SQL アプリケーションの削除

# Show what exists before deleting.
aws kinesisanalytics list-applications

aws kinesisanalytics describe-application \
  --application-name application01

# delete-application needs the application's creation timestamp. Read it from
# describe-application rather than pasting a literal from an earlier run,
# which only matches the application created at that moment.
CREATE_TIMESTAMP=$(aws kinesisanalytics describe-application \
  --application-name application01 \
  --query 'ApplicationDetail.CreateTimestamp' \
  --output text)

aws kinesisanalytics delete-application \
  --application-name application01 \
  --create-timestamp "$CREATE_TIMESTAMP"

 

-- Kinesis データストリームの削除

# List the streams, then delete the input stream followed by the output stream.
aws kinesis list-streams

for stream_name in input_stream output_stream; do
  aws kinesis delete-stream --stream-name "$stream_name"
done


-- IAMロールの削除
# Confirm the role by exact name. Grepping the list-roles output matches any
# line containing "role01" (for example other roles whose name contains it).
aws iam get-role --role-name role01

# A role cannot be deleted while managed policies are attached, so detach first.
aws iam detach-role-policy \
  --role-name role01 \
  --policy-arn arn:aws:iam::999999999999:policy/policy01

aws iam delete-role --role-name role01


-- IAMポリシーの削除
# Confirm the policy by exact name. --scope Local limits the listing to
# customer managed policies instead of scanning every AWS managed policy,
# and the query avoids substring matches on the JSON text.
aws iam list-policies \
  --scope Local \
  --query "Policies[?PolicyName=='policy01']"

aws iam delete-policy \
  --policy-arn arn:aws:iam::999999999999:policy/policy01