Azure Cosmos DB Java SDK v4 (1)

このエントリは2019/11/17現在の情報に基づいています。将来の機能追加・変更に伴い、記載内容との乖離が発生する可能性があります(2020/06/24更新済み)。

Igniteで発表された通り、Cosmos DB Java SDK v4がPreviewとして利用できるようになり、2020/06/10に一般提供された。

What’s new in Azure Cosmos DB: November 2019
https://devblogs.microsoft.com/cosmosdb/whats-new-in-azure-cosmos-db-nov-2019/

Java SDKのアップデートは以下の通り。

  • v3で露出していたReactor関連の要素が、同期クライアントを扱う場合には意識する必要がなくなった
    • 言い換えると、同期クライアントを使う場合はv3から書き換えたほうが見通しが良くなる。
  • 同期、非同期が明確に分離された。
    • 例えば、CosmosClientは同期型で、CosmosAsyncClientは非同期型と、Asyncが入る
  • v4では接続のデフォルトがDirectモードでTCP接続に変わっている。もしAzure外部からなど、HTTPSでのアクセスが必要な場合はGatewayモードを明示的に指定しなければならない。

v3で非同期処理をしている場合は、メソッド名やクラス名の変更をすれば、ロジックの変更をせずに動作する。なお、.NET SDKで追加されたTransactional batchやBulk execution modeは含まれていない。

依存関係

依存関係解決のために以下をpom.xmlに追加する。groupIdが変わっていることに注意(以下は2020/06/24現在の情報)。

<!-- https://mvnrepository.com/artifact/com.azure/azure-cosmos -->
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>4.0.1</version>
</dependency>

同期型の場合

CosmosClientインスタンスの作成と終了

CosmosClientインスタンスを作成するため、CosmosClientBuilder()の最後でbuildClient()を呼び出す。終了はclose()を発行することで明示的にクローズできるが、CosmosClientはauto closableなので、try-with-resourceで囲むことができる。

なお、クライアント作成時に接続モードを指定できるが、先述の通り、HTTPSでの接続が必要な場合は、gatewayModeメソッドを明示的に呼び出す必要がある。

// auto closable
try (CosmosClient client = new CosmosClientBuilder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .contentResponseOnWriteEnabled(true)
        .multipleWriteRegionsEnabled(true)
        .preferredRegions(Arrays.asList(REGION))
        .readRequestsFallbackEnabled(true)
        .endpointDiscoveryEnabled(true)
        .gatewayMode()
        .connectionSharingAcrossClientsEnabled(true)
        .buildClient()) {
...
} catch(CosmosException e) {
    e.printStackTrace();
}

Databaseへの接続

接続は既存のデータベースへの接続のためのgetDatabase()、もしくは新規データベース作成のためのcreateDatabaseIfNotExists()もしくはcreateDatabase()を使ってCosmosDatabaseインスタンスを作成する。Containerも同様のメソッドでCosmosContainerインスタンスを作成する。

以下の例からもわかるように、v3ではReactorを意識してflatMapやMono.justを使っていたが、記載の必要がなくなっていることがわかる。また、このタイミングでAutoscaleのスループット上限値を設定できる。

// Autoscale throughput settings
ThroughputProperties autoscaleThroughputProperties
        = ThroughputProperties.createAutoscaledThroughput(40000);

//Create the database with autoscale enabled
CosmosDatabaseResponse databaseResponse
        = client.createDatabaseIfNotExists(DATABASE, autoscaleThroughputProperties);
CosmosDatabase cosmosDatabase
        = client.getDatabase(databaseResponse.getProperties().getId());

// Configure container properties
CosmosContainerProperties autoscaleContainerProperties
        = new CosmosContainerProperties(CONTAINER, "/country");

CosmosContainerResponse response
        = cosmosDatabase.createContainerIfNotExists(autoscaleContainerProperties);
CosmosContainer cosmosContainer
        = cosmosDatabase.getContainer(response.getProperties().getId());

アイテムの取得、追加・更新

追加(createItem)、追加もしくは更新(upsertItem もしくは replace)は変化がないが、取得はgetItemからreadItemに変わった。例えばPeopleというクラスのインスタンスpを追加、追加もしくは更新、取得は以下のような感じ。

People p = new People();

// createによる追加
CosmosItemResponse<People> res1 = container.createItem(p);

// upsertによる更新 もしくは 追加(この場合は更新)
CosmosItemResponse<People> res2 = container.upsertItem(p);

// readによる取得
CosmosItemResponse<People> res3 = container.readItem(p.getId(), f.getCountry());

// replaceによる更新
// partition keyであるcountryを変更
res3.getItem().setCountry("US");
container.replaceItem(res3.getItem(),
        res3.getItem().getId(),
        new PartitionKey(res3.getItem().getCountry()),
        new CosmosItemRequestOptions());

アイテムの削除

これもv3と同じ。データの存在を確認するために一度CosmosContainer#readItem()やCosmosContainer#queryItems()で取得したのちに、CosmosItem#delete()を使う。CosmosContainer#delete()はコンテナーを削除するためのメソッドで用途が異なる。

同期型の場合、CosmosItemRequestOptionsが必須なので注意。

container.deleteItem(res3.getItem().getId(), new PartitionKey(res3.getItem().getCountry()), new CosmosItemRequestOptions());

クエリ

以前はreadItems()で全件読めたが、このメソッドがdeprecatedになった。queryItems()のみ利用できる。

// 同期
String sql = "select c.id, c.name, c.country, c.description from " + CONTAINER + " c";

CosmosQueryRequestOptions cosmosQueryRequestOptions
        = new CosmosQueryRequestOptions().setMaxDegreeOfParallelism(2);
CosmosPagedIterable<People> filteredPeople
        = cosmosContainer.queryItems(sql, cosmosQueryRequestOptions, People.class);

filteredPeople.forEach(people -> logger.info("Query result:[id] " + people.getId() + " [Country] " + people.getCountry() + " [Description] " + people.getDescription() + " [Name] " + people.getName()));

非同期型の場合

CosmosAsyncClientインスタンスの作成と終了

CosmosAsyncClientインスタンスを作成するため、CosmosClientBuilder()の最後でbuildAsyncClient()を呼び出す。終了はclose()を発行することで明示的にクローズできるが、CosmosClientはauto closableなので、try-with-resourceで囲むことができる。

クライアント作成時の接続モード指定は同期型の場合と同じ。

// auto closable
try (CosmosAsyncClient client = new CosmosClientBuilder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .contentResponseOnWriteEnabled(true)
        .multipleWriteRegionsEnabled(true)
        .preferredRegions(Arrays.asList(REGION))
        .readRequestsFallbackEnabled(true)
        .endpointDiscoveryEnabled(true)
        .gatewayMode()
        .connectionSharingAcrossClientsEnabled(true)
        .buildAsyncClient()) {
...
} catch(CosmosException e) {
    e.printStackTrace();
}

Databaseへの接続

接続は既存のデータベースへの接続のためのgetDatabase()、もしくは新規データベース作成のためのcreateDatabaseIfNotExists()もしくはcreateDatabase()を使ってCosmosAsyncDatabaseインスタンスを作成する。Containerも同様のメソッドでCosmosAsyncContainerインスタンスを作成する。このタイミングでAutoscaleのスループット上限値を設定できるのは同期型と同じ。

以下ではblock()を使っているが、これはあくまでも説明のためであり、実際にはblock()を極力使わないことが望ましい。

ThroughputProperties autoscaleThroughputProperties
        = ThroughputProperties.createAutoscaledThroughput(40000);

CosmosAsyncDatabase database = client.createDatabaseIfNotExists(DATABASE, autoscaleThroughputProperties)
        .doOnError(throwable -> logger.info("[Create] Unable to create Database: " + throwable.getMessage()))
        .doOnSuccess(response -> {
            logger.info(">>[Create] Database created " + response .getProperties().getId());
            logger.info(">>[Create] databaseUsage " + response .getDatabaseUsage() + "[RU]");
        })
        .flatMap(response  -> Mono.just(client.getDatabase(response .getProperties().getId())))
        .publishOn(Schedulers.elastic())
        .block();

CosmosContainerProperties containerProperties
        = new CosmosContainerProperties(CONTAINER, "/country");
CosmosAsyncContainer container
        = database.createContainerIfNotExists(containerProperties)
        .doOnError(throwable -> logger.info("[Create] Unable to create Container " + throwable.getMessage()))
        .doOnSuccess(response-> {
            logger.info(">>[Create] Container created " + response.getProperties().getId());
            logger.info(">>[Create] requestCharge " + response.getRequestCharge() + "[RU]");
        })
        .flatMap(response -> Mono.just(database.getContainer(response.getProperties().getId())))
        .publishOn(Schedulers.elastic())
        .block();

アイテムの取得、追加・更新

同期型と同じく、追加(createItem)、追加もしくは更新(upsertItem もしくは replace)は変化がないが、取得はgetItemからreadItemに変わった。

People p = new People();

// createによる追加
CosmosItemResponse<People> res1 = container.createItem(p);

// upsertによる更新 もしくは 追加(この場合は更新)
CosmosItemResponse<People> res2 = container.upsertItem(p);

// replaceによる更新 
CosmosItemResponse<People> res3 = container.upsertItem(p);

// readによる取得
CosmosItemResponse<People> res4 = container.readItem(p.getId(), p.getCountry());

非同期API、というよりReactorベースなので、追加・変更などの処理をmergeWithで連続して記述できる(が、実際にはArrayListの要素をこんな感じで入れることはないはず)。

cosmosAsyncContainer.upsertItem(peopleArrayList.get(0))
        .doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" +  t.getMessage()))
        .mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(1)))
        .doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" +  t.getMessage()))
        .mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(2)))
        .doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" +  t.getMessage()))
        .mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(3)))
        .doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" +  t.getMessage()))
        .mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(4)))
        .doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" +  t.getMessage()))
        .mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(5)))
        .doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" +  t.getMessage()))
        .doOnComplete(() -> logger.info("[Create] Loading completed"))
        .publishOn(Schedulers.elastic())
        .blockLast();

アイテムの削除

同期型と同じく、データの存在を確認するために一度CosmosAsyncContainer#readItem()で取得したのちに、CosmosAsyncItem#delete()を使う。CosmosAsyncContainer#delete()というメソッドはコンテナーを削除するためのメソッドで用途が異なる。

container.deleteItem(res4.getItem().getId(), new PartitionKey(res4.getItem().getCountry()), new CosmosItemRequestOptions());

クエリ

同期型の場合と同様、readItems()がdeprecatedになっており、queryItems()のみ利用できる。

String sql = "select c.id, c.name, c.country, c.description from " + CONTAINER + " c";

CosmosQueryRequestOptions op
        = new CosmosQueryRequestOptions().setQueryMetricsEnabled(true)
        .setMaxDegreeOfParallelism(2);

CosmosPagedFlux<People> response1 = container.queryItems(sql, op, People.class);
response1.byPage().flatMap(f -> {
    f.getResults().stream().forEach(p ->
            logger.info("Query result:[id] " + p.getId() + " [Country] " + p.getCountry() + " [Description] " + p.getDescription() + " [Name] " + p.getName())
    );
    return Flux.empty();
}).blockLast();

変更フィード

変更フィードは構成はv3と同じだが、メソッド名に一部変更がある。

以下は変更フィードを取得する例。Cosmos DBにデータが追加されたり、既存レコードへの更新が発生したりすると、ログに表示する。

try (CosmosAsyncClient client = new CosmosClientBuilder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .contentResponseOnWriteEnabled(true)
        .multipleWriteRegionsEnabled(true)
        .preferredRegions(Arrays.asList(REGION))
        .readRequestsFallbackEnabled(true)
        .endpointDiscoveryEnabled(true)
        .gatewayMode()
        .connectionSharingAcrossClientsEnabled(true)
        .buildAsyncClient()) {

    CosmosAsyncDatabase db = client.getDatabase(DATABASE);
    feedc = db.createContainerIfNotExists(CONTAINER, "/country")
            .flatMap(res->Mono.just(db.getContainer(res.getProperties().getId())))
            .publishOn(Schedulers.elastic())
            .block();
    leasec = db.createContainerIfNotExists(LEASE_CONTAINER, "/id")
            .flatMap(res->Mono.just(db.getContainer(res.getProperties().getId())))
            .publishOn(Schedulers.elastic())
            .block();

    ChangeFeedProcessor cfp = new ChangeFeedProcessorBuilder()
            .hostName(ENDPOINT)
            .feedContainer(feedc)
            .leaseContainer(leasec)
            .handleChanges(docs -> {
                logger.info("[ChangeFeedSample] handleChanges START!");
                for (JsonNode cosmosItemProperties : docs) {
                    logger.info("[ChangeFeedSample] ChangeFeed Received " + cosmosItemProperties.toPrettyString());
                }
                logger.info("[ChangeFeedSample] handleChanges End!");
            })
            .options(new ChangeFeedProcessorOptions().setFeedPollDelay(Duration.ofSeconds(3)))
            .buildChangeFeedProcessor();
    cfp.start().subscribe();
}catch(CosmosException e1) {
    logger.severe(e1.getMessage());
}catch(Exception e2) {
    logger.severe(e2.getMessage());
}

参考

v3を紹介したエントリは以下。

Azure Cosmos DB Java v3 SDK
https://logico-jp.io/2019/07/22/azure-cosmos-db-java-v3-sdk/

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中