(2019/08/16に3.1.0でCosmosDatabase#createContaineIfNotExists()の挙動が修正されていることを確認しました)
このエントリは2019/07/22現在の情報に基づくものです。将来の機能追加・廃止に伴い、記載内容との乖離が発生する可能性があります。
先日、Azure Cosmos DB Java V3 SDKがリリースされました。
Azure Cosmos DB Java V3 SDK now available
https://azure.microsoft.com/updates/azure-cosmos-db-java-v3-sdk-now-available/
特徴は以下の通りです。
- Reactorベースの非同期プログラミングモデル(v2は非同期の場合Completable Futureベース)
- Azure Cosmos DBのdirect TCPトランスポートプロトコルをサポート
- 直感的で使いやすいAPI (Fluentスタイルで記述できる)
- 名前空間の変更(以前はcom.microsoft.azure.cosmosdb.* だったが、v3ではcom.azure.data.cosmos.*)。
JavaDocは以下から参照できます。
Package com.azure.data.cosmos
https://azure.github.io/azure-cosmosdb-java/3.0.0/
ソースコードは以下から参照できます。ただ、masterブランチは2.4.3のままなので、明示的にv3を指定してください。
https://github.com/Azure/azure-cosmosdb-java/tree/v3
動作確認
依存関係
依存関係解決のために以下をpom.xmlに追加します。
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>3.1.0</version>
</dependency>
CosmosClientインスタンスの作成と終了
Cosmos DBの接続クライアントであるCosmosClientインスタンスを生成します。終了はCosmosClient#close()を発行します。
CosmosClient client = CosmosClient.builder()
.endpoint(ENDPOINT)
.key(KEY)
.build();
...
client.close();
Databaseへの接続を開く
接続は既存のデータベースへの接続のためのCosmosClient#getDatabase()、もしくは新規データベース作成のためのCosmosClient#createDatabaseIfNotExists()もしくはCosmosClient#createDatabase()を使ってCosmosDatabaseインスタンスを作成します。それぞれの違いは以下の通りです。
CosmosClient# | 動作 | 備考 |
---|---|---|
getDatabase | 指定したデータベースを開く | データベースが存在しない場合は例外発生(CosmosClientException) |
createDatabase | 指定した名称のデータベースを作成する | データベースが存在する場合は例外発生(CosmosClientException) |
createDatabaseIfNotExists | 指定した名称のデータベースが存在しない場合にデータベースを作成 存在する場合はデータベースを開く |
以下はCosmosClient#createDatabaseIfNotExists()を使った例です。わかりやすくするためにblock()を使っていますが、実運用のコードではReactiveのメリットを生かしつつ、Fluentスタイルで記述を連ねてください。
CosmosDatabase cosmosDatabase = client.createDatabaseIfNotExists(DATABASE)
.doOnError(throwable -> logger.info("[Create] Unable to create Database: " + throwable.getMessage()))
.doOnSuccess(cosmosDatabaseResponse -> logger.info("[Create] Database created " + cosmosDatabaseResponse.database().id()))
.flatMap(cosmosDatabaseResponse -> Mono.just(cosmosDatabaseResponse.database()))
.publishOn(Schedulers.elastic()).block();
Containerへの接続を開く
接続は既存のコンテナーへの接続のためのCosmosDatabase#getContainer()、もしくは新規データベース作成のためのCosmosDatabase#createContaineIfNotExists()もしくはCosmosDatabase#createContainer()を使ってCosmosContainerインスタンスを作成します。それぞれの違いは以下の通りです。
CosmosDatabase# | 動作 | 備考 |
---|---|---|
getContainer | 指定したコンテナーを開く | コンテナーが存在しない場合は例外発生(CosmosClientException) |
createContainer | 指定した名称のコンテナーを作成する | コンテナーが存在する場合は例外発生(CosmosClientException) |
createContainerIfNotExists | 指定した名称のコンテナーが存在しない場合にコンテナーを作成 存在する場合はコンテナーを開く | 3.1.0で修正済み https://github.com/Azure/azure-cosmosdb-java/issues/221 |
以下はCosmosDatabase#createContainer()を使った例です。doOnError()などを使い、エラー時の処理を追加できます。
CosmosContainer container1 = cosmosDatabase.createContainer(new CosmosContainerProperties(CONTAINER, "/country"))
.doOnError(throwable -> logger.info("[Create] Unable to create Container " + throwable.getMessage()))
.doOnSuccess(cosmosContainerResponse -> logger.info("[Create] Container created " + cosmosContainerResponse.container().id()))
.flatMap(cosmosContainerResponse -> Mono.just(cosmosContainerResponse.container()))
.publishOn(Schedulers.elastic())...
もちろん、CosmosClientインスタンス作成からCosmosContainerインスタンス作成まで一気に書くこともできます。
CosmosContainer cosmosContainer = CosmosClient.builder()
.endpoint(ENDPOINT)
.key(KEY)
.build()
.getDatabase(DATABASE)
.getContainer(CONTAINER);
アイテムの取得、追加・更新
コンテナからのアイテム取得もしくはアイテムの追加・更新には、以下のメソッドを利用します。
CosmosContainer# | 動作 | 備考 |
---|---|---|
getItem | id、partitionKeyを指定してアイテムを取得する | 該当する値がない場合はCosmosClientException |
createItem | 指定されたPOJOインスタンスでアイテムを作成する。 | 重複時はCosmosClientException |
upsertItem | 指定されたPOJOインスタンスでアイテムを作成する。重複する場合は更新する。 |
以下はPeopleというクラスのインスタンスp1、p2を順次追加する例です。この例ではupsertItemを使っています。
CosmosItemResponse response1 = container1.upsertItem(p1)
.doOnError(throwable -> logger.info("[Create] Unable to load p1 " + throwable.getMessage()))
.mergeWith(container1.upsertItem(p2))
.doOnError(throwable -> logger.info("[Create] Unable to load p2 " + throwable.getMessage()))
.doOnComplete(()->logger.info("[Create] Loading completed"))
.publishOn(Schedulers.elastic()).blockLast();
アイテムの更新・削除
アイテムの更新、削除は、一度CosmosContainer#getItem()で取得したのちに、CosmosItem#replace()もしくはCosmosItem#delete()を使います。CosmosContainer#delete()というメソッドもありますが、これはコンテナーを削除するためのメソッドで用途が異なります。
CosmosItem cosmosItem1 = container1.getItem("111", "US");
p1.setDescription("変更するよ");
cosmosItem1.replace(p1)
.doOnError(throwable -> logger.info("[Create] Unable to replace p1 " + throwable.getMessage()))
.publishOn(Schedulers.immediate()).block();
...
CosmosItem cosmosItem2 = container1.getItem("222", "CA");
cosmosItem2.delete().publishOn(Schedulers.immediate()).block();
クエリ、読み取り
CosmosContainer#queryItems()やCosmosContainer#readItems()を使います。用途の違いは以下の通りです。
CosmosContainer# | 動作 |
---|---|
queryItems | SQLで指定した要素、フィールドを取得する |
readAllItems | Cosmos DBのContainerに格納された値を全て取得する(つまり、Data Explorerで確認できる、_ridや_etagなどを含んだ値) |
以下はqueryItemsを使った例です。
FeedResponse<CosmosItemProperties> feedResponse = cosmosContainer.queryItems("select c.id, c.name, c.country, c.description from " + CONTAINER + " c", new FeedOptions().enableCrossPartitionQuery(true).maxDegreeOfParallelism(2))
.doOnError(throwable -> logger.info("[Query] Unable to query : " + throwable.getMessage()))
.publishOn(Schedulers.elastic()).blockLast();
変更フィード
変更フィードは、フィード対象のコンテナー、リースのためのコンテナーを指定し、変更フィードを取得した後の処理をChangeFeedProcessor.BuilderDefinition#handleChanges()に登録します。
Polling間隔はChangeFeedProcessor.BuilderDefinition#options()で登録します。
ChangeFeedProcessorインスタンスを生成後、ChangeFeedProcessor#start()でリスニングを開始します。なお、ChangeFeedProcessor#startではリスニングを開始するだけですので、変更フィードをサブスクライブしなければ情報は取得できません。そのため、subscribeメソッドを明示的に呼ぶ必要があります。
以下は変更フィードを取得する例です。Cosmos DBにデータが追加されたり、既存レコードへの更新が発生したりすると、ログに表示されます。
CosmosClient client = CosmosClient.builder()
.endpoint(ENDPOINT)
.key(KEY)
.build();
CosmosContainer feedContainer = client.getDatabase(DATABASE).createContainerIfNotExists(CONTAINER, "/country").flatMap(f -> Mono.just(f.container())).publishOn(Schedulers.elastic()).block();
CosmosContainer leaseContainer = client.getDatabase(DATABASE).createContainerIfNotExists(LEASE_CONTAINER, "/id").flatMap(f -> Mono.just(f.container())).publishOn(Schedulers.elastic()).block();
ChangeFeedProcessor.Builder()
.hostName(ENDPOINT)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleChanges(docs -> {
logger.info("[ChangeFeedSample] handleChanges START!");
for (CosmosItemProperties cosmosItemProperties : docs) {
logger.info("[ChangeFeedSample] ChangeFeed Received " + cosmosItemProperties.toJson(SerializationFormattingPolicy.INDENTED));
}
logger.info("[ChangeFeedSample] handleChanges End!");
})
.options(new ChangeFeedProcessorOptions().feedPollDelay(Duration.ofSeconds(3)))
.build()
.start()
.subscribe();