Azure Cosmos DB Java v3 SDK

(2019/08/16に3.1.0でCosmosDatabase#createContaineIfNotExists()の挙動が修正されていることを確認しました)

このエントリは2019/07/22現在の情報に基づくものです。将来の機能追加・廃止に伴い、記載内容との乖離が発生する可能性があります。

先日、Azure Cosmos DB Java V3 SDKがリリースされました。

Azure Cosmos DB Java V3 SDK now available
https://azure.microsoft.com/updates/azure-cosmos-db-java-v3-sdk-now-available/

特徴は以下の通りです。

  • Reactorベースの非同期プログラミングモデル(v2は非同期の場合Completable Futureベース)
  • Azure Cosmos DBのdirect TCPトランスポートプロトコルをサポート
  • 直感的で使いやすいAPI (Fluentスタイルで記述できる)
  • 名前空間の変更(以前はcom.microsoft.azure.cosmosdb.* だったが、v3ではcom.azure.data.cosmos.*)。

JavaDocは以下から参照できます。

Package com.azure.data.cosmos
https://azure.github.io/azure-cosmosdb-java/3.0.0/

ソースコードは以下から参照できます。ただ、masterブランチは2.4.3のままなので、明示的にv3を指定してください。

https://github.com/Azure/azure-cosmosdb-java/tree/v3

動作確認

依存関係

依存関係解決のために以下をpom.xmlに追加します。

<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-cosmos</artifactId>
  <version>3.1.0</version>
</dependency>

CosmosClientインスタンスの作成と終了

Cosmos DBの接続クライアントであるCosmosClientインスタンスを生成します。終了はCosmosClient#close()を発行します。

CosmosClient client = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .build();
...
client.close();

Databaseへの接続を開く

接続は既存のデータベースへの接続のためのCosmosClient#getDatabase()、もしくは新規データベース作成のためのCosmosClient#createDatabaseIfNotExists()もしくはCosmosClient#createDatabase()を使ってCosmosDatabaseインスタンスを作成します。それぞれの違いは以下の通りです。

CosmosClient#動作備考
getDatabase指定したデータベースを開くデータベースが存在しない場合は例外発生(CosmosClientException)
createDatabase指定した名称のデータベースを作成するデータベースが存在する場合は例外発生(CosmosClientException)
createDatabaseIfNotExists指定した名称のデータベースが存在しない場合にデータベースを作成
存在する場合はデータベースを開く

以下はCosmosClient#createDatabaseIfNotExists()を使った例です。わかりやすくするためにblock()を使っていますが、実運用のコードではReactiveのメリットを生かしつつ、Fluentスタイルで記述を連ねてください。

CosmosDatabase cosmosDatabase = client.createDatabaseIfNotExists(DATABASE)
        .doOnError(throwable -> logger.info("[Create] Unable to create Database: " + throwable.getMessage()))
        .doOnSuccess(cosmosDatabaseResponse -> logger.info("[Create] Database created " + cosmosDatabaseResponse.database().id()))
        .flatMap(cosmosDatabaseResponse -> Mono.just(cosmosDatabaseResponse.database()))
        .publishOn(Schedulers.elastic()).block();

Containerへの接続を開く

接続は既存のコンテナーへの接続のためのCosmosDatabase#getContainer()、もしくは新規データベース作成のためのCosmosDatabase#createContaineIfNotExists()もしくはCosmosDatabase#createContainer()を使ってCosmosContainerインスタンスを作成します。それぞれの違いは以下の通りです。

CosmosDatabase#動作備考
getContainer指定したコンテナーを開くコンテナーが存在しない場合は例外発生(CosmosClientException)
createContainer指定した名称のコンテナーを作成するコンテナーが存在する場合は例外発生(CosmosClientException)
createContainerIfNotExists指定した名称のコンテナーが存在しない場合にコンテナーを作成
存在する場合はコンテナーを開く
3.1.0で修正済み
2019/07/22現在、以下のBugでNull Pointer Exceptionが返る
https://github.com/Azure/azure-cosmosdb-java/issues/221

以下はCosmosDatabase#createContainer()を使った例です。doOnError()などを使い、エラー時の処理を追加できます。

CosmosContainer container1 = cosmosDatabase.createContainer(new CosmosContainerProperties(CONTAINER, "/country"))
        .doOnError(throwable -> logger.info("[Create] Unable to create Container " + throwable.getMessage()))
        .doOnSuccess(cosmosContainerResponse -> logger.info("[Create] Container created " + cosmosContainerResponse.container().id()))
        .flatMap(cosmosContainerResponse -> Mono.just(cosmosContainerResponse.container()))
        .publishOn(Schedulers.elastic())...

もちろん、CosmosClientインスタンス作成からCosmosContainerインスタンス作成まで一気に書くこともできます。

CosmosContainer cosmosContainer = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .build()
        .getDatabase(DATABASE)
        .getContainer(CONTAINER);

アイテムの取得、追加・更新

コンテナからのアイテム取得もしくはアイテムの追加・更新には、以下のメソッドを利用します。

CosmosContainer#動作備考
getItemid、partitionKeyを指定してアイテムを取得する該当する値がない場合はCosmosClientException
createItem指定されたPOJOインスタンスでアイテムを作成する。重複時はCosmosClientException
upsertItem指定されたPOJOインスタンスでアイテムを作成する。重複する場合は更新する。

以下はPeopleというクラスのインスタンスp1、p2を順次追加する例です。この例ではupsertItemを使っています。

CosmosItemResponse response1 = container1.upsertItem(p1)
        .doOnError(throwable -> logger.info("[Create] Unable to load p1 " + throwable.getMessage()))
        .mergeWith(container1.upsertItem(p2))
        .doOnError(throwable -> logger.info("[Create] Unable to load p2 " + throwable.getMessage()))
        .doOnComplete(()->logger.info("[Create] Loading completed"))
        .publishOn(Schedulers.elastic()).blockLast();

アイテムの更新・削除

アイテムの更新、削除は、一度CosmosContainer#getItem()で取得したのちに、CosmosItem#replace()もしくはCosmosItem#delete()を使います。CosmosContainer#delete()というメソッドもありますが、これはコンテナーを削除するためのメソッドで用途が異なります。

CosmosItem cosmosItem1 = container1.getItem("111", "US");
p1.setDescription("変更するよ");
cosmosItem1.replace(p1)
        .doOnError(throwable -> logger.info("[Create] Unable to replace p1 " + throwable.getMessage()))
        .publishOn(Schedulers.immediate()).block();

...

CosmosItem cosmosItem2 = container1.getItem("222", "CA");
cosmosItem2.delete().publishOn(Schedulers.immediate()).block();

クエリ、読み取り

CosmosContainer#queryItems()やCosmosContainer#readItems()を使います。用途の違いは以下の通りです。

CosmosContainer#動作
queryItemsSQLで指定した要素、フィールドを取得する
readAllItemsCosmos DBのContainerに格納された値を全て取得する(つまり、Data Explorerで確認できる、_ridや_etagなどを含んだ値)

以下はqueryItemsを使った例です。

FeedResponse<CosmosItemProperties> feedResponse = cosmosContainer.queryItems("select c.id, c.name, c.country, c.description from " + CONTAINER + " c", new FeedOptions().enableCrossPartitionQuery(true).maxDegreeOfParallelism(2))
        .doOnError(throwable -> logger.info("[Query] Unable to query : " + throwable.getMessage()))
        .publishOn(Schedulers.elastic()).blockLast();

変更フィード

変更フィードは、フィード対象のコンテナー、リースのためのコンテナーを指定し、変更フィードを取得した後の処理をChangeFeedProcessor.BuilderDefinition#handleChanges()に登録します。

Polling間隔はChangeFeedProcessor.BuilderDefinition#options()で登録します。

ChangeFeedProcessorインスタンスを生成後、ChangeFeedProcessor#start()でリスニングを開始します。なお、ChangeFeedProcessor#startではリスニングを開始するだけですので、変更フィードをサブスクライブしなければ情報は取得できません。そのため、subscribeメソッドを明示的に呼ぶ必要があります。

以下は変更フィードを取得する例です。Cosmos DBにデータが追加されたり、既存レコードへの更新が発生したりすると、ログに表示されます。

CosmosClient client = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .build();
CosmosContainer feedContainer = client.getDatabase(DATABASE).createContainerIfNotExists(CONTAINER, "/country").flatMap(f -> Mono.just(f.container())).publishOn(Schedulers.elastic()).block();
CosmosContainer leaseContainer = client.getDatabase(DATABASE).createContainerIfNotExists(LEASE_CONTAINER, "/id").flatMap(f -> Mono.just(f.container())).publishOn(Schedulers.elastic()).block();
ChangeFeedProcessor.Builder()
        .hostName(ENDPOINT)
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(docs -> {
            logger.info("[ChangeFeedSample] handleChanges START!");
            for (CosmosItemProperties cosmosItemProperties : docs) {
                logger.info("[ChangeFeedSample] ChangeFeed Received " + cosmosItemProperties.toJson(SerializationFormattingPolicy.INDENTED));
            }
            logger.info("[ChangeFeedSample] handleChanges End!");
        })
        .options(new ChangeFeedProcessorOptions().feedPollDelay(Duration.ofSeconds(3)))
        .build()
        .start()
        .subscribe();

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中