このエントリは以下のエントリをベースにしています。
This entry is based on the following one written by Otavio Santana (Software engineer in Tomitribe).
https://dzone.com/articles/using-jakarta-eemicroprofile-to-connect-to-apache
Apache Kafkaは、分散ストリーミングプラットフォームで、以下の機能を備えています。
- レコードストリームをパブリッシュおよびサブスクライブ
- レコードストリームを耐障害性のある永続的な方法で格納
- ストリーム発生時にストリーム処理
Apache Kafkaは、Javaの世界で成功事例があり、このエントリではJakarta EE/MicroProfileの世界でApache Kafkaを活用する方法をご紹介します。
Apache Kafka Core Concepts
Kafkaは1個以上のサーバー上でクラスタとして動作します。このサーバー(群)は複数のデータセンターにまたがることができます。Kafkaクラスタはトピック(Topic)と呼ばれるカテゴリにレコードストリームを格納し、各レコードはKey、Value、タイムスタンプで構成されています。
ドキュメントによると、Kafkaには主要なAPIが4個あります。
Apache Kafka
https://kafka.apache.org/intro
- Producer API
- アプリケーションが1個以上のKafkaのトピックに対してレコードストリームをパブリッシュするためのAPI
- Consumer API
- アプリケーションが1個以上のトピックをサブスクライブし、トピックに作成されたレコードストリームを処理するためのAPI
- Streams API
- 入力ストリームを1個以上のトピックから消費したり、1個以上のトピックに対して出力ストリームを生成したり、効率的に入力ストリームを出力ストリームに変換したり、という、ストリーム・プロセッサとしてアプリケーションが振る舞うためのAPI
- Connector API
- 既存のアプリケーションやデータシステムとKafkaのトピックを接続する、再利用可能なプロデューサーやコンシューマーを作成、実行するためのAPI
Install Apache Kafka
公式ドキュメントにはApache Kafkaをはじめるにあたってのよいエントリがあり、Zookeeperを使ったインストール方法が紹介されています。かいつまんでご紹介すると、KafkaはZookeeperを使ってクラスタメンバーの構成やトピックの構成などを行います。
Quickstart
https://kafka.apache.org/quickstart
2.1系をダウンロードします。
Downloads
https://kafka.apache.org/downloads
(訳注)
2019/05/01現在、最新版は2.2.0ですが、原文では2.1.0 (Scala 2.11)をインストールするように指示しています。
tar-ballを展開します。
tar -xzf kafka_2.11-2.1.0.tgz cd kafka_2.11-2.1.0
`まず、Zookeeperインスタンスを起動します。
bin/zookeeper-server-start.sh config/zookeeper.properties
そして、Apache Kafkaを起動します。
bin/kafka-server-start.sh config/server.properties
Using Docker
Dockerを使用するという手もあります。ZookeeperとApache Kafkaの2つのDockerイメージが必要なので、このチュートリアルではdocker-composeを使用します。以下の手順に従ってください。
Install Docker
https://docs.docker.com/install/
Install Docker Compose
https://docs.docker.com/compose/install/
docker-compose.ymlを作成し、以下のように構成します。
version: '3.2' services: zookeeper: image: "confluent/zookeeper" networks: - microprofile ports: - 2181:2181 kafka: image: "confluent/kafka" networks: - microprofile ports: - 9092:9092 environment: - KAFKA_ADVERTISED_HOST_NAME=kafka - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_PORT=9092 depends_on: - zookeeper networks: microprofile:
以下のコマンドを実行します。
docker-compose -f docker-compose.yml up -d
localhostで接続できるよう、kafkaをホストのlocalhostとして定義するため、以下の値を /etc/hosts に追加します。
127.0.0.1 localhost kafka
Application With Eclipse MicroProfile
Eclipse MicroProfileは、複数のランタイム間での可搬性があるマイクロサービスアプリケーションプラットフォームを定義するために構成されたオープンソースプロジェクトです。Eclipse MicroProfileには、マイクロサービスとクラウドの時代にエンタープライズ・アプリケーションをより簡単に作成するためのプランがあります。Apache Kafkaと接続するために、Reactive Messageが使えるEclipse MicroProfile Reactiveがあります。
How to Write Reactive Applications with MicroProfile
https://www.eclipse.org/community/eclipse_newsletter/2018/september/reactive_mp.php
CDI 2.0とJava SEを使い、サンプルアプリケーションでApache Kafkaからの送受信メッセージをスムーズに作成しますが、その前に、このプロジェクトではMavenを使用するため、まずは依存関係を定義する必要があります。このプロジェクトは、MicroProfile Configurationの実装、CDI 2.0の実装、およびMicroProfile Reactiveという依存関係を有しています。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <weld-core.version>3.1.0.Final</weld-core.version> <slf4j-api.version>1.7.26</slf4j-api.version> </properties> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>${slf4j-api.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>io.smallrye</groupId> <artifactId>smallrye-config</artifactId> <version>1.3.5</version> </dependency> <dependency> <groupId>io.smallrye.reactive</groupId> <artifactId>smallrye-reactive-messaging-provider</artifactId> <version>0.0.4</version> </dependency> <dependency> <groupId>io.smallrye.reactive</groupId> <artifactId>smallrye-reactive-messaging-kafka</artifactId> <version>0.0.4</version> <exclusions> <exclusion> <!-- this avoid having the logging working, to exclude it --> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.smallrye.reactive</groupId> <artifactId>smallrye-reactive-streams-operators</artifactId> <version>0.4.1</version> </dependency> <dependency> <groupId>org.jboss.weld.se</groupId> <artifactId>weld-se-core</artifactId> <version>${weld-core.version}</version> </dependency> <dependency> <groupId>javax.enterprise</groupId> <artifactId>cdi-api</artifactId> <version>2.0.SP1</version> </dependency> </dependencies>
続いて、 src/main/resources/META-INFに配置する構成ファイルを作成します。
1個目はCDIを有効にするためのbean.xmlファイルです。
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="all"> </beans>
もう一つはmicroprofile-config.propertiesで、レコードのシリアライズ・デシリアライズの実装など、Apache Kafkaと接続するEclipse MicroProfileへのセットアップ構成をkey-value形式で記述します。
# Kafka Sink smallrye.messaging.sink.data.type=io.smallrye.reactive.messaging.kafka.Kafka smallrye.messaging.sink.data.bootstrap.servers=localhost:9092 smallrye.messaging.sink.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer smallrye.messaging.sink.data.value.serializer=org.apache.kafka.common.serialization.StringSerializer smallrye.messaging.sink.data.acks=1 ## Kafka Source smallrye.messaging.source.kafka.type=io.smallrye.reactive.messaging.kafka.Kafka smallrye.messaging.source.kafka.bootstrap.servers=localhost:9092 smallrye.messaging.source.kafka.topic=kafka smallrye.messaging.source.kafka.group.id=demo smallrye.messaging.source.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer smallrye.messaging.source.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
すべての設定が完了したら、最後にApache Kafkaからメッセージを送受信するクラスを作成します。APIでは Incoming と Outgoing という注釈が使えます。 Sender クラスにはBlockingQueueがあり、キューにテキストがあるときにメッセージを送信します。
Interface BlockingQueue
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/BlockingQueue.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html
@ApplicationScoped public class Receiver { private static final Logger LOGGER = Logger.getLogger(Receiver.class.getName()); @Incoming("kafka") public CompletionStage<Void> consume(KafkaMessage<String, String> message) { String payload = message.getPayload(); String key = message.getKey(); MessageHeaders headers = message.getHeaders(); Integer partition = message.getPartition(); Long timestamp = message.getTimestamp(); LOGGER.info("received: " + payload + " from topic " + message.getTopic()); return message.ack(); } }
@ApplicationScoped public class Sender { private static final Logger LOGGER = Logger.getLogger(Sender.class.getName()); private BlockingQueue<String> messages = new LinkedBlockingQueue<>(); public void add(String message) { messages.add(message); } @Outgoing("data") public CompletionStage<KafkaMessage<String, String>> send() { return CompletableFuture.supplyAsync(() -> { try { String message = messages.take(); LOGGER.info("Sending message to kafka with the message: " + message); return KafkaMessage.of("kafka", "key", message); } catch (InterruptedException e) { throw new RuntimeException(e); } }); } }
最後に、CDIコンテナを起動するクラスを作成します。完了したら、Kafkaにメッセージを送信してみましょう。結果はログで確認できます。
public class App { public static void main(String[] args) { SeContainer container = SeContainerInitializer.newInstance().initialize(); Sender sender = container.select(Sender.class).get(); sender.add("Hello world"); sender.add("Otávio"); sender.add("Poliana"); sender.add("Clement"); } }
Apache Kafkaの可能性と、このプロジェクトがBig-Dataプレーヤにとってアクセシブルになった理由がわかりました。この簡単な例から、Eclipse MicroProfileとの統合に不安がないことがわかります。このサンプルコードを手助けしてくれたClement Escoffierに感謝します。サンプルコードはGitHubにUp済みです。
kafka-cdi-microprofile-hello-world – A hello world using Apache Kafka and Eclipse Microprofile
https://github.com/soujava/kafka-cdi-microprofile-hello-world