https://learn.microsoft.com/ja-jp/azure/event-hubs/event-hubs-quickstart-cli
Azure Event Hubs はビッグ データ ストリーミング プラットフォームであり、毎秒数百万のイベントを受け取って処理できるイベント インジェスト サービスです。
-- 1. 前作業
# Sign in with the device-code flow, then show the active account and the CLI version.
az login --use-device-code
az account show
az version
# Show the configured defaults, set the default location, and show them again to confirm.
az configure --list-defaults
az configure --defaults location=japaneast
az configure --list-defaults
# Create the resource group and list the groups to confirm it exists.
az group create -n rg9999999 -l japaneast
az group list
# Upgrade the Azure CLI.
az upgrade
-- 2. Event Hubs用の Python パッケージのインストール
# Install the Event Hubs client library and the async Blob-backed checkpoint store in one call.
pip install azure-eventhub azure-eventhub-checkpointstoreblob-aio
-- 3. ストレージアカウントとBLOBコンテナーの作成
# Create a StorageV2 account with the Hot access tier and the Standard_LRS SKU.
az storage account create -g rg9999999 -n st123 --kind StorageV2 --sku Standard_LRS --access-tier Hot
# List the storage accounts in the resource group to confirm creation.
az storage account list -g rg9999999
# Fetch the first access key of the storage account into $key.
# The JMESPath query is quoted so the shell cannot treat [0] as a glob pattern,
# and tsv output yields the bare value, so no post-processing with tr is needed.
key=$(az storage account keys list \
  --account-name st123 \
  --resource-group rg9999999 \
  --query '[0].value' \
  --output tsv)
# Print the key for verification (note: this writes a secret to the terminal).
echo "$key"
# Create a private blob container (no anonymous access), authenticating with the account key.
# "$key" is quoted so the value is always passed as a single, unmodified argument.
az storage container create \
  --name container01 \
  --account-name st123 \
  --public-access off \
  --account-key "$key"
# List the containers in the account to confirm creation.
az storage container list \
  --account-name st123 \
  --account-key "$key"
-- 4. 接続文字列を取得
# Capture the storage account's connection string as plain text (tsv output).
connectionString=$(az storage account show-connection-string \
  --resource-group rg9999999 \
  --name st123 \
  --query connectionString \
  --output tsv)
# Quoted so the value is printed exactly as returned, without word splitting or globbing.
echo "$connectionString"
-- 5. Event Hubs名前空間の作成
# Create the Event Hubs namespace with the Basic SKU.
az eventhubs namespace create -g rg9999999 -n ns123 --sku Basic
# List the namespaces in the resource group to confirm creation.
az eventhubs namespace list -g rg9999999
-- 6. Event Hubsの作成
# Create the event hub inside the namespace.
# Current Azure CLI releases express retention in hours together with a cleanup
# policy instead of the older --message-retention (days) option; 24 hours keeps
# the original 1-day retention.
# NOTE(review): confirm the option names against the installed CLI version.
az eventhubs eventhub create \
  --resource-group rg9999999 \
  --namespace-name ns123 \
  --name hub01 \
  --cleanup-policy Delete \
  --retention-time-in-hours 24
# List the event hubs in the namespace to confirm creation.
az eventhubs eventhub list \
  --resource-group rg9999999 \
  --namespace-name ns123
-- 7. Event Hubs名前空間用の接続文字列を取得
# List the keys of the namespace's RootManageSharedAccessKey authorization rule.
az eventhubs namespace authorization-rule keys list -g rg9999999 --namespace-name ns123 --name RootManageSharedAccessKey
-- 8. イベントを送信する Python スクリプトの作成
vim send.py
"""Send a small batch of sample events to an Azure event hub."""

import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

# Connection string of the Event Hubs namespace (the key is a placeholder)
# and the name of the target event hub.
CONNECTION_STR = "Endpoint=sb://ns123.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
EVENTHUB_NAME = "hub01"


async def run():
    """Create a producer client, batch three events, and send the batch."""
    # Create a producer client to send messages to the event hub.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME,
    )
    # The async context manager closes the client when the block exits.
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)


if __name__ == "__main__":
    # asyncio.run creates and closes the event loop itself; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(run())
-- 9. イベントを受信する Python スクリプトの作成
vim recv.py
"""Receive events from an Azure event hub, checkpointing to Blob storage."""

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

# Connection string of the storage account (the key is a placeholder) and the
# blob container that holds the checkpoints.
STORAGE_CONNECTION_STR = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=st123;AccountKey=yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy;BlobEndpoint=https://st123.blob.core.windows.net/;FileEndpoint=https://st123.file.core.windows.net/;QueueEndpoint=https://st123.queue.core.windows.net/;TableEndpoint=https://st123.table.core.windows.net/"
BLOB_CONTAINER_NAME = "container01"

# Connection string of the Event Hubs namespace (the key is a placeholder),
# the event hub to read from, and the consumer group to read as.
EVENTHUB_CONNECTION_STR = "Endpoint=sb://ns123.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
EVENTHUB_NAME = "hub01"
CONSUMER_GROUP = "$Default"


async def on_event(partition_context, event):
    """Print one received event and record a checkpoint for it."""
    # Print the event data.
    body = event.body_as_str(encoding='UTF-8')
    partition_id = partition_context.partition_id
    print(f'Received the event: "{body}" from the partition with ID: "{partition_id}"')

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)


async def main():
    """Build the checkpoint store and consumer client, then receive events."""
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        STORAGE_CONNECTION_STR,
        BLOB_CONTAINER_NAME,
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        # Call the receive method. Read from the beginning of the partition
        # (starting_position: "-1").
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == '__main__':
    # asyncio.run creates and closes the event loop itself; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(main())
-- 10. 動作確認
# Start the receiver, then run the sender.
# NOTE(review): recv.py presumably keeps receiving until interrupted, so the
# sender likely has to be started from a second terminal — confirm.
python recv.py
python send.py
-- 11. クリーンアップ
# Delete the event hub, then the namespace that contained it.
az eventhubs eventhub delete -g rg9999999 --namespace-name ns123 -n hub01
az eventhubs namespace delete -g rg9999999 -n ns123
# Show the resource groups, then delete the group without a confirmation prompt.
az group list
az group delete -n rg9999999 --yes