このエントリは2019/09/12現在の情報に基づくものです。今後の機能追加や変更に伴い、記述内容との乖離が発生する可能性があります。
Azure Service Busはメッセージブローカーで、様々なメッセージをやりとりできるが、メッセージとともにカスタムプロパティを送受信することもできる。
今回はTopicを使って動作確認した。Queueでも基本的には同じである。例によって利用言語はJavaである。
SDKを使う場合
事前準備
pom.xmlに以下の依存関係を追加しておく。
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>3.0.0</version>
</dependency>
送信側
通常通りメッセージを作成するが、カスタムプロパティはMapに詰め込んでおき、Message#setProperties()でカスタムプロパティをメッセージに設定する。
Message message = new Message();
message.setMessageBody(MessageBody.fromValueData(messageString));
Map<String, Object> properties = new HashMap<>();
properties.put("account-id", "common");
properties.put("data-partition-id", "common");
properties.put("correlation-id", "12345-67890-abcde-fghij-klmno");
message.setContentType("application/json");
message.setCorrelationId("12345-67890-abcde-fghij");
message.setProperties(properties); // <---- Here!
topicClient.sendAsync(message).thenRunAsync(() -> topicClient.closeAsync());
受信側
messageインスタンスを取得後、Message#getProperties()を呼び出せば、カスタムプロパティを取得できる。
IMessageHandler messageHandler = new IMessageHandler() {
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
if(message.getContentType().contentEquals("application/json")) {
Map<String, Object> properties = message.getProperties(); // <---- !! Here !!
String messageString = (String) message.getMessageBody().getValueData();
}
return subscriptionClient.completeAsync(message.getLockToken());
}
@Override
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.println(exceptionPhase + "---" + throwable.getMessage());
}
};
Functionsの場合
事前準備
pom.xmlに以下の依存関係を追加しておく。なお、Functionsのバージョンは2以上を使う。
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>3.0.0</version>
</dependency>
送信側(Output)
OutputBindingにMessageを指定すれば、 あとはSDKの場合と同様のお作法でカスタムプロパティを付加して送信できる。
@FunctionName("sboutput")
public HttpResponseMessage send2ServiceBus(
@HttpTrigger(
name = "req",
methods = {HttpMethod.POST},
authLevel = AuthorizationLevel.ANONYMOUS
) HttpRequestMessage<Optional<String>> request,
@ServiceBusTopicOutput(
name = "message",
topicName = "%TOPIC_NAME%",
subscriptionName = "%SUBSCRIPTION_NAME%",
connection = "AzureServiceBusConnectionString"
) OutputBinding<Message> message,
final ExecutionContext context) {
Map<String, Object> properties = new HashMap<>();
properties.put("account-id", "common");
properties.put("data-partition-id", "common");
properties.put("correlation-id", "12345-67890-abcde-fghij-klmno");
String body = request.getBody().get();
MessageBody.fromValueData(body);
Message sendMessage = new Message();
sendMessage.setMessageBody(MessageBody.fromValueData(body));
sendMessage.setContentType("application/json");
sendMessage.setCorrelationId("12345-67890-abcde-fghij");
sendMessage.setProperties(properties);
message.setValue(sendMessage);
return request.createResponseBuilder(HttpStatus.ACCEPTED)
.header("Content-Type","application/json")
.body(body)
.build();
}
受信側(Trigger)
FunctionsはデフォルトでReceiveMode.PEEKLOCKを使っており、成功すると自動的にSubscriptionClient#complete()、失敗するとSubscriptionClient#abandon()が呼ばれることに注意。Functions v2では、com.microsoft.azure.servicebus.Message をBinding変数に指定しておけば、SDKの場合と同じお作法で取得できる。もちろん通常のPOJOも可能。以下はContextTypeとしてapplication/jsonが指定されているメッセージのみ受け取る例。
@FunctionName("sbtrigger")
public void triggerFromServiceBus(
@ServiceBusTopicTrigger(
name = "message",
topicName = "%TOPIC_NAME%",
subscriptionName = "%SUBSCRIPTION_NAME%",
connection = "AzureServiceBusConnectionString"
) Message message,
final ExecutionContext context) {
if (message.getContentType().contentEquals("application/json")) {
context.getLogger().info(message.getMessageBody().getValueData().toString());
Map<String, Object> properties = message.getProperties();
context.getLogger().info(properties.toString());
}
}
SDKとFunctionsを混在させる(例えば送信側がFunctions、受信側がSDK)場合は以下のエントリで。
FunctionsとAzure Service Bus SDKのメッセージ形式の違い
https://logico-jp.io/2019/09/26/message-from-functions-is-not-compatible-with-that-from-sdk/