日別アーカイブ: 2019年5月1日

Using Jakarta EE/MicroProfile to Connect to Apache Kafka: Part 1 — Hello World

このエントリは以下のエントリをベースにしています。
This entry is based on the following one written by Otavio Santana (Software engineer in Tomitribe).
https://dzone.com/articles/using-jakarta-eemicroprofile-to-connect-to-apache

Apache Kafkaは、分散ストリーミングプラットフォームで、以下の機能を備えています。

  • レコードストリームをパブリッシュおよびサブスクライブ
  • レコードストリームを耐障害性のある永続的な方法で格納
  • ストリーム発生時にストリーム処理

Apache Kafkaは、Javaの世界で成功事例があり、このエントリではJakarta EE/MicroProfileの世界でApache Kafkaを活用する方法をご紹介します。

Apache Kafka Core Concepts

Kafkaは1個以上のサーバー上でクラスタとして動作します。このサーバー(群)は複数のデータセンターにまたがることができます。Kafkaクラスタはトピック(Topic)と呼ばれるカテゴリにレコードストリームを格納し、各レコードはKey、Value、タイムスタンプで構成されています。

ドキュメントによると、Kafkaには主要なAPIが4個あります。

Apache Kafka
https://kafka.apache.org/intro

  • Producer API
    • アプリケーションが1個以上のKafkaのトピックに対してレコードストリームをパブリッシュするためのAPI
  • Consumer API
    • アプリケーションが1個以上のトピックをサブスクライブし、トピックに作成されたレコードストリームを処理するためのAPI
  • Streams API
    • 入力ストリームを1個以上のトピックから消費したり、1個以上のトピックに対して出力ストリームを生成したり、効率的に入力ストリームを出力ストリームに変換したり、という、ストリーム・プロセッサとしてアプリケーションが振る舞うためのAPI
  • Connector API
    • 既存のアプリケーションやデータシステムとKafkaのトピックを接続する、再利用可能なプロデューサーやコンシューマーを作成、実行するためのAPI

Install Apache Kafka

公式ドキュメントにはApache Kafkaをはじめるにあたってのよいエントリがあり、Zookeeperを使ったインストール方法が紹介されています。かいつまんでご紹介すると、KafkaはZookeeperを使ってクラスタメンバーの構成やトピックの構成などを行います。

Quickstart
https://kafka.apache.org/quickstart

2.1系をダウンロードします。

Downloads
https://kafka.apache.org/downloads

(訳注)
2019/05/01現在、最新版は2.2.0ですが、原文では2.1.0 (Scala 2.11)をインストールするように指示しています。

tar-ballを展開します。

tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

`まず、Zookeeperインスタンスを起動します。

bin/zookeeper-server-start.sh config/zookeeper.properties

そして、Apache Kafkaを起動します。

bin/kafka-server-start.sh config/server.properties

Using Docker

Dockerを使用するという手もあります。ZookeeperとApache Kafkaの2つのDockerイメージが必要なので、このチュートリアルではdocker-composeを使用します。以下の手順に従ってください。

Install Docker
https://docs.docker.com/install/
Install Docker Compose
https://docs.docker.com/compose/install/

docker-compose.ymlを作成し、以下のように構成します。

version:  '3.2'
services:
  zookeeper:
    image: "confluent/zookeeper"
    networks:
      - microprofile
    ports:
      - 2181:2181
  kafka:
    image: "confluent/kafka"
    networks:
      - microprofile
    ports:
      - 9092:9092
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_PORT=9092
    depends_on:
      - zookeeper
networks:
    microprofile:

以下のコマンドを実行します。

docker-compose -f docker-compose.yml up -d

localhostで接続できるよう、kafkaをホストのlocalhostとして定義するため、以下の値を /etc/hosts に追加します。

127.0.0.1       localhost kafka

Application With Eclipse MicroProfile

Eclipse MicroProfileは、複数のランタイム間での可搬性があるマイクロサービスアプリケーションプラットフォームを定義するために構成されたオープンソースプロジェクトです。Eclipse MicroProfileには、マイクロサービスとクラウドの時代にエンタープライズ・アプリケーションをより簡単に作成するためのプランがあります。Apache Kafkaと接続するために、Reactive Messageが使えるEclipse MicroProfile Reactiveがあります。

How to Write Reactive Applications with MicroProfile
https://www.eclipse.org/community/eclipse_newsletter/2018/september/reactive_mp.php

CDI 2.0とJava SEを使い、サンプルアプリケーションでApache Kafkaからの送受信メッセージをスムーズに作成しますが、その前に、このプロジェクトではMavenを使用するため、まずは依存関係を定義する必要があります。このプロジェクトは、MicroProfile Configurationの実装、CDI 2.0の実装、およびMicroProfile Reactiveという依存関係を有しています。

    <properties&gt;
        <project.build.sourceEncoding&gt;UTF-8</project.build.sourceEncoding&gt;
        <maven.compiler.source&gt;1.8</maven.compiler.source&gt;
        <maven.compiler.target&gt;1.8</maven.compiler.target&gt;
        <weld-core.version&gt;3.1.0.Final</weld-core.version&gt;
        <slf4j-api.version&gt;1.7.26</slf4j-api.version&gt;
    </properties&gt;
    <dependencies&gt;
        <dependency&gt;
            <groupId&gt;org.slf4j</groupId&gt;
            <artifactId&gt;slf4j-simple</artifactId&gt;
            <version&gt;${slf4j-api.version}</version&gt;
            <scope&gt;compile</scope&gt;
        </dependency&gt;
        <dependency&gt;
            <groupId&gt;io.smallrye</groupId&gt;
            <artifactId&gt;smallrye-config</artifactId&gt;
            <version&gt;1.3.5</version&gt;
        </dependency&gt;
        <dependency&gt;
            <groupId&gt;io.smallrye.reactive</groupId&gt;
            <artifactId&gt;smallrye-reactive-messaging-provider</artifactId&gt;
            <version&gt;0.0.4</version&gt;
        </dependency&gt;
        <dependency&gt;
            <groupId&gt;io.smallrye.reactive</groupId&gt;
            <artifactId&gt;smallrye-reactive-messaging-kafka</artifactId&gt;
            <version&gt;0.0.4</version&gt;
            <exclusions&gt;
                <exclusion&gt;
                    <!-- this avoid having the logging working, to exclude it --&gt;
                    <groupId&gt;org.slf4j</groupId&gt;
                    <artifactId&gt;slf4j-log4j12</artifactId&gt;
                </exclusion&gt;
            </exclusions&gt;
        </dependency&gt;
        <dependency&gt;
            <groupId&gt;io.smallrye.reactive</groupId&gt;
            <artifactId&gt;smallrye-reactive-streams-operators</artifactId&gt;
            <version&gt;0.4.1</version&gt;
        </dependency&gt;
        <dependency&gt;
            <groupId&gt;org.jboss.weld.se</groupId&gt;
            <artifactId&gt;weld-se-core</artifactId&gt;
            <version&gt;${weld-core.version}</version&gt;
        </dependency&gt;
        <dependency&gt;
            <groupId&gt;javax.enterprise</groupId&gt;
            <artifactId&gt;cdi-api</artifactId&gt;
            <version&gt;2.0.SP1</version&gt;
        </dependency&gt;
    </dependencies&gt;

続いて、 src/main/resources/META-INFに配置する構成ファイルを作成します。

1個目はCDIを有効にするためのbean.xmlファイルです。

<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="all"&gt;
</beans&gt;

もう一つはmicroprofile-config.propertiesで、レコードのシリアライズ・デシリアライズの実装など、Apache Kafkaと接続するEclipse MicroProfileへのセットアップ構成をkey-value形式で記述します。

# Kafka Sink
smallrye.messaging.sink.data.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.sink.data.bootstrap.servers=localhost:9092
smallrye.messaging.sink.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.data.value.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.data.acks=1
## Kafka Source
smallrye.messaging.source.kafka.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.kafka.bootstrap.servers=localhost:9092
smallrye.messaging.source.kafka.topic=kafka
smallrye.messaging.source.kafka.group.id=demo
smallrye.messaging.source.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

すべての設定が完了したら、最後にApache Kafkaからメッセージを送受信するクラスを作成します。APIでは Incoming と Outgoing という注釈が使えます。 Sender クラスにはBlockingQueueがあり、キューにテキストがあるときにメッセージを送信します。

Interface BlockingQueue
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/BlockingQueue.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html

@ApplicationScoped
public class Receiver {
    private static final Logger LOGGER = Logger.getLogger(Receiver.class.getName());
    @Incoming("kafka")
    public CompletionStage<Void&gt; consume(KafkaMessage<String, String&gt; message) {
        String payload = message.getPayload();
        String key = message.getKey();
        MessageHeaders headers = message.getHeaders();
        Integer partition = message.getPartition();
        Long timestamp = message.getTimestamp();
        LOGGER.info("received: " + payload + " from topic " + message.getTopic());
        return message.ack();
    }
}
@ApplicationScoped
public class Sender {
    private static final Logger LOGGER = Logger.getLogger(Sender.class.getName());
    private BlockingQueue<String&gt; messages = new LinkedBlockingQueue<&gt;();
    public void add(String message) {
        messages.add(message);
    }
    @Outgoing("data")
    public CompletionStage<KafkaMessage<String, String&gt;&gt; send() {
        return CompletableFuture.supplyAsync(() -&gt; {
            try {
                String message = messages.take();
                LOGGER.info("Sending message to kafka with the message: " + message);
                return KafkaMessage.of("kafka", "key", message);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

最後に、CDIコンテナを起動するクラスを作成します。完了したら、Kafkaにメッセージを送信してみましょう。結果はログで確認できます。

public class App {
    public static void main(String[] args) {
        SeContainer container = SeContainerInitializer.newInstance().initialize();
        Sender sender = container.select(Sender.class).get();
        sender.add("Hello world");
        sender.add("Otávio");
        sender.add("Poliana");
        sender.add("Clement");
    }
}

Apache Kafkaの可能性と、このプロジェクトがBig-Dataプレーヤにとってアクセシブルになった理由がわかりました。この簡単な例から、Eclipse MicroProfileとの統合に不安がないことがわかります。このサンプルコードを手助けしてくれたClement Escoffierに感謝します。サンプルコードはGitHubにUp済みです。

kafka-cdi-microprofile-hello-world – A hello world using Apache Kafka and Eclipse Microprofile
https://github.com/soujava/kafka-cdi-microprofile-hello-world