Azure Service Busのメッセージセッションステートを使う

このエントリは2023/02/09現在の情報に基づいています。将来の機能追加や変更に伴い、記載内容からの乖離が発生する可能性があります。

問い合わせ

いつもとは違う問い合わせ主から、以下のような問い合わせが届いた。

Azure Service BusのQueueやTopicのメッセージセッションに付与できるステートは、Azure Portalなどで確認できるか?

そもそもステートって何?という方は、以下のドキュメントを参照。

メッセージ セッションの状態 / Message session state
https://learn.microsoft.com/azure/service-bus-messaging/message-sessions#message-session-state

平たく言えば、いろいろ書き込めるとあるサイズのバイナリオブジェクトで、Service Bus Standard の場合は 256 KB の、Service Bus Premium の場合は 100 MB のサイズのデータを保持できる。

で、これはどこで確認できるのか、ということ。

Azure Portalでは

Azure Portalでは、残念ながら確認できない。Azure Portalに組み込まれたService Bus Explorerでも確認できない。

旧型のService Bus Explorerでは

Azure Portalに統合される以前にアプリケーションとして公開されていたService Bus Explorerであれば、確認できる。

Service Bus Explorer
https://github.com/paolosalvatori/ServiceBusExplorer

ただし、このService Bus ExplorerはAzure Service Busへの接続ではAzure AD認証をサポートしていない点に注意が必要。Service Bus ExplorerからはSession Stateは以下のように見える。

FunctionsなどでSession Stateを確認できるAPIを作成できないか

可能である。ローカルPCからService Principalを使うカスタムアプリケーションを作成することもできるが、FunctionsだとManaged Identityが利用できるので、Service Principalの発行や管理の手間がなくて楽ではある(が、誰もがステートを見えてはいけないはずなので、呼び出し可能なクライアントをコントロールするとか、IPアドレスでフィルタリングするなどの考慮は必要)。

実装にあたっての注意点

[1] 受信用クライアントからのみメッセージセッションのステートを取得・設定できる

メッセージセッションのステートは、ServiceBusReceiverClientServiceBusReceiverAsyncClientクラスのメソッドで設定する。これは、セッションの設定はServiceBusReceiverClientもしくはServiceBusReceiverAsyncClientというクライアントのクラスで設定しているためである。言い換えると、ServiceBusReceivedMessageServiceBusMessageといったメッセージのクラスにはセッションステート設定・読み取りのメソッドはない。

また、このセッションステートは、送信側クライアント、つまりServiceBusSenderClientServiceBusSenderAsyncClientのメソッドでは設定できず、必ず受信側クライアントを使う必要がある(というか、送信側のメソッドに設定できるものがない)。

例えば、以下のようなQueueを読み取るコードがあったとして

DefaultAzureCredential defaultAzureCredential = new DefaultAzureCredentialBuilder().build();
try (var sessionReceiverClient = new ServiceBusClientBuilder()
    .credential(namespace+".servicebus.windows.net", defaultAzureCredential)
    .sessionReceiver()
    .queueName(queueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .buildClient()) {
    try (var receiverClient = sessionReceiverClient.acceptSession(sessionId)) {
        IterableStream<ServiceBusReceivedMessage> messages
            = receiverClient.receiveMessages(100, Duration.ofSeconds(1));
        for (ServiceBusReceivedMessage message : messages) {
            logger.log(Level.INFO,
                String.format("ID [%s] SessionId [%s] State[%s] Message [%s]\n",
                message.getMessageId(),
                message.getSessionId(),
                new String(Optional.ofNullable(receiverClient.getSessionState())
                    .orElse("NULL".getBytes(StandardCharsets.UTF_8))),
                message.getBody().toString()));
           
            // Unlock
            receiverClient.abandon(message, new AbandonOptions());
        }
    }
}
catch (IllegalArgumentException | ServiceBusException e) {
        logger.log(Level.SEVERE,
            String.format("ServiceBus exception\n\tMessage [%s]\n\n%s",
                e.getLocalizedMessage(),
                Arrays.toString(e.getStackTrace())));
}

この例の14行目から18行目がメッセージとセッションステートを取り扱っている部分である。メッセージIDやセッションID(もちろんメッセージ本体)などはServiceReceivedMessageクラスのメソッドで取得できるが、セッションステートは以下のメソッドで操作する。

  • 同期クライアント
    • ServiceBusReceiverClient#getSessionState()
    • ServiceBusReceiverClient#setSessionState()
  • 非同期クライアント
    • ServiceBusReceiverAsyncClient#getSessionState()
    • ServiceBusReceiverAsyncClient#setSessionState()
[2] 受信モード

ServiceBusReceverClientインスタンスを生成する際に、PEEK_LOCKRECEIVE_AND_DELETEの2種類のモードを設定できるが、 RECEIVE_AND_DELETE モードでは受信のタイミングでメッセージがQueue/Topicから取り出されてしまうので、ステートだけ変更したいのであれば、

  • PEEK_LOCKモードで受信する
  • completeメソッドではなく、abandonメソッドを使ってロックを解放する

といった工夫が必要である。なお、receiveMessagesメソッドで受信、abandonメソッドでロックを解放する場合、配信回数 (Delivery Count) は1ずつ増える。そのため、Dead Letter Queueに配信されないようにQueue/Topicを構成しなければならない。

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中