原文はこちら。
The original article was written by Daniel Kec (Java developer at Oracle).
https://medium.com/helidon/reactive-messaging-with-helidon-2-0-f5de1ca5dc63
非同期メッセージングはマイクロサービスの世界でよく使われる通信形式で、MicroProfile Reactive Messaging Specificationは、ものごとの接続方式を形式化する必要性に対する必然的な反応です。
MicroProfile Reactive Messaging Specification
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html
非同期通信はどうするのが最善なのでしょうか。もちろん、Reactive Streamsですね。Reactive Streamsでの操作を形式化するため、別の仕様を作る必要がありました。それがMicroProfile Reactive Streams Operatorsです。Reactive Messagingは既存の実装間での可搬性をもたらすために標準化されたoperatorに大きく依存しています。
Reactive Streams
https://www.reactive-streams.org/
MicroProfile Reactive Streams Operators Specification
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html
HelidonのReactive MessagingとReactive Streams Operatorsのプレリリース版がHelidon 2.0.0-M2リリースに含まれています。
Helidon 2.0.0-M2 Release Notes
https://github.com/oracle/helidon/releases/tag/2.0.0-M2
[注意]
プレリリース版は実験的なもので、本番環境での利用を想定していません。APIや機能はまだ完全にテストされておらず、変更される可能性があります。
Reactive Messaging
ものごとをつなぐ、これこそがReactive Messagingがやることです。Reactive Messagingは、Reactive Streamsを構成するために必要なplumbing tools(配管工具)であり、これにより非同期メッセージングをより効率的に取り扱うことができます。これは、メソッドやコネクタの名前付きペアをチャネルに接続することで実現します。
どのように動作するのか見てみましょう。
@Outgoing("publisher-payload") | |
public PublisherBuilder<Integer> streamOfMessages() { | |
return ReactiveStreams.of(1, 2, 3); | |
} | |
@Incoming("publisher-payload") | |
@Outgoing("wrapped-message") | |
public Message<String> rewrapMessageManually(Message<Integer> message) { | |
return Message.of(Integer.toString(message.getPayload())); | |
} | |
@Incoming("wrapped-message") | |
public void consumeImplicitlyUnwrappedMessage(String value) { | |
System.out.println("Consuming message: " + value); | |
} | |
> Consuming message: 1 | |
> Consuming message: 2 | |
> Consuming message: 3 |
2メソッド間のチャネル
この例では、3個のメッセージングメソッドを2個のチャネルで接続しています。最初のメソッドはアセンブリ時に呼び出され、2つ目のメソッドrewrapMessageManuallyに接続される整数のパブリッシャーを準備します。
2つ目のメソッドはパブリッシャーから届く全てのアイテムに対して呼び出されます。メッセージングは動的に全ての整数アイテムをMessagingラッパーにラップします。これは2つ目のメソッドの引数型です。その後、メソッドのユーザーコードがオリジナルのペイロードを取り出し、Stringに変換の後、再度ラップします。rewrapMessageManuallyメソッドは3つ目のメソッドに接続されます。3つ目のメソッドは到達する全てのペイロードを消費し、標準出力に表示します。再度、メッセージングはメソッドシグネチャに基づいて自動的にMessageラッパーからペイロードを取り出します。
この例は非常にシンプルですが、メッセージングチャネルが全てのapplication-scoped beanや様々なコネクタにまたがって接続可能であることがわかると、このツールが非常に強力であることがわかるでしょう。このようにクリーンでリアクティブな方法ですべてのメッセージを覗いたり変換したりしながら、JMSをKafkaと接続することを想像してみてください。
Connectors
同じアプリケーションコンテキストのbean間のメッセージングであれば、設定可能なコネクタを使わないのであればただのおもちゃになってしまいます。現在、Helidon 2.0用の本格的なKafkaコネクタを準備しており、その後につづくコネクタを多数準備中です。しかし、特定のユースケースに特化したカスタムメイドのコネクタを作成することほど簡単なことはありません。結局のところ、それはただのbeanだからです。
@ApplicationScoped | |
@Connector("example-connector") | |
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory { | |
@Override | |
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { | |
return ReactiveStreams.of("foo", "bar") | |
.map(Message::of); | |
} | |
@Override | |
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) { | |
return ReactiveStreams.<Message<?>>builder() | |
.map(Message::getPayload) | |
.forEach(o -> System.out.println("Connector says: " + o)); | |
} | |
} |
では、MicroProfile configと接続するためのチャネルを構成します。
Configuration for MicroProfile
https://download.eclipse.org/microprofile/microprofile-config-1.4/microprofile-config-spec.html
mp.messaging.incoming.from-connector-channel.connector=example-connector | |
mp.messaging.incoming.to-connector-channel.connector=example-connector |
ストリーミングしてみます。
@Outgoing("from-connector-channel") | |
public Publisher<Integer> produceInts() { | |
return Flowable.just(1, 2, 3); | |
} | |
> Connector says: 1 | |
> Connector says: 2 | |
> Connector says: 3 |
コネクタで読み取りましょう。
@Incoming("to-connector-channel") | |
public void consumeInt(Integer value) { | |
System.out.println("Consuming Integer: " + value); | |
} | |
> Consuming Integer: 1 | |
> Consuming Integer: 2 | |
> Consuming Integer: 3 |
ご覧の通り、構成はシンプルかつ簡単です。まさにマイクロサービスのあるべき姿です。
ここで、メッセージの受信確認方法が必要です。回復性のあるメッセージングシステムを構築したい場合、KafkaやJMSセッションからプルされたメッセージのオフセットをコミットするためには、受信確認はきわめて重要です。
Acknowledgement
最初の例で気付いてらっしゃるかもしれませんが、リアクティブメッセージングを使うと半自動的にあらゆるストリーミングされたアイテムをラッピングされます。ラッパーエンベロープであるMessageはペイロードだけでなく、メッセージの確認時に送信されるコードバックも取り得ます。メッセージングには自動もしくは手動でのメッセージ確認手法があります。
Message
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message
Acknowledgement Examples
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_acknowledgement_examples
最も簡単な利用例です。
// Create a message | |
Message<String> message = Message.of("payload!", () -> { | |
System.out.println("Executed when acked!"); | |
return CompletableFuture.completedFuture(null); | |
}); | |
// Acknowledge | |
message.ack(); |
メッセージングにまかせる例です。
@Outgoing("test-channel") | |
public Publisher<Message<String>> produceMessage() { | |
return ReactiveStreams.of(Message.of("test-data", () -> { | |
System.out.println("Message acked!"); | |
return CompletableFuture.completedStage(null); | |
})).buildRs(); | |
} | |
@Incoming("test-channel") | |
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) | |
public void receiveMessage(String msg) { | |
System.out.println("Received payload:" + msg); | |
} | |
> Received payload: test-data | |
> Message acked! |
たくさんの可能性がありますので、網羅的なリストは仕様を確認してください。
Message acknowledgement
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message_acknowledgement
Reactive Streams Operators
Operators SPIが限られた数の最も重要なoperatorsで実際の実装からユーザーコードを抽象化するため、その他のリアクティブ・ライブラリのAPIと比較して、MicroProfile Reactive Streams Operatorsは少々Spartanと思われるかもしれません。
MicroProfile Reactive Streams Operators Specification
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html
AtomicInteger sum = new AtomicInteger(); | |
ReactiveStreams.of("1", "2", "3", "4", "5") | |
.limit(3) | |
.map(Integer::parseInt) | |
.forEach(sum::addAndGet) | |
.run() | |
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get())); | |
> Sum: 6 |
でも心配しないでください。引き続きお気に入りのリアクティブ・ライブラリを利用できます。リアクティブ・ライブラリに必要なのは、一般的なReactive Streams API を実装することだけです。いくつかの可能性を比較してみましょう。
Package org.reactivestreams
https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/package-summary.html
RxJava:
AtomicInteger sum = new AtomicInteger(); | |
Flowable<Integer> flowable = Flowable.just("1", "2", "3", "4", "5") | |
.map(Integer::parseInt); | |
ReactiveStreams.fromPublisher(flowable) | |
.limit(3) | |
.forEach(sum::addAndGet) | |
.run() | |
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get())); | |
> Sum: 6 |
Reactor:
AtomicInteger sum = new AtomicInteger(); | |
Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5") | |
.map(Integer::parseInt); | |
ReactiveStreams.fromPublisher(flux) | |
.limit(3) | |
.forEach(sum::addAndGet) | |
.run() | |
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get())); | |
> Sum: 6 |
ほら、必要な配管工具は全て揃っているので、問題ありません。
また、メッセージングについても同様で、様々なメッセージングメソッドのシグネチャが用意されており、reactive streamsのPublisher/Subscriber/Processorとの直接対話を可能にしています。
Supported method signatures
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_supported_method_signatures
@Outgoing("int-channel") | |
public Publisher<Integer> produceInts() { | |
return Flowable.just(1, 2, 3); | |
} | |
@Incoming("int-channel") | |
public void consumeInt(Integer value) { | |
System.out.println("Consuming Integer: " + value); | |
} | |
> Consuming Integer: 1 | |
> Consuming Integer: 2 | |
> Consuming Integer: 3 |
Reactive Streams in Helidon SE
MicroProfile Reactive Streams Operatorsはその他のReactive Streamsライブラリと同様、Helidon SEでご利用いただけます。依存関係のみ追加すればすぐに利用できます。
<dependency> | |
<groupId>io.helidon.microprofile.reactive-streams</groupId> | |
<artifactId>helidon-microprofile-reactive-streams</artifactId> | |
<version>${helidon.version}</version> | |
</dependency> |
Helidon reactive engine
Helidonは自身でリアクティブエンジンを備えています。つまり、独自のreactive operatorとjava.util.concurrent.Flowをベースにした独自のAPIがあります。MultiとSingleでできることはたくさんありますが、別の一連の記事全体のためには十分です。以下の例はプレビューです。
Multi.just(1, 2, 3, 4, 5) | |
.flatMap(i -> Multi.just(i, i)) | |
.limit(4) | |
.distinct() | |
.forEach(System.out::println); | |
> 1 | |
> 2 |
世界的に有名なリアクティブプログラミングの専門家であり、RxJavaのプロジェクトリーダーであり、Reactorプロジェクトの共同創設者でもあるDavid Karnok博士が、Helidon 2.0のためにHelidonのリアクティブエンジンの新しい実装に貢献してくださったことを誇りに思います。この貢献により、パフォーマンスの飛躍的な向上だけでなく、多くの新機能、一連の新しいクールなoperator、そしてもちろんHelidon独自のMicroProfile Reactive Streams Operatorsの実装のファインチューニングがもたらされました。
David Karnok (Twitter)
https://twitter.com/akarnokd
MicroProfile Reactive Streams Operators Specification
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html
だからHelidon 2.0のreactive messagingは速く飛べるのです。
新しいreactiveの機能をご自身でお試しください。Server Sent EventsでReactive Messagingを利用するサンプルを確認ください。これは既にリリース済みのHelidon 2.0.0-M2で動作します。
Helidon Reactive Messaging Example
https://github.com/danielkec/helidon-messaging-example
そして、フィードバックを原文のコメント欄やTwitter、Slackに残してください。どうぞよろしくお願いいたします。
Project Helidon (Twitter)
https://twitter.com/helidon_project