{DocumentDB}Amazon DocumentDB での変更ストリームの使用

-- 1. Amazon DocumentDB クラスターの作成

# Create the DocumentDB cluster "cluster01" (engine version 4.0.0, port 27017)
# with storage encryption and deletion protection both switched off.
aws docdb create-db-cluster \
  --db-cluster-identifier cluster01 \
  --engine docdb \
  --engine-version 4.0.0 \
  --port 27017 \
  --master-username test \
  --master-user-password 'password' \
  --no-storage-encrypted \
  --no-deletion-protection


# Add the db.t3.medium instance "instance01" to cluster01, with automatic
# minor version upgrades disabled.
aws docdb create-db-instance \
  --db-cluster-identifier cluster01 \
  --db-instance-identifier instance01 \
  --db-instance-class db.t3.medium \
  --engine docdb \
  --no-auto-minor-version-upgrade

 

# Show the details of cluster01, restricted to clusters whose engine is docdb.
aws docdb describe-db-clusters \
  --db-cluster-identifier cluster01 \
  --filter Name=engine,Values=docdb


# Show the details of instance01, restricted to instances whose engine is docdb.
aws docdb describe-db-instances \
  --db-instance-identifier instance01 \
  --filter Name=engine,Values=docdb

 

-- 2. テストデータ作成
# Register the MongoDB 4.0 yum repository (one printf argument per line of the
# .repo file), then install only the mongo shell package.
printf '%s\n' \
  '[mongodb-org-4.0] ' \
  'name=MongoDB Repository' \
  'baseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/4.0/x86_64/' \
  'gpgcheck=1 ' \
  'enabled=1 ' \
  'gpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc' \
  | sudo tee /etc/yum.repos.d/mongodb-org-4.0.repo
sudo yum install -y mongodb-org-shell

# Download the RDS CA bundle, then open an SSL mongo shell session against the
# cluster endpoint as user "test" (no password is passed on the command line).
wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
mongo \
  --host cluster01.cluster-xxxxxxxxxxxx.ap-northeast-1.docdb.amazonaws.com:27017 \
  --ssl \
  --sslCAFile rds-combined-ca-bundle.pem \
  --username test

-- コレクション作成とデータ追加
// Switch the session to the "test" database.
use test

// Drop tab1 first so the inserts below start from an empty collection.
db.tab1.drop()

// Seed two documents with explicit _id values.
db.tab1.insert({_id:1,col2:"val12"});
db.tab1.insert({_id:2,col2:"val22"});

// Display the contents of tab1.
db.tab1.find()

// List the databases, then the collections of the current database.
show dbs
show collections

 

-- 3. 変更ストリームの有効化

-- testデータベースのtab1コレクションで有効化
// Enable change streams for the single collection test.tab1.
db.adminCommand({
    modifyChangeStreams: 1,
    database: "test",
    collection: "tab1",
    enable: true
});

-- testデータベースの全コレクションで有効化
// Enable change streams for every collection in the "test" database
// (the collection name is left empty).
db.adminCommand({
    modifyChangeStreams: 1,
    database: "test",
    collection: "",
    enable: true
});

-- 全データベースの全コレクションで有効化
// Enable change streams for every collection in every database
// (both the database and collection names are left empty).
db.adminCommand({
    modifyChangeStreams: 1,
    database: "",
    collection: "",
    enable: true
});


-- 4. 変更ストリームの一覧

// List the change streams by running an aggregate command whose pipeline is
// the $listChangeStreams stage, and wrap the reply in a cursor.
cursor = new DBCommandCursor(
    db,
    db.runCommand({
        aggregate: 1,
        pipeline: [{ $listChangeStreams: 1 }],
        cursor: {}
    })
);


-- 5. 動作確認

# Install pip for Python 3, then install PyMongo for the current user only.
# pip is deliberately run without sudo: installing into the system
# site-packages as root can clash with distribution-managed Python packages.
sudo yum install -y python3-pip
python3 -m pip install --user pymongo


vim a.python
---------------

"""Demonstrate change streams on the test.tab1 collection with PyMongo.

Opens a change stream on ``test.tab1``, inserts one document and updates it,
and prints whatever ``try_next()`` returns after each step (this may be
``None`` when no event is available yet).
"""
import os
import sys
from pymongo import MongoClient, ReadPreference

username = "test"
password = 'password'

clusterendpoint = "cluster01.cluster-xxxxxxxxxxxx.ap-northeast-1.docdb.amazonaws.com:27017"

# Use the tls/tlsCAFile option names with real booleans: the legacy
# ``ssl_ca_certs`` keyword was removed in PyMongo 4, so an unpinned
# ``pip install pymongo`` would reject it. Retryable writes stay disabled,
# as in the original connection settings.
with MongoClient(
    clusterendpoint,
    username=username,
    password=password,
    tls=True,
    tlsCAFile='rds-combined-ca-bundle.pem',
    retryWrites=False,
) as client:
    db = client['test']

    coll = db.get_collection('tab1', read_preference=ReadPreference.PRIMARY)

    # The context manager closes the change stream cursor even if one of the
    # writes below raises.
    with coll.watch() as stream:
        # Insert a document and poll the stream for the matching event.
        coll.insert_one({'_id': 3, 'col2': "val32"})
        print(stream.try_next())

        # Poll once more with no intervening write.
        print(stream.try_next())

        # Update the inserted document and poll for the update event.
        coll.update_one({'col2': "val32"}, {'$set': {'col2': "newval32"}})
        print(stream.try_next())

---------------

# Run the change stream demo script with Python 3.
python3 a.python

 

-- 6. クリーンアップ

-- Amazon DocumentDBインスタンス削除

# Delete the instance "instance01".
aws docdb delete-db-instance \
  --db-instance-identifier instance01

-- Amazon DocumentDBクラスター削除

# Delete the cluster "cluster01" without taking a final snapshot.
aws docdb delete-db-cluster \
  --skip-final-snapshot \
  --db-cluster-identifier cluster01