{Azure Event Hubs}クイック スタート: Azure CLI を使用したイベント ハブの作成

 

https://learn.microsoft.com/ja-jp/azure/event-hubs/event-hubs-quickstart-cli

 

Azure Event Hubs はビッグ データ ストリーミング プラットフォームであり、毎秒数百万のイベントを受け取って処理できるイベント インジェスト サービスです。

 

-- 1. 前作業

az login --use-device-code
az account show

az version

az configure --list-defaults
az configure --defaults location=japaneast
az configure --list-defaults

az group create \
--name rg9999999 \
--location japaneast


az group list
az upgrade


-- 2. Event Hubs用の Python パッケージのインストール

pip install azure-eventhub
pip install azure-eventhub-checkpointstoreblob-aio


-- 3. ストレージアカウントとBLOBコンテナーの作成

az storage account create \
--resource-group rg9999999 \
--name st123 \
--access-tier Hot \
--kind StorageV2 \
--sku Standard_LRS

az storage account list \
--resource-group rg9999999

 


# Fetch the first access key of the storage account into $key.
# The JMESPath query is quoted so the shell cannot interpret "[0]" as a glob
# pattern, and tsv output prints the bare value, so no quote stripping with
# tr is needed.
key=$(az storage account keys list \
--account-name st123 \
--resource-group rg9999999 \
--query "[0].value" \
--output tsv)

echo $key


az storage container create \
--name container01 \
--account-name st123 \
--public-access off \
--account-key $key

az storage container list \
--account-name st123 \
--account-key $key


-- 4. ストレージアカウントの接続文字列を取得

connectionString=$(az storage account show-connection-string \
--resource-group rg9999999 \
--name st123 \
--query connectionString \
--output tsv )

echo $connectionString

 

-- 5. Event Hubs名前空間の作成


az eventhubs namespace create \
--resource-group rg9999999 \
--name ns123 \
--sku Basic

az eventhubs namespace list \
--resource-group rg9999999

 

-- 6. Event Hubsの作成

az eventhubs eventhub create \
--resource-group rg9999999 \
--namespace-name ns123 \
--name hub01 \
--message-retention 1

 

az eventhubs eventhub list \
--resource-group rg9999999 \
--namespace-name ns123

 

-- 7. Event Hubs名前空間用の接続文字列を取得

az eventhubs namespace authorization-rule keys list \
--resource-group rg9999999 \
--namespace-name ns123 \
--name RootManageSharedAccessKey

 


-- 8. イベントを送信する Python スクリプトの作成

vim send.py

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    """Send one batch containing three sample events to the event hub.

    A producer client is built from the Event Hubs namespace connection
    string and the event hub name, and is used as an async context manager
    while the batch is created, filled and sent.
    """
    # Connection string of the Event Hubs namespace and the target hub name.
    namespace_conn_str = "Endpoint=sb://ns123.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    hub_name = "hub01"
    sender = EventHubProducerClient.from_connection_string(conn_str=namespace_conn_str, eventhub_name=hub_name)

    async with sender:
        # All three events travel together in a single batch.
        batch = await sender.create_batch()
        for text in ('First event ', 'Second event', 'Third event'):
            batch.add(EventData(text))

        # Hand the filled batch to the event hub.
        await sender.send_batch(batch)

# Run the sender to completion. asyncio.run() creates a new event loop, runs
# the coroutine and closes the loop afterwards. It replaces the
# asyncio.get_event_loop() / run_until_complete() pair, which is deprecated
# when no event loop is running and never closed the loop.
asyncio.run(run())

 

-- 9. イベントを受信する Python スクリプトの作成

vim recv.py

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    """Print a received event and record a checkpoint for it.

    Args:
        partition_context: Object exposing ``partition_id`` and an awaitable
            ``update_checkpoint(event)`` method.
        event: Received event exposing ``body_as_str(encoding=...)``.
    """
    # Decode the payload and identify the partition it arrived on.
    body_text = event.body_as_str(encoding='UTF-8')
    source_partition = partition_context.partition_id
    message = "Received the event: \"{}\" from the partition with ID: \"{}\"".format(body_text, source_partition)
    print(message)

    # Checkpoint the event so that a later run does not read again the
    # events that were already read.
    await partition_context.update_checkpoint(event)

async def main():
    """Receive events from the event hub, checkpointing into a blob container.

    Builds a blob-backed checkpoint store from the storage account
    connection string and container name, builds a consumer client for the
    ``$Default`` consumer group of the event hub, and hands every received
    event to ``on_event``.
    """
    # Checkpoints are stored in the blob container of the storage account.
    storage_conn_str = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=st123;AccountKey=yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy;BlobEndpoint=https://st123.blob.core.windows.net/;FileEndpoint=https://st123.file.core.windows.net/;QueueEndpoint=https://st123.queue.core.windows.net/;TableEndpoint=https://st123.table.core.windows.net/"
    checkpoints = BlobCheckpointStore.from_connection_string(storage_conn_str, "container01")

    # Consumer client bound to the event hub and the checkpoint store.
    namespace_conn_str = "Endpoint=sb://ns123.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    consumer = EventHubConsumerClient.from_connection_string(
        namespace_conn_str,
        consumer_group="$Default",
        eventhub_name="hub01",
        checkpoint_store=checkpoints,
    )

    async with consumer:
        # starting_position "-1" requests reading from the beginning of the
        # partition.
        await consumer.receive(on_event=on_event, starting_position="-1")

if __name__ == '__main__':
    # Run the main coroutine. asyncio.run() creates a new event loop, runs
    # the coroutine and closes the loop afterwards. It replaces the
    # asyncio.get_event_loop() / run_until_complete() pair, which is
    # deprecated when no event loop is running and never closed the loop.
    asyncio.run(main())


-- 10. 動作確認 (recv.py は受信待ちのまま動き続けるため、send.py は別のターミナルから実行する)

python recv.py

python send.py


-- 11. クリーンアップ


az eventhubs eventhub delete \
--resource-group rg9999999 \
--namespace-name ns123 \
--name hub01


az eventhubs namespace delete \
--resource-group rg9999999 \
--name ns123

 

az group list
az group delete \
--name rg9999999 \
--yes