{QLDB}QLDB でのストリームの作成と管理

https://dev.classmethod.jp/articles/amazon-qldb-supports-real-time-streaming/
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-kinesis.html
https://hacknote.jp/archives/48083/


-- 前提: QLDBシェルインストール済み


-- 1. Kinesis データストリームの作成
aws kinesis create-stream --stream-name kinesisstream01 --shard-count 1

aws kinesis describe-stream --stream-name kinesisstream01

 


-- 2. QLDB用IAMポリシー作成
vim policy01.json

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "QLDBStreamKinesisPermissions",
"Action": [ "kinesis:PutRecord*", "kinesis:DescribeStream", "kinesis:ListShards" ],
"Effect": "Allow",
"Resource": "arn:aws:kinesis:ap-northeast-1:999999999999:stream/kinesisstream01"
}
]
}

aws iam create-policy \
--policy-name policy01 \
--policy-document file://policy01.json

-- 3. QLDB用IAMロール作成
vim role01.json

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "qldb.amazonaws.com"
},
"Action": [ "sts:AssumeRole" ]
}
]
}

aws iam create-role \
--role-name role01 \
--assume-role-policy-document file://role01.json


-- 4. QLDB用ポリシーをロールにアタッチ
aws iam attach-role-policy \
--policy-arn arn:aws:iam::999999999999:policy/policy01 \
--role-name role01

 

-- 5. 台帳の作成
aws qldb create-ledger \
--name qldb01 \
--permissions-mode STANDARD \
--no-deletion-protection

aws qldb list-ledgers
aws qldb describe-ledger --name qldb01

-- 6. QLDBストリームの作成

-- QLDBストリーム開始

vim kinesis.json
{
"StreamArn": "arn:aws:kinesis:ap-northeast-1:999999999999:stream/kinesisstream01",
"AggregationEnabled": false
}

aws qldb stream-journal-to-kinesis \
--ledger-name qldb01 \
--role-arn arn:aws:iam::999999999999:role/role01 \
--inclusive-start-time 2021-08-29T12:04:00Z \
--kinesis-configuration file://kinesis.json \
--stream-name qldbstream01


-- QLDBストリーム終了
aws qldb cancel-journal-kinesis-stream \
--ledger-name qldb01 \
--stream-id 8UyYF1UxTSK11nkksyar91


-- QLDBストリーム一覧
aws qldb list-journal-kinesis-streams-for-ledger --ledger-name qldb01

aws qldb describe-journal-kinesis-stream \
--ledger-name qldb01 \
--stream-id 8UyYF1UxTSK11nkksyar91


-- 7. Lambda用IAMポリシー作成
vim policy02.json

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:Get*",
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary"
],
"Resource": [
"arn:aws:kinesis:ap-northeast-1:999999999999:stream/kinesisstream01"
]
},
{
"Effect": "Allow",
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": [
"*"
]
}
]
}


aws iam create-policy \
--policy-name policy02 \
--policy-document file://policy02.json

-- 8. Lambda用IAMロール作成
vim role02.json

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}


aws iam create-role \
--role-name role02 \
--assume-role-policy-document file://role02.json


-- 9. Lambda用ポリシーをロールにアタッチ
aws iam attach-role-policy \
--policy-arn arn:aws:iam::999999999999:policy/policy02 \
--role-name role02


aws iam attach-role-policy \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole \
--role-name role02

 

-- 10. Lambda関数作成

mkdir package
pip3.8 install amazon.ion boto3 --target ./package

vim test.py

#!/usr/bin/python

import base64
import boto3
import amazon.ion.simpleion as ion

def lambda_handler(event, context):
    """Log every Kinesis record contained in the invocation event as Ion text.

    Prints the raw event, then for each entry in event['Records']
    base64-decodes record["kinesis"]["data"], prints the decoded bytes,
    parses them with ion.loads and prints the value re-serialized as
    Ion text (binary=False).

    Args:
        event: Invocation payload; must contain a 'Records' list whose items
            carry base64-encoded data under ["kinesis"]["data"].
        context: Not used by this handler.

    Returns:
        None.
    """
    print(event)
    for record in event['Records']:
        payload = base64.b64decode(record["kinesis"]["data"])
        print("Decoded payload: " + str(payload))
        # NOTE(review): assumes each record holds a single Ion document;
        # KPL-aggregated records would need de-aggregation first -- confirm
        # the stream's AggregationEnabled setting.
        ion_record = ion.loads(payload)
        print("Ion record: ", ion.dumps(ion_record, binary=False))


chmod 755 test.py
chmod -R 755 package

cd package
zip -r ../test.zip .
cd ..
zip -g test.zip test.py


aws lambda create-function \
--region ap-northeast-1 \
--function-name test \
--zip-file fileb://test.zip \
--role arn:aws:iam::999999999999:role/role02 \
--handler test.lambda_handler \
--runtime python3.8 \
--timeout 60


aws lambda list-functions
aws lambda list-functions | jq -c '[ .Functions[].FunctionName ]'

aws lambda get-function --function-name test


"State" が "Active" になるまで待つ


-- 11. AWS Lambda でイベントソースを追加する

aws lambda create-event-source-mapping \
--function-name test \
--event-source-arn arn:aws:kinesis:ap-northeast-1:999999999999:stream/kinesisstream01 \
--batch-size 100 \
--starting-position LATEST


aws lambda list-event-source-mappings \
--function-name test \
--event-source-arn arn:aws:kinesis:ap-northeast-1:999999999999:stream/kinesisstream01

 


-- 12. QLDBでテストデータ追加

qldbshell --ledger qldb01

create table tab1
insert into tab1 `{"col1": "val1"}`
select * from tab1
drop table tab1


-- 13. クリーンアップ

-- 台帳の削除
aws qldb delete-ledger --name qldb01


-- ロールの一覧
aws iam list-roles | grep role01
aws iam list-roles | grep role02

-- ロールの削除

aws iam detach-role-policy \
--role-name role01 \
--policy-arn arn:aws:iam::999999999999:policy/policy01

aws iam detach-role-policy \
--role-name role02 \
--policy-arn arn:aws:iam::999999999999:policy/policy02

aws iam detach-role-policy \
--role-name role02 \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

 

aws iam delete-role --role-name role01
aws iam delete-role --role-name role02

-- ポリシーの一覧
aws iam list-policies | grep policy01
aws iam list-policies | grep policy02

-- ポリシーの削除
aws iam delete-policy \
--policy-arn arn:aws:iam::999999999999:policy/policy01

aws iam delete-policy \
--policy-arn arn:aws:iam::999999999999:policy/policy02

-- lambdaイベントソースマッピング一覧

aws lambda list-event-source-mappings \
--function-name test \
--event-source-arn arn:aws:kinesis:ap-northeast-1:999999999999:stream/kinesisstream01

-- lambdaイベントソースマッピング削除
aws lambda delete-event-source-mapping \
--uuid 11111111-2222-3333-4444-555555555555


-- lambda関数の一覧
aws lambda list-functions | jq -c '[ .Functions[].FunctionName ]'

-- lambda関数の削除
aws lambda delete-function --function-name test


-- Kinesis データストリームの一覧
aws kinesis list-streams


-- Kinesis データストリームの削除

aws kinesis delete-stream --stream-name kinesisstream01