このエントリは以下のエントリをベースにしています。
This entry is based on the following one written by Otavio Santana (Software engineer in Tomitribe).
https://dzone.com/articles/using-jakarta-eemicroprofile-to-connect-to-apache
Apache Kafkaは、分散ストリーミングプラットフォームで、以下の機能を備えています。
- レコードストリームをパブリッシュおよびサブスクライブ
- レコードストリームを耐障害性のある永続的な方法で格納
- ストリーム発生時にストリーム処理
Apache Kafkaは、Javaの世界で成功事例があり、このエントリではJakarta EE/MicroProfileの世界でApache Kafkaを活用する方法をご紹介します。
Apache Kafka Core Concepts
Kafkaは1個以上のサーバー上でクラスタとして動作します。このサーバー(群)は複数のデータセンターにまたがることができます。Kafkaクラスタはトピック(Topic)と呼ばれるカテゴリにレコードストリームを格納し、各レコードはKey、Value、タイムスタンプで構成されています。
ドキュメントによると、Kafkaには主要なAPIが4個あります。
Apache Kafka
https://kafka.apache.org/intro
- Producer API
- アプリケーションが1個以上のKafkaのトピックに対してレコードストリームをパブリッシュするためのAPI
- Consumer API
- アプリケーションが1個以上のトピックをサブスクライブし、トピックに作成されたレコードストリームを処理するためのAPI
- Streams API
- 入力ストリームを1個以上のトピックから消費したり、1個以上のトピックに対して出力ストリームを生成したり、効率的に入力ストリームを出力ストリームに変換したり、という、ストリーム・プロセッサとしてアプリケーションが振る舞うためのAPI
- Connector API
- 既存のアプリケーションやデータシステムとKafkaのトピックを接続する、再利用可能なプロデューサーやコンシューマーを作成、実行するためのAPI
Install Apache Kafka
公式ドキュメントにはApache Kafkaをはじめるにあたってのよいエントリがあり、Zookeeperを使ったインストール方法が紹介されています。かいつまんでご紹介すると、KafkaはZookeeperを使ってクラスタメンバーの構成やトピックの構成などを行います。
Quickstart
https://kafka.apache.org/quickstart
2.1系をダウンロードします。
Downloads
https://kafka.apache.org/downloads
(訳注)
2019/05/01現在、最新版は2.2.0ですが、原文では2.1.0 (Scala 2.11)をインストールするように指示しています。
tar-ballを展開します。
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
`まず、Zookeeperインスタンスを起動します。
bin/zookeeper-server-start.sh config/zookeeper.properties
そして、Apache Kafkaを起動します。
bin/kafka-server-start.sh config/server.properties
Using Docker
Dockerを使用するという手もあります。ZookeeperとApache Kafkaの2つのDockerイメージが必要なので、このチュートリアルではdocker-composeを使用します。以下の手順に従ってください。
Install Docker
https://docs.docker.com/install/
Install Docker Compose
https://docs.docker.com/compose/install/
docker-compose.ymlを作成し、以下のように構成します。
version: '3.2'
services:
zookeeper:
image: "confluent/zookeeper"
networks:
- microprofile
ports:
- 2181:2181
kafka:
image: "confluent/kafka"
networks:
- microprofile
ports:
- 9092:9092
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_PORT=9092
depends_on:
- zookeeper
networks:
microprofile:
以下のコマンドを実行します。
docker-compose -f docker-compose.yml up -d
localhostで接続できるよう、kafkaをホストのlocalhostとして定義するため、以下の値を /etc/hosts に追加します。
127.0.0.1 localhost kafka
Application With Eclipse MicroProfile
Eclipse MicroProfileは、複数のランタイム間での可搬性があるマイクロサービスアプリケーションプラットフォームを定義するために構成されたオープンソースプロジェクトです。Eclipse MicroProfileには、マイクロサービスとクラウドの時代にエンタープライズ・アプリケーションをより簡単に作成するためのプランがあります。Apache Kafkaと接続するために、Reactive Messageが使えるEclipse MicroProfile Reactiveがあります。
How to Write Reactive Applications with MicroProfile
https://www.eclipse.org/community/eclipse_newsletter/2018/september/reactive_mp.php
CDI 2.0とJava SEを使い、サンプルアプリケーションでApache Kafkaからの送受信メッセージをスムーズに作成しますが、その前に、このプロジェクトではMavenを使用するため、まずは依存関係を定義する必要があります。このプロジェクトは、MicroProfile Configurationの実装、CDI 2.0の実装、およびMicroProfile Reactiveという依存関係を有しています。
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<weld-core.version>3.1.0.Final</weld-core.version>
<slf4j-api.version>1.7.26</slf4j-api.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j-api.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-config</artifactId>
<version>1.3.5</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>0.0.4</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
<version>0.0.4</version>
<exclusions>
<exclusion>
<!-- this avoid having the logging working, to exclude it -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-streams-operators</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se-core</artifactId>
<version>${weld-core.version}</version>
</dependency>
<dependency>
<groupId>javax.enterprise</groupId>
<artifactId>cdi-api</artifactId>
<version>2.0.SP1</version>
</dependency>
</dependencies>
続いて、 src/main/resources/META-INFに配置する構成ファイルを作成します。
1個目はCDIを有効にするためのbean.xmlファイルです。
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
bean-discovery-mode="all">
</beans>
もう一つはmicroprofile-config.propertiesで、レコードのシリアライズ・デシリアライズの実装など、Apache Kafkaと接続するEclipse MicroProfileへのセットアップ構成をkey-value形式で記述します。
# Kafka Sink
smallrye.messaging.sink.data.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.sink.data.bootstrap.servers=localhost:9092
smallrye.messaging.sink.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.data.value.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.data.acks=1
## Kafka Source
smallrye.messaging.source.kafka.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.kafka.bootstrap.servers=localhost:9092
smallrye.messaging.source.kafka.topic=kafka
smallrye.messaging.source.kafka.group.id=demo
smallrye.messaging.source.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
すべての設定が完了したら、最後にApache Kafkaからメッセージを送受信するクラスを作成します。APIでは Incoming と Outgoing という注釈が使えます。 Sender クラスにはBlockingQueueがあり、キューにテキストがあるときにメッセージを送信します。
Interface BlockingQueue
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/BlockingQueue.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html
@ApplicationScoped
public class Receiver {
private static final Logger LOGGER = Logger.getLogger(Receiver.class.getName());
@Incoming("kafka")
public CompletionStage<Void> consume(KafkaMessage<String, String> message) {
String payload = message.getPayload();
String key = message.getKey();
MessageHeaders headers = message.getHeaders();
Integer partition = message.getPartition();
Long timestamp = message.getTimestamp();
LOGGER.info("received: " + payload + " from topic " + message.getTopic());
return message.ack();
}
}
@ApplicationScoped
public class Sender {
private static final Logger LOGGER = Logger.getLogger(Sender.class.getName());
private BlockingQueue<String> messages = new LinkedBlockingQueue<>();
public void add(String message) {
messages.add(message);
}
@Outgoing("data")
public CompletionStage<KafkaMessage<String, String>> send() {
return CompletableFuture.supplyAsync(() -> {
try {
String message = messages.take();
LOGGER.info("Sending message to kafka with the message: " + message);
return KafkaMessage.of("kafka", "key", message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
最後に、CDIコンテナを起動するクラスを作成します。完了したら、Kafkaにメッセージを送信してみましょう。結果はログで確認できます。
public class App {
public static void main(String[] args) {
SeContainer container = SeContainerInitializer.newInstance().initialize();
Sender sender = container.select(Sender.class).get();
sender.add("Hello world");
sender.add("Otávio");
sender.add("Poliana");
sender.add("Clement");
}
}
Apache Kafkaの可能性と、このプロジェクトがBig-Dataプレーヤにとってアクセシブルになった理由がわかりました。この簡単な例から、Eclipse MicroProfileとの統合に不安がないことがわかります。このサンプルコードを手助けしてくれたClement Escoffierに感謝します。サンプルコードはGitHubにUp済みです。
kafka-cdi-microprofile-hello-world – A hello world using Apache Kafka and Eclipse Microprofile
https://github.com/soujava/kafka-cdi-microprofile-hello-world