Azure Cosmos DB Java v4 SDK

このエントリは2019/11/07現在の情報に基づいています。将来の機能追加・変更に伴い、記載内容との乖離が発生する可能性があります。

Igniteで発表された通り、Cosmos DB Java SDK v4がPreviewとして利用できるようになった。

What’s new in Azure Cosmos DB: November 2019
https://devblogs.microsoft.com/cosmosdb/whats-new-in-azure-cosmos-db-nov-2019/

Java SDKのアップデートは以下の通り。

  • v3で露出していたReactor関連の要素が、同期クライアントを扱う場合には意識する必要がなくなった
    • 言い換えると、同期クライアントを使う場合はv3から書き換えたほうが見通しが良くなる。
  • 非同期であることを明示するようになった
    • 例えば、CosmosClientは同期型で、CosmosAsyncClientは非同期型と、Asyncが入る
  • メソッド表記がJavaふうに変わった(これまではsetXXX、getXXXというメソッド名ではなかった)

v3で非同期処理をしている場合は、メソッド名やクラス名の変更をすれば、ロジックの変更をせずに動作する。なお、.NET SDKで追加されたTransactional batchやBulk execution modeは含まれていない。

動作確認

依存関係

依存関係解決のために以下をpom.xmlに追加する。groupIdが変わっていることに注意。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>4.0.0-preview.1</version>
</dependency>

CosmosClient/CosmosAsyncClientインスタンスの作成と終了

同期クライアントと非同期クライアントが明確に分離された。同期クライアントはCosmosClient、非同期クライアントはCosmosAsyncClient。終了はclose()を発行する。以下は同期クライアントの例。

CosmosClient client = CosmosClient.builder()
            .setEndpoint(ENDPOINT)
            .setKey(KEY)
            .setConnectionPolicy(connectionPolicy)
            .buildClient();
...
client.close();

Databaseへの接続を開く

接続は既存のデータベースへの接続のためのgetDatabase()、もしくは新規データベース作成のためのcreateDatabaseIfNotExists()もしくはcreateDatabase()を使ってCosmosDatabaseもしくはCosmosAsyncDatabaseインスタンスを作成する。Containerの場合も、CosmosContainerもしくはCosmosAsyncContainerインスタンスを作成する。

以下は同期クライアントの場合。v3ではReactorを意識してflatMapやMono.justを使っていたが、記載の必要がなくなっていることがわかる。

try {
    container = client.createDatabaseIfNotExists(DATABASE)
            .getDatabase()
            .createContainerIfNotExists(CONTAINER,"/country")
            .getContainer();
} catch (CosmosClientException e) {
    e.printStackTrace();
}

アイテムの取得、追加・更新

このあたりは変更がない。

CosmosItemResponse res1 = container.upsertItem(f);
CosmosItemResponse res2 = container.createItem(f);
CosmosItemResponse res3 = container.getItem(f.getId(), f.getMyPartitionKey());

アイテムの更新・削除

これもあまり変更がない。一度CosmosContainer#getItem()もしくはCosmosAsyncContainer#getItem()で取得したのちに、同期型の場合はCosmosItem#replace()もしくはCosmosItem#delete()、非同期型の場合はCosmosAsyncItem#replace()もしくはCosmosAsyncItem#delete()を使う。CosmosContainer#delete()やCosmosAsyncContainer#delete()というメソッドはコンテナーを削除するためのメソッドで用途が異なる。

同期型の場合、CosmosItemRequestOptionsが必須なので注意。

// 同期
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions().setPartitionKey(new PartitionKey("JP"));
CosmosItem cosmosItem1 = container1.getItem("111", "US");
p1.setDescription("変更するよ");
cosmosItem1.replace(p1, cosmosItemRequestOptions);
...
cosmosItem1.delete(cosmosItemRequestOptions);
// 非同期
CosmosAsyncItem cosmosItem1 = container1.getItem("111", "US");
p1.setDescription("変更するよ");
cosmosItem1.replace(p1)
        .doOnError(throwable -> logger.info("[Create] Unable to replace p1 " + throwable.getMessage()))
        .doOnSuccess(cosmosItemResponse -> logger.info(">>[Create-replace] " + cosmosItemResponse.getRequestCharge() + "[RU] Latency: " + cosmosItemResponse.getRequestLatency()))
        .publishOn(Schedulers.immediate())
        .block();

クエリ、読み取り

これは変更がない。queryItems()やreadItems()を使う。

// 同期
Iterator<FeedResponse<CosmosItemProperties>> it = container.queryItems("select c.id, c.name, c.country, c.description from " + CONTAINER + " c", new FeedOptions().setEnableCrossPartitionQuery(true).setMaxDegreeOfParallelism(2));
while(it.hasNext()) {
    it.next().getResults().forEach(f->System.out.println(f.toJson()));
}
// 非同期
FeedResponse<CosmosItemProperties> feedResponse = cosmosContainer.queryItems("select c.id, c.name, c.country, c.description from " + CONTAINER + " c", new FeedOptions().setEnableCrossPartitionQuery(true).setMaxDegreeOfParallelism(2))
.doOnError(throwable -> logger.info("[querySample1] Unable to create Database: " + throwable.getMessage()))
.publishOn(Schedulers.elastic())
.blockLast();

変更フィード

変更フィードは構成はv3と同じだが、メソッド名に一部変更がある。

以下は変更フィードを取得する例。Cosmos DBにデータが追加されたり、既存レコードへの更新が発生したりすると、ログに表示する。

CosmosAsyncClient client = 
    CosmosClient.builder()
        .setEndpoint(ENDPOINT)
        .setKey(KEY)
        .buildAsyncClient();
CosmosAsyncContainer feedContainer = 
    client.getDatabase(DATABASE)
        .createContainerIfNotExists(CONTAINER, "/country")
        .flatMap(f -> Mono.just(f.getContainer()))
        .publishOn(Schedulers.elastic())
        .block();
CosmosAsyncContainer leaseContainer = 
    client.getDatabase(DATABASE)
        .createContainerIfNotExists(LEASE_CONTAINER, "/id")
        .flatMap(f -> Mono.just(f.getContainer()))
        .publishOn(Schedulers.elastic())
        .block();
ChangeFeedProcessor changeFeedProcessor = 
    ChangeFeedProcessor.Builder()
        .setHostName(ENDPOINT)
        .setFeedContainer(feedContainer)
        .setLeaseContainer(leaseContainer)
        .setHandleChanges(docs -> {
            logger.info("[ChangeFeedSample] handleChanges START!");
            for (CosmosItemProperties cosmosItemProperties : docs) {
                logger.info("[ChangeFeedSample] ChangeFeed Received " + cosmosItemProperties.toJson(SerializationFormattingPolicy.INDENTED));
            }
            logger.info("[ChangeFeedSample] handleChanges End!");
        })
        .setOptions(new ChangeFeedProcessorOptions().setFeedPollDelay(Duration.ofSeconds(3)))
        .build();
changeFeedProcessor.start().subscribe();

参考

v3を紹介したエントリは以下。

Azure Cosmos DB Java v3 SDK
https://logico-jp.io/2019/07/22/azure-cosmos-db-java-v3-sdk/

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Google フォト

Google アカウントを使ってコメントしています。 ログアウト /  変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中