Reactive Messaging with Helidon 2.0

原文はこちら。
The original article was written by Daniel Kec (Java developer at Oracle).
https://medium.com/helidon/reactive-messaging-with-helidon-2-0-f5de1ca5dc63

非同期メッセージングはマイクロサービスの世界でよく使われる通信形式で、MicroProfile Reactive Messaging Specificationは、ものごとの接続方式を形式化する必要性に対する必然的な反応です。

MicroProfile Reactive Messaging Specification
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html

非同期通信はどうするのが最善なのでしょうか。もちろん、Reactive Streamsですね。Reactive Streamsでの操作を形式化するため、別の仕様を作る必要がありました。それがMicroProfile Reactive Streams Operatorsです。Reactive Messagingは既存の実装間での可搬性をもたらすために標準化されたoperatorに大きく依存しています。

Reactive Streams
https://www.reactive-streams.org/
MicroProfile Reactive Streams Operators Specification
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html

HelidonのReactive MessagingとReactive Streams Operatorsのプレリリース版がHelidon 2.0.0-M2リリースに含まれています。

Helidon 2.0.0-M2 Release Notes
https://github.com/oracle/helidon/releases/tag/2.0.0-M2

[注意]
プレリリース版は実験的なもので、本番環境での利用を想定していません。APIや機能はまだ完全にテストされておらず、変更される可能性があります。


Reactive Messaging

ものごとをつなぐ、これこそがReactive Messagingがやることです。Reactive Messagingは、Reactive Streamsを構成するために必要なplumbing tools(配管工具)であり、これにより非同期メッセージングをより効率的に取り扱うことができます。これは、メソッドやコネクタの名前付きペアをチャネルに接続することで実現します。

どのように動作するのか見てみましょう。

@Outgoing("publisher-payload")
public PublisherBuilder<Integer> streamOfMessages() {
return ReactiveStreams.of(1, 2, 3);
}
@Incoming("publisher-payload")
@Outgoing("wrapped-message")
public Message<String> rewrapMessageManually(Message<Integer> message) {
return Message.of(Integer.toString(message.getPayload()));
}
@Incoming("wrapped-message")
public void consumeImplicitlyUnwrappedMessage(String value) {
System.out.println("Consuming message: " + value);
}
> Consuming message: 1
> Consuming message: 2
> Consuming message: 3
view raw ma1.java hosted with ❤ by GitHub

2メソッド間のチャネル

この例では、3個のメッセージングメソッドを2個のチャネルで接続しています。最初のメソッドはアセンブリ時に呼び出され、2つ目のメソッドrewrapMessageManuallyに接続される整数のパブリッシャーを準備します。

2つ目のメソッドはパブリッシャーから届く全てのアイテムに対して呼び出されます。メッセージングは動的に全ての整数アイテムをMessagingラッパーにラップします。これは2つ目のメソッドの引数型です。その後、メソッドのユーザーコードがオリジナルのペイロードを取り出し、Stringに変換の後、再度ラップします。rewrapMessageManuallyメソッドは3つ目のメソッドに接続されます。3つ目のメソッドは到達する全てのペイロードを消費し、標準出力に表示します。再度、メッセージングはメソッドシグネチャに基づいて自動的にMessageラッパーからペイロードを取り出します。

Messageラッパー
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message

この例は非常にシンプルですが、メッセージングチャネルが全てのapplication-scoped beanや様々なコネクタにまたがって接続可能であることがわかると、このツールが非常に強力であることがわかるでしょう。このようにクリーンでリアクティブな方法ですべてのメッセージを覗いたり変換したりしながら、JMSをKafkaと接続することを想像してみてください。

Connectors

同じアプリケーションコンテキストのbean間のメッセージングであれば、設定可能なコネクタを使わないのであればただのおもちゃになってしまいます。現在、Helidon 2.0用の本格的なKafkaコネクタを準備しており、その後につづくコネクタを多数準備中です。しかし、特定のユースケースに特化したカスタムメイドのコネクタを作成することほど簡単なことはありません。結局のところ、それはただのbeanだからです。

Connector
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_connector

@ApplicationScoped
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
return ReactiveStreams.of("foo", "bar")
.map(Message::of);
}
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.map(Message::getPayload)
.forEach(o -> System.out.println("Connector says: " + o));
}
}
view raw ma2.java hosted with ❤ by GitHub

では、MicroProfile configと接続するためのチャネルを構成します。

Configuration for MicroProfile
https://download.eclipse.org/microprofile/microprofile-config-1.4/microprofile-config-spec.html

mp.messaging.incoming.from-connector-channel.connector=example-connector
mp.messaging.incoming.to-connector-channel.connector=example-connector
view raw ma3.properties hosted with ❤ by GitHub

ストリーミングしてみます。

@Outgoing("from-connector-channel")
public Publisher<Integer> produceInts() {
return Flowable.just(1, 2, 3);
}
> Connector says: 1
> Connector says: 2
> Connector says: 3
view raw ma4.java hosted with ❤ by GitHub

コネクタで読み取りましょう。

@Incoming("to-connector-channel")
public void consumeInt(Integer value) {
System.out.println("Consuming Integer: " + value);
}
> Consuming Integer: 1
> Consuming Integer: 2
> Consuming Integer: 3
view raw ma5.java hosted with ❤ by GitHub

ご覧の通り、構成はシンプルかつ簡単です。まさにマイクロサービスのあるべき姿です。

ここで、メッセージの受信確認方法が必要です。回復性のあるメッセージングシステムを構築したい場合、KafkaやJMSセッションからプルされたメッセージのオフセットをコミットするためには、受信確認はきわめて重要です。

Acknowledgement

最初の例で気付いてらっしゃるかもしれませんが、リアクティブメッセージングを使うと半自動的にあらゆるストリーミングされたアイテムをラッピングされます。ラッパーエンベロープであるMessageはペイロードだけでなく、メッセージの確認時に送信されるコードバックも取り得ます。メッセージングには自動もしくは手動でのメッセージ確認手法があります。

Message
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message
Acknowledgement Examples
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_acknowledgement_examples

最も簡単な利用例です。

// Create a message
Message<String> message = Message.of("payload!", () -> {
System.out.println("Executed when acked!");
return CompletableFuture.completedFuture(null);
});
// Acknowledge
message.ack();
view raw ma6.java hosted with ❤ by GitHub

メッセージングにまかせる例です。

@Outgoing("test-channel")
public Publisher<Message<String>> produceMessage() {
return ReactiveStreams.of(Message.of("test-data", () -> {
System.out.println("Message acked!");
return CompletableFuture.completedStage(null);
})).buildRs();
}
@Incoming("test-channel")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public void receiveMessage(String msg) {
System.out.println("Received payload:" + msg);
}
> Received payload: test-data
> Message acked!
view raw ma7.java hosted with ❤ by GitHub

たくさんの可能性がありますので、網羅的なリストは仕様を確認してください。

Message acknowledgement
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message_acknowledgement

Reactive Streams Operators

Operators SPIが限られた数の最も重要なoperatorsで実際の実装からユーザーコードを抽象化するため、その他のリアクティブ・ライブラリのAPIと比較して、MicroProfile Reactive Streams Operatorsは少々Spartanと思われるかもしれません。

MicroProfile Reactive Streams Operators Specification
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html

AtomicInteger sum = new AtomicInteger();
ReactiveStreams.of("1", "2", "3", "4", "5")
.limit(3)
.map(Integer::parseInt)
.forEach(sum::addAndGet)
.run()
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));
> Sum: 6
view raw ma8.java hosted with ❤ by GitHub

でも心配しないでください。引き続きお気に入りのリアクティブ・ライブラリを利用できます。リアクティブ・ライブラリに必要なのは、一般的なReactive Streams API を実装することだけです。いくつかの可能性を比較してみましょう。

Package org.reactivestreams
https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/package-summary.html

RxJava:

AtomicInteger sum = new AtomicInteger();
Flowable<Integer> flowable = Flowable.just("1", "2", "3", "4", "5")
.map(Integer::parseInt);
ReactiveStreams.fromPublisher(flowable)
.limit(3)
.forEach(sum::addAndGet)
.run()
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));
> Sum: 6
view raw ma9.java hosted with ❤ by GitHub

Reactor:

AtomicInteger sum = new AtomicInteger();
Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt);
ReactiveStreams.fromPublisher(flux)
.limit(3)
.forEach(sum::addAndGet)
.run()
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));
> Sum: 6
view raw ma10.java hosted with ❤ by GitHub

ほら、必要な配管工具は全て揃っているので、問題ありません。

また、メッセージングについても同様で、様々なメッセージングメソッドのシグネチャが用意されており、reactive streamsのPublisher/Subscriber/Processorとの直接対話を可能にしています。

Supported method signatures
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_supported_method_signatures

@Outgoing("int-channel")
public Publisher<Integer> produceInts() {
return Flowable.just(1, 2, 3);
}
@Incoming("int-channel")
public void consumeInt(Integer value) {
System.out.println("Consuming Integer: " + value);
}
> Consuming Integer: 1
> Consuming Integer: 2
> Consuming Integer: 3
view raw ma11.java hosted with ❤ by GitHub

Reactive Streams in Helidon SE

MicroProfile Reactive Streams Operatorsはその他のReactive Streamsライブラリと同様、Helidon SEでご利用いただけます。依存関係のみ追加すればすぐに利用できます。

<dependency>
<groupId>io.helidon.microprofile.reactive-streams</groupId>
<artifactId>helidon-microprofile-reactive-streams</artifactId>
<version>${helidon.version}</version>
</dependency>
view raw ma12.xml hosted with ❤ by GitHub

Helidon reactive engine

Helidonは自身でリアクティブエンジンを備えています。つまり、独自のreactive operatorとjava.util.concurrent.Flowをベースにした独自のAPIがあります。MultiSingleでできることはたくさんありますが、別の一連の記事全体のためには十分です。以下の例はプレビューです。

Multi.just(1, 2, 3, 4, 5)
.flatMap(i -> Multi.just(i, i))
.limit(4)
.distinct()
.forEach(System.out::println);
> 1
> 2
view raw ma13.java hosted with ❤ by GitHub

世界的に有名なリアクティブプログラミングの専門家であり、RxJavaのプロジェクトリーダーであり、Reactorプロジェクトの共同創設者でもあるDavid Karnok博士が、Helidon 2.0のためにHelidonのリアクティブエンジンの新しい実装に貢献してくださったことを誇りに思います。この貢献により、パフォーマンスの飛躍的な向上だけでなく、多くの新機能、一連の新しいクールなoperator、そしてもちろんHelidon独自のMicroProfile Reactive Streams Operatorsの実装のファインチューニングがもたらされました。

David Karnok (Twitter)
https://twitter.com/akarnokd
MicroProfile Reactive Streams Operators Specification
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html

だからHelidon 2.0のreactive messagingは速く飛べるのです。

新しいreactiveの機能をご自身でお試しください。Server Sent EventsでReactive Messagingを利用するサンプルを確認ください。これは既にリリース済みのHelidon 2.0.0-M2で動作します。

Helidon Reactive Messaging Example
https://github.com/danielkec/helidon-messaging-example

そして、フィードバックを原文のコメント欄やTwitter、Slackに残してください。どうぞよろしくお願いいたします。

Project Helidon (Twitter)
https://twitter.com/helidon_project

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中