このエントリは2019/11/17現在の情報に基づいています。将来の機能追加・変更に伴い、記載内容との乖離が発生する可能性があります(2020/06/24更新済み)。
Igniteで発表された通り、Cosmos DB Java SDK v4がPreviewとして利用できるようになり、2020/06/10に一般提供された。
What’s new in Azure Cosmos DB: November 2019
https://devblogs.microsoft.com/cosmosdb/whats-new-in-azure-cosmos-db-nov-2019/
Java SDKのアップデートは以下の通り。
- v3で露出していたReactor関連の要素が、同期クライアントを扱う場合には意識する必要がなくなった
- 言い換えると、同期クライアントを使う場合はv3から書き換えたほうが見通しが良くなる。
- 同期、非同期が明確に分離された。
- 例えば、CosmosClientは同期型で、CosmosAsyncClientは非同期型と、Asyncが入る
- v4では接続のデフォルトがDirectモードでTCP接続に変わっている。もしAzure外部からなど、HTTPSでのアクセスが必要な場合はGatewayモードを明示的に指定しなければならない。
v3で非同期処理をしている場合は、メソッド名やクラス名の変更をすれば、ロジックの変更をせずに動作する。なお、.NET SDKで追加されたTransactional batchやBulk execution modeは含まれていない。
依存関係
依存関係解決のために以下をpom.xmlに追加する。groupIdが変わっていることに注意(以下は2020/06/24現在の情報)。
<!-- https://mvnrepository.com/artifact/com.azure/azure-cosmos -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.0.1</version>
</dependency>
同期型の場合
CosmosClientインスタンスの作成と終了
CosmosClientインスタンスを作成するため、CosmosClientBuilder()の最後でbuildClient()を呼び出す。終了はclose()を発行することで明示的にクローズできるが、CosmosClientはauto closableなので、try-with-resourceで囲むことができる。
なお、クライアント作成時に接続モードを指定できるが、先述の通り、HTTPSでの接続が必要な場合は、gatewayModeメソッドを明示的に呼び出す必要がある。
// auto closable
try (CosmosClient client = new CosmosClientBuilder()
.endpoint(ENDPOINT)
.key(KEY)
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.multipleWriteRegionsEnabled(true)
.preferredRegions(Arrays.asList(REGION))
.readRequestsFallbackEnabled(true)
.endpointDiscoveryEnabled(true)
.gatewayMode()
.connectionSharingAcrossClientsEnabled(true)
.buildClient()) {
...
} catch(CosmosException e) {
e.printStackTrace();
}
Databaseへの接続
接続は既存のデータベースへの接続のためのgetDatabase()、もしくは新規データベース作成のためのcreateDatabaseIfNotExists()もしくはcreateDatabase()を使ってCosmosDatabaseインスタンスを作成する。Containerも同様のメソッドでCosmosContainerインスタンスを作成する。
以下の例からもわかるように、v3ではReactorを意識してflatMapやMono.justを使っていたが、記載の必要がなくなっていることがわかる。また、このタイミングでAutoscaleのスループット上限値を設定できる。
// Autoscale throughput settings
ThroughputProperties autoscaleThroughputProperties
= ThroughputProperties.createAutoscaledThroughput(40000);
//Create the database with autoscale enabled
CosmosDatabaseResponse databaseResponse
= client.createDatabaseIfNotExists(DATABASE, autoscaleThroughputProperties);
CosmosDatabase cosmosDatabase
= client.getDatabase(databaseResponse.getProperties().getId());
// Configure container properties
CosmosContainerProperties autoscaleContainerProperties
= new CosmosContainerProperties(CONTAINER, "/country");
CosmosContainerResponse response
= cosmosDatabase.createContainerIfNotExists(autoscaleContainerProperties);
CosmosContainer cosmosContainer
= cosmosDatabase.getContainer(response.getProperties().getId());
アイテムの取得、追加・更新
追加(createItem)、追加もしくは更新(upsertItem もしくは replace)は変化がないが、取得はgetItemからreadItemに変わった。例えばPeopleというクラスのインスタンスpを追加、追加もしくは更新、取得は以下のような感じ。
People p = new People();
// createによる追加
CosmosItemResponse<People> res1 = container.createItem(p);
// upsertによる更新 もしくは 追加(この場合は更新)
CosmosItemResponse<People> res2 = container.upsertItem(p);
// readによる取得
CosmosItemResponse<People> res3 = container.readItem(p.getId(), f.getCountry());
// replaceによる更新
// partition keyであるcountryを変更
res3.getItem().setCountry("US");
container.replaceItem(res3.getItem(),
res3.getItem().getId(),
new PartitionKey(res3.getItem().getCountry()),
new CosmosItemRequestOptions());
アイテムの削除
これもv3と同じ。データの存在を確認するために一度CosmosContainer#readItem()やCosmosContainer#queryItems()で取得したのちに、CosmosItem#delete()を使う。CosmosContainer#delete()はコンテナーを削除するためのメソッドで用途が異なる。
同期型の場合、CosmosItemRequestOptionsが必須なので注意。
container.deleteItem(res3.getItem().getId(), new PartitionKey(res3.getItem().getCountry()), new CosmosItemRequestOptions());
クエリ
以前はreadItems()で全件読めたが、このメソッドがdeprecatedになった。queryItems()のみ利用できる。
// 同期
String sql = "select c.id, c.name, c.country, c.description from " + CONTAINER + " c";
CosmosQueryRequestOptions cosmosQueryRequestOptions
= new CosmosQueryRequestOptions().setMaxDegreeOfParallelism(2);
CosmosPagedIterable<People> filteredPeople
= cosmosContainer.queryItems(sql, cosmosQueryRequestOptions, People.class);
filteredPeople.forEach(people -> logger.info("Query result:[id] " + people.getId() + " [Country] " + people.getCountry() + " [Description] " + people.getDescription() + " [Name] " + people.getName()));
非同期型の場合
CosmosAsyncClientインスタンスの作成と終了
CosmosAsyncClientインスタンスを作成するため、CosmosClientBuilder()の最後でbuildAsyncClient()を呼び出す。終了はclose()を発行することで明示的にクローズできるが、CosmosClientはauto closableなので、try-with-resourceで囲むことができる。
クライアント作成時の接続モード指定は同期型の場合と同じ。
// auto closable
try (CosmosAsyncClient client = new CosmosClientBuilder()
.endpoint(ENDPOINT)
.key(KEY)
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.multipleWriteRegionsEnabled(true)
.preferredRegions(Arrays.asList(REGION))
.readRequestsFallbackEnabled(true)
.endpointDiscoveryEnabled(true)
.gatewayMode()
.connectionSharingAcrossClientsEnabled(true)
.buildAsyncClient()) {
...
} catch(CosmosException e) {
e.printStackTrace();
}
Databaseへの接続
接続は既存のデータベースへの接続のためのgetDatabase()、もしくは新規データベース作成のためのcreateDatabaseIfNotExists()もしくはcreateDatabase()を使ってCosmosAsyncDatabaseインスタンスを作成する。Containerも同様のメソッドでCosmosAsyncContainerインスタンスを作成する。このタイミングでAutoscaleのスループット上限値を設定できるのは同期型と同じ。
以下ではblock()を使っているが、これはあくまでも説明のためであり、実際にはblock()を極力使わないことが望ましい。
ThroughputProperties autoscaleThroughputProperties
= ThroughputProperties.createAutoscaledThroughput(40000);
CosmosAsyncDatabase database = client.createDatabaseIfNotExists(DATABASE, autoscaleThroughputProperties)
.doOnError(throwable -> logger.info("[Create] Unable to create Database: " + throwable.getMessage()))
.doOnSuccess(response -> {
logger.info(">>[Create] Database created " + response .getProperties().getId());
logger.info(">>[Create] databaseUsage " + response .getDatabaseUsage() + "[RU]");
})
.flatMap(response -> Mono.just(client.getDatabase(response .getProperties().getId())))
.publishOn(Schedulers.elastic())
.block();
CosmosContainerProperties containerProperties
= new CosmosContainerProperties(CONTAINER, "/country");
CosmosAsyncContainer container
= database.createContainerIfNotExists(containerProperties)
.doOnError(throwable -> logger.info("[Create] Unable to create Container " + throwable.getMessage()))
.doOnSuccess(response-> {
logger.info(">>[Create] Container created " + response.getProperties().getId());
logger.info(">>[Create] requestCharge " + response.getRequestCharge() + "[RU]");
})
.flatMap(response -> Mono.just(database.getContainer(response.getProperties().getId())))
.publishOn(Schedulers.elastic())
.block();
アイテムの取得、追加・更新
同期型と同じく、追加(createItem)、追加もしくは更新(upsertItem もしくは replace)は変化がないが、取得はgetItemからreadItemに変わった。
People p = new People();
// createによる追加
CosmosItemResponse<People> res1 = container.createItem(p);
// upsertによる更新 もしくは 追加(この場合は更新)
CosmosItemResponse<People> res2 = container.upsertItem(p);
// replaceによる更新
CosmosItemResponse<People> res3 = container.upsertItem(p);
// readによる取得
CosmosItemResponse<People> res4 = container.readItem(p.getId(), p.getCountry());
非同期API、というよりReactorベースなので、追加・変更などの処理をmergeWithで連続して記述できる(が、実際にはArrayListの要素をこんな感じで入れることはないはず)。
cosmosAsyncContainer.upsertItem(peopleArrayList.get(0))
.doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" + t.getMessage()))
.mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(1)))
.doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" + t.getMessage()))
.mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(2)))
.doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" + t.getMessage()))
.mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(3)))
.doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" + t.getMessage()))
.mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(4)))
.doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" + t.getMessage()))
.mergeWith(cosmosAsyncContainer.upsertItem(peopleArrayList.get(5)))
.doOnError(t->logger.info("[upsert] Unable to load/update : reason ==>" + t.getMessage()))
.doOnComplete(() -> logger.info("[Create] Loading completed"))
.publishOn(Schedulers.elastic())
.blockLast();
アイテムの削除
同期型と同じく、データの存在を確認するために一度CosmosAsyncContainer#readItem()で取得したのちに、CosmosAsyncItem#delete()を使う。CosmosAsyncContainer#delete()というメソッドはコンテナーを削除するためのメソッドで用途が異なる。
container.deleteItem(res4.getItem().getId(), new PartitionKey(res4.getItem().getCountry()), new CosmosItemRequestOptions());
クエリ
同期型の場合と同様、readItems()がdeprecatedになっており、queryItems()のみ利用できる。
String sql = "select c.id, c.name, c.country, c.description from " + CONTAINER + " c";
CosmosQueryRequestOptions op
= new CosmosQueryRequestOptions().setQueryMetricsEnabled(true)
.setMaxDegreeOfParallelism(2);
CosmosPagedFlux<People> response1 = container.queryItems(sql, op, People.class);
response1.byPage().flatMap(f -> {
f.getResults().stream().forEach(p ->
logger.info("Query result:[id] " + p.getId() + " [Country] " + p.getCountry() + " [Description] " + p.getDescription() + " [Name] " + p.getName())
);
return Flux.empty();
}).blockLast();
変更フィード
変更フィードは構成はv3と同じだが、メソッド名に一部変更がある。
以下は変更フィードを取得する例。Cosmos DBにデータが追加されたり、既存レコードへの更新が発生したりすると、ログに表示する。
try (CosmosAsyncClient client = new CosmosClientBuilder()
.endpoint(ENDPOINT)
.key(KEY)
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.multipleWriteRegionsEnabled(true)
.preferredRegions(Arrays.asList(REGION))
.readRequestsFallbackEnabled(true)
.endpointDiscoveryEnabled(true)
.gatewayMode()
.connectionSharingAcrossClientsEnabled(true)
.buildAsyncClient()) {
CosmosAsyncDatabase db = client.getDatabase(DATABASE);
feedc = db.createContainerIfNotExists(CONTAINER, "/country")
.flatMap(res->Mono.just(db.getContainer(res.getProperties().getId())))
.publishOn(Schedulers.elastic())
.block();
leasec = db.createContainerIfNotExists(LEASE_CONTAINER, "/id")
.flatMap(res->Mono.just(db.getContainer(res.getProperties().getId())))
.publishOn(Schedulers.elastic())
.block();
ChangeFeedProcessor cfp = new ChangeFeedProcessorBuilder()
.hostName(ENDPOINT)
.feedContainer(feedc)
.leaseContainer(leasec)
.handleChanges(docs -> {
logger.info("[ChangeFeedSample] handleChanges START!");
for (JsonNode cosmosItemProperties : docs) {
logger.info("[ChangeFeedSample] ChangeFeed Received " + cosmosItemProperties.toPrettyString());
}
logger.info("[ChangeFeedSample] handleChanges End!");
})
.options(new ChangeFeedProcessorOptions().setFeedPollDelay(Duration.ofSeconds(3)))
.buildChangeFeedProcessor();
cfp.start().subscribe();
}catch(CosmosException e1) {
logger.severe(e1.getMessage());
}catch(Exception e2) {
logger.severe(e2.getMessage());
}
参考
v3を紹介したエントリは以下。
Azure Cosmos DB Java v3 SDK
https://logico-jp.io/2019/07/22/azure-cosmos-db-java-v3-sdk/