タグ別アーカイブ: Cosmos DB

Azure Databricksの集計結果をCosmos DB(MongoDB API)に書き込む

このエントリは2019/09/02現在の情報に基づくもので、将来の機能追加や廃止に伴い、記載内容との乖離が発生する可能性があります。

Cosmos DBには種々のAPIがあり、例えばMongoDBやCassandraなどを使っているアプリケーションをCosmos DBに置き換える場合、接続文字列を置き換えるだけで移行できる、ということになっているが、果たしてそれがApache Sparkの場合にでも言えるのかを確認した。

結論から言うと、確かにMongoDB Connector for Apache Sparkを設定し直すことで対応できた。以下に記録を残す。

Databricksで実施する処理

Azure SQL DatabaseのAdventureworksデータベース内にあるSalesLT.CustomerAddress、SalesLT.Address、SalesLT.CustomerをJoinし、_idとして、CustomerIDとAddressIDを組み合わせた文字列を生成する。単なるJoinなので、本来ならDatabase内で完結すべきなのは承知の上で、テスト目的で実施した。

準備するもの

  • Cosmos DB
    • MongoDB APIを選択
  • Azure Databricks
    • 今回は5.5 LTS (Apache Spark 2.4.3 / Scala 2.11) を選択
  • MongoDB Connector for Apache Spark
    • Maven Repositoryからorg.mongodb.spark:mongo-spark-connector_2.11の2.4.1を選択し、ライブラリとして登録しておく
    • Scalaのバージョンと整合しておく必要がある
  • Azure Databricks Notebook
    • 今回は動作確認のため、Notebookを使用。
    • 今回、言語はScalaを利用
  • サンプルデータソース
    • ファイルでも何でもよいが、今回はAzure SQL Databaseのサンプルデータベース(Adventureworks)を使用

Databricksにデータを取り込み、DataSetを生成

JDBCドライバを使ってAzure DatabricksでSQL Databaseからデータを取り込む。DataSetを作成する場合はCase Classが必要なので、こちらも作成しておく。

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
// Case Classes
case class Address(AddressID: Int, AddressLine1: String, AddressLine2: String, City: String, StateProvince: String, CountryRegion: String, PostalCode: String, rowguid: String, ModifiedDate: String)
case class Customer(CustomerID: Int, NameStyle: Boolean, Title: String, FirstName: String, MiddleName: String, LastName: String, Suffix: String, CompanyName: String, SalesPerson: String, EmailAddress: String, Phone: String, PasswordHash: String, PasswordSalt: String, rowguid: String, ModifiedDate: String)
case class CustomerAddress(CustomerID: Int, AddressID: Int, AddressType: String, rowguid: String, ModifiedDate: String)
// Create DataSets and load data to them
val customerAddress_DS = spark.read.jdbc(jdbcUrl, "SalesLT.CustomerAddress", connectionProperties).as[CustomerAddress]
val address_DS = spark.read.jdbc(jdbcUrl, "SalesLT.Address", connectionProperties).as[Address]
val customer_DS = spark.read.jdbc(jdbcUrl, "SalesLT.Customer", connectionProperties).as[Customer]

DataSet間でJoinしDataFrameを生成

各DataSetをJoinする。このとき、列名を===でつなぐJoinだと同じ名称の列ができてしまうので、列名だけを指定するようにしている。

val tempDS = customerAddress_DS.join(customer_DS, "CustomerID")
                          .join(address_DS, "AddressID")
                          .withColumn("_id", concat($"CustomerID", lit("-"), $"AddressID"))
                          .select("_id", "AddressID", "AddressLine1", "CustomerID", "CompanyName", "AddressType")

この結果、以下の列を持つDataFrameができあがる。

  • _id:string
  • AddressID:integer
  • AddressLine1:string
  • CustomerID:integer
  • CompanyName:string
  • AddressType:string

SQLで記述すると、以下のような処理を実行している。

select B.CustomerID || '-' || A.AddressID as _id,
          A.AddressID as AddressID,
          A.AddressLine1 as AddressLine1,
          B.CustomerID as CustomerID,
          B.CompanyName as CompanyName,
          C.AddressType as AddressType
  from Address A, Customer B, CustomerAddress C
where A.AddressID = C.AddressID and C.CustomerID = B.CustomerID

Cosmos DB (MongoDB API) に格納

Connectorを使うためのお作法に則って、構成を作成し、saveメソッドを呼び出す。

val writeConfigMap = Map(
    "database" -> "{database name}", 
    "collection" -> "{collection name}", 
    "uri" -> "mongodb://...", 
    "writeConcern.w" -> "majority")
val writeConfig: WriteConfig = WriteConfig(writeConfigMap, Some(WriteConfig(sc)))
MongoSpark.save(tempDF.write.mode(SaveMode.Overwrite), writeConfig)

注意すべきこと

Cosmos DB側のスループット設定が不足していると、最後の書き込みでエラーが発生することがある(HTTP 429)。これは書き込むデータ量に依存するので、単純な推奨値はない。そのため、適切なスループットの割り当てや、整合性の変更などを考慮する必要がある。もしどうしてもスループットが足りない場合は、何らかのスロットリング機構が必要である。

Cosmos DB SDK v3で冗長化されたインスタンスのリージョンやURLを取得する

このエントリは2019/08/26現在の情報を基にしており、将来の機能追加や廃止に伴い、記載内容との乖離が発生する可能性があります。また、この内容はSDK v3をベースにしています。


以前、以下のエントリを記載したが、SDK v2ベースだったため、v3を基にした内容を別途記載しておく。

Cosmos DBの地理冗長化に関するあれこれ (v2)
https://logico-jp.io/2019/05/09/cosmos-db-distributed-across-multi-regions-v2/

各Regionに冗長化されたCosmos DBのURIを取得する

v2で利用しているAPIは以下の場所に移動した。Package名称からもわかるように、将来隠蔽される可能性があるので注意が必要。

Package com.azure.data.cosmos.internal
https://azure.github.io/azure-sdk-for-java/track1reports/com/azure/data/cosmos/internal/package-summary.html

基本的にはv2と同じメソッドを呼び出すことができるが、少々異なる箇所もあるので以下に記載しておく。

地理冗長化先のリージョンとURL

AsyncCosmosClientクラスのインスタンスを作成する。getDatabaseAccount()はFlux<DatabaseAccount>を返すので、今回はListで受け取るようにしている。もちろんStreamで受け取ることもできる。

AsyncDocumentClient tempClient = new AsyncDocumentClient.Builder()
        .withServiceEndpoint(ENDPOINT)
        .withMasterKeyOrResourceToken(KEY)
        .build();

List<DatabaseAccount> databaseAccountList = tempClient.getDatabaseAccount().buffer().blockLast();

あとは、取得したDatabaseAccountに設定されているリージョンとURLを調べればよい。

databaseAccountList.forEach(databaseAccount -> {
    Iterator<DatabaseAccountLocation> it = databaseAccount.getReadableLocations().iterator();
    while (it.hasNext()) {
        DatabaseAccountLocation loc = it.next();
        System.out.println(loc.getName() + "---" + loc.getEndpoint());
    }
});

Cosmos DB Java SDK v3でマルチマスタを構成する

これは2019/08/21現在の情報を基にしたものです。将来の機能追加や廃止に伴って記載内容との乖離が発生する可能性があります。


Cosmos DBのSDKを使ってマルチマスタを構成する方法は以下のドキュメントに記載がある。

Azure Cosmos DB を使用するアプリケーションでマルチマスターを構成する
https://docs.microsoft.com/ja-jp/azure/cosmos-db/how-to-multi-master
Configure multi-master in your applications that use Azure Cosmos DB
https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-multi-master

ただ、2019/08/21時点において、Java SDKを使った例は、v2のAsync Java SDK(CompletableFutureを使った実装)についてのみ紹介されており、SDK v3のReactorを使った実装を基にした記述にはなっていない。そのため、V3の場合の例を以下にまとめておく(当然、将来ドキュメントの記載が変更されるはず)。

v2の場合

v2の場合、以下のようにConnectionPolicyのプロパティを設定することで、マルチマスタ構成を設定していた。

ConnectionPolicy policy = new ConnectionPolicy();
policy.setUsingMultipleWriteLocations(true);
policy.setPreferredLocations(Collections.singletonList(region));
AsyncDocumentClient client =
    new AsyncDocumentClient.Builder()
        .withMasterKeyOrResourceToken(this.accountKey)
        .withServiceEndpoint(this.accountEndpoint)
        .withConsistencyLevel(ConsistencyLevel.Eventual)
        .withConnectionPolicy(policy).build();

v3の場合

v3でも同様のプロパティは存在する。設定方法はほぼ同じだが、メソッドが異なるので注意。

// String[] REGION = {"Japan East", "Japan West"};
ConnectionPolicy policy = new ConnectionPolicy()
        .usingMultipleWriteLocations(true)
        .preferredLocations(Arrays.asList(REGION));
CosmosClient client = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .consistencyLevel(ConsistencyLevel.EVENTUAL)
        .connectionPolicy(policy)
        .build();

ConnectionPolicy#usingMultipleWriteLocations(true) でマルチマスタの利用を宣言し、ConnectionPolicy#preferredLocations() を呼び出して、アクセス先のリージョン(Cosmos DBがレプリケートされている、書き込み可能なリージョン)を指定する。preferredLocationsへ渡す引数はListであることに注意(配列ではない)。ここで、1個だけリージョンを指定すると、当該リージョンが障害でアクセスできない場合は例外が発生するが、アプリケーションのデプロイ先リージョンを指定することで、プライマリリージョンに書き込みが集中しないように構成できる。

また、preferredLocationsに複数個のリージョンを指定すると、明示的にフォールバック時のアクセス順序を指定できる。ConnectionPolicyクラスのjavadocは以下。

com.azure.data.cosmos.ConnectionPolicy
https://azure.github.io/azure-cosmosdb-java/3.0.0/com/azure/data/cosmos/ConnectionPolicy.html

Azure Cosmos DB Java v3 SDK

(2019/08/16に3.1.0でCosmosDatabase#createContaineIfNotExists()の挙動が修正されていることを確認しました)

このエントリは2019/07/22現在の情報に基づくものです。将来の機能追加・廃止に伴い、記載内容との乖離が発生する可能性があります。

先日、Azure Cosmos DB Java V3 SDKがリリースされました。

Azure Cosmos DB Java V3 SDK now available
https://azure.microsoft.com/ja-jp/updates/azure-cosmos-db-java-v3-sdk-now-available/

特徴は以下の通りです。

  • Reactorベースの非同期プログラミングモデル(v2は非同期の場合Completable Futureベース)
  • Azure Cosmos DBのdirect TCPトランスポートプロトコルをサポート
  • 直感的で使いやすいAPI (Fluentスタイルで記述できる)
  • 名前空間の変更(以前はcom.microsoft.azure.cosmosdb.* だったが、v3ではcom.azure.data.cosmos.*)。

JavaDocは以下から参照できます。

Package com.azure.data.cosmos
https://azure.github.io/azure-cosmosdb-java/3.0.0/

ソースコードは以下から参照できます。ただ、masterブランチは2.4.3のままなので、明示的にv3を指定してください。

https://github.com/Azure/azure-cosmosdb-java/tree/v3

動作確認

依存関係

依存関係解決のために以下をpom.xmlに追加します。

<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-cosmos</artifactId>
  <version>3.1.0</version>
</dependency>

CosmosClientインスタンスの作成と終了

Cosmos DBの接続クライアントであるCosmosClientインスタンスを生成します。終了はCosmosClient#close()を発行します。

CosmosClient client = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .build();
...
client.close();

Databaseへの接続を開く

接続は既存のデータベースへの接続のためのCosmosClient#getDatabase()、もしくは新規データベース作成のためのCosmosClient#createDatabaseIfNotExists()もしくはCosmosClient#createDatabase()を使ってCosmosDatabaseインスタンスを作成します。それぞれの違いは以下の通りです。

CosmosClient#動作備考
getDatabase指定したデータベースを開くデータベースが存在しない場合は例外発生(CosmosClientException)
createDatabase指定した名称のデータベースを作成するデータベースが存在する場合は例外発生(CosmosClientException)
createDatabaseIfNotExists指定した名称のデータベースが存在しない場合にデータベースを作成
存在する場合はデータベースを開く

以下はCosmosClient#createDatabaseIfNotExists()を使った例です。わかりやすくするためにblock()を使っていますが、実運用のコードではReactiveのメリットを生かしつつ、Fluentスタイルで記述を連ねてください。

CosmosDatabase cosmosDatabase = client.createDatabaseIfNotExists(DATABASE)
        .doOnError(throwable -> logger.info("[Create] Unable to create Database: " + throwable.getMessage()))
        .doOnSuccess(cosmosDatabaseResponse -> logger.info("[Create] Database created " + cosmosDatabaseResponse.database().id()))
        .flatMap(cosmosDatabaseResponse -> Mono.just(cosmosDatabaseResponse.database()))
        .publishOn(Schedulers.elastic()).block();

Containerへの接続を開く

接続は既存のコンテナーへの接続のためのCosmosDatabase#getContainer()、もしくは新規データベース作成のためのCosmosDatabase#createContaineIfNotExists()もしくはCosmosDatabase#createContainer()を使ってCosmosContainerインスタンスを作成します。それぞれの違いは以下の通りです。

CosmosDatabase#動作備考
getContainer指定したコンテナーを開くコンテナーが存在しない場合は例外発生(CosmosClientException)
createContainer指定した名称のコンテナーを作成するコンテナーが存在する場合は例外発生(CosmosClientException)
createContainerIfNotExists指定した名称のコンテナーが存在しない場合にコンテナーを作成
存在する場合はコンテナーを開く
3.1.0で修正済み
2019/07/22現在、以下のBugでNull Pointer Exceptionが返る
https://github.com/Azure/azure-cosmosdb-java/issues/221

以下はCosmosDatabase#createContainer()を使った例です。doOnError()などを使い、エラー時の処理を追加できます。

CosmosContainer container1 = cosmosDatabase.createContainer(new CosmosContainerProperties(CONTAINER, "/country"))
        .doOnError(throwable -> logger.info("[Create] Unable to create Container " + throwable.getMessage()))
        .doOnSuccess(cosmosContainerResponse -> logger.info("[Create] Container created " + cosmosContainerResponse.container().id()))
        .flatMap(cosmosContainerResponse -> Mono.just(cosmosContainerResponse.container()))
        .publishOn(Schedulers.elastic())...

もちろん、CosmosClientインスタンス作成からCosmosContainerインスタンス作成まで一気に書くこともできます(Reactorを使っている感じがする)。

CosmosContainer cosmosContainer = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .build()
        .getDatabase(DATABASE)
        .getContainer(CONTAINER);

アイテムの取得、追加・更新

コンテナからのアイテム取得もしくはアイテムの追加・更新には、以下のメソッドを利用します。

CosmosContainer#動作備考
getItemid、partitionKeyを指定してアイテムを取得する該当する値がない場合はCosmosClientException
createItem指定されたPOJOインスタンスでアイテムを作成する。重複時はCosmosClientException
upsertItem指定されたPOJOインスタンスでアイテムを作成する。重複する場合は更新する。

以下はPeopleというクラスのインスタンスp1、p2を順次追加する例です。この例ではupsertItemを使っています。

CosmosItemResponse response1 = container1.upsertItem(p1)
        .doOnError(throwable -> logger.info("[Create] Unable to load p1 " + throwable.getMessage()))
        .mergeWith(container1.upsertItem(p2))
        .doOnError(throwable -> logger.info("[Create] Unable to load p2 " + throwable.getMessage()))
        .doOnComplete(()->logger.info("[Create] Loading completed"))
        .publishOn(Schedulers.elastic()).blockLast();

アイテムの更新・削除

アイテムの更新、削除は、一度CosmosContainer#getItem()で取得したのちに、CosmosItem#replace()もしくはCosmosItem#delete()を使います。CosmosContainer#delete()というメソッドもありますが、これはコンテナーを削除するためのメソッドで用途が異なります。

CosmosItem cosmosItem1 = container1.getItem("111", "US");
p1.setDescription("変更するよ");
cosmosItem1.replace(p1)
        .doOnError(throwable -> logger.info("[Create] Unable to replace p1 " + throwable.getMessage()))
        .publishOn(Schedulers.immediate()).block();
CosmosItem cosmosItem2 = container1.getItem("222", "CA");
cosmosItem2.delete().publishOn(Schedulers.immediate()).block();

クエリ、読み取り

CosmosContainer#queryItems()やCosmosContainer#readItems()を使います。用途の違いは以下の通りです。

CosmosContainer#動作
queryItemsSQLで指定した要素、フィールドを取得する
readAllItemsCosmos DBのContainerに格納された値を全て取得する(つまり、Data Explorerで確認できる、_ridや_etagなどを含んだ値)

以下はqueryItemsを使った例です。

FeedResponse<CosmosItemProperties&gt; feedResponse = cosmosContainer.queryItems("select c.id, c.name, c.country, c.description from " + CONTAINER + " c", new FeedOptions().enableCrossPartitionQuery(true).maxDegreeOfParallelism(2))
        .doOnError(throwable -&gt; logger.info("[Query] Unable to query : " + throwable.getMessage()))
        .publishOn(Schedulers.elastic()).blockLast();

変更フィード

変更フィードは、フィード対象のコンテナー、リースのためのコンテナーを指定し、変更フィードを取得した後の処理をChangeFeedProcessor.BuilderDefinition#handleChanges()に登録します。

Polling間隔はChangeFeedProcessor.BuilderDefinition#options()で登録します。

ChangeFeedProcessorインスタンスを生成後、ChangeFeedProcessor#start()でリスニングを開始します。なお、ChangeFeedProcessor#startではリスニングを開始するだけですので、変更フィードをサブスクライブしなければ情報は取得できません。そのため、subscribeメソッドを明示的に呼ぶ必要があります。

以下は変更フィードを取得する例です。Cosmos DBにデータが追加されたり、既存レコードへの更新が発生したりすると、ログに表示されます。

CosmosClient client = CosmosClient.builder()
        .endpoint(ENDPOINT)
        .key(KEY)
        .build();
CosmosContainer feedContainer = client.getDatabase(DATABASE).createContainerIfNotExists(CONTAINER, "/country").flatMap(f -> Mono.just(f.container())).publishOn(Schedulers.elastic()).block();
CosmosContainer leaseContainer = client.getDatabase(DATABASE).createContainerIfNotExists(LEASE_CONTAINER, "/id").flatMap(f -> Mono.just(f.container())).publishOn(Schedulers.elastic()).block();
ChangeFeedProcessor.Builder()
        .hostName(ENDPOINT)
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(docs -> {
            logger.info("[ChangeFeedSample] handleChanges START!");
            for (CosmosItemProperties cosmosItemProperties : docs) {
                logger.info("[ChangeFeedSample] ChangeFeed Received " + cosmosItemProperties.toJson(SerializationFormattingPolicy.INDENTED));
            }
            logger.info("[ChangeFeedSample] handleChanges End!");
        })
        .options(new ChangeFeedProcessorOptions().feedPollDelay(Duration.ofSeconds(3)))
        .build()
        .start()
        .subscribe();

Azure SignalR Serviceを試す

このエントリは2019/07/12現在の情報に基づいています。将来の機能追加や廃止に伴い、このエントリの内容との乖離が発生する可能性があります。

Azure SignalR Serviceとは

リアルタイムWebを実現するためのコンポーネントを提供するマネージドサービス。簡単に言うと、WebSocketsやServer Sent Events、Long Pollingなどの機能を提供する。詳細は以下のドキュメントを参照。

Azure SignalR サービスとは
https://docs.microsoft.com/ja-jp/azure/azure-signalr/signalr-overview
What is Azure SignalR Service?
https://docs.microsoft.com/en-us/azure/azure-signalr/signalr-overview

単純にTutorialをやるのも何かあれなので、FucntionsでCosmos DBの変更フィードを読み取り、SignalR Serviceに投げ込んで、投げ込まれたメッセージをリアルタイムで確認する、というある種の技術の無駄遣いを通じて、動作確認をしてみた。使う言語はいつも通りJavaのみである。

なお、Cosmos DBの変更フィードを取得して複数のサービスに伝播する場合、Functionsの後続にEvent HubやEvent Grid、もしくはService Busを配置して連携するのが王道である。

やりたいこと(動作確認のためのシナリオ)

IoTデバイス等からCosmos DBに温度、湿度、デバイスID、測定時刻がPOSTされるので、そのデータ着信を変更フィードで受け取り、WebSocket経由でデータを受け取りたい。具体的には、下図の赤枠で囲んだ部分。簡単のため、認証の仕組みは入れていない。

必要なもの

  • Azure SignalR Service
  • Functions
  • Cosmos DB

準備

Azure SignalR Service

ふつうにインスタンスを作成する。接続文字列は後ほどFunctionsで使うため、記録しておく。

Cosmos DB

こちらもふつうにPortalもしくはAzure CLIでCosmos DBのアカウントを作成する。変更フィードを利用するため、SQL APIもしくはGremlin APIを選択しなければならないが、今回はSQL APIを使う。接続文字列はFunctionsで使うため、記録しておく。 Database、Containerも作成しておく。スループットはデフォルトでかまわない。

今回は以下のようなデータを使う。

{
    "device":"deviceXXX",
    "timestamp":"2019-07-01T01:00:00+09:00",
    "humid":40.1,
    "temp":22.1
}

Functions

PortalもしくはAzure CLIでFunctionsアプリを作成する。SignalRとCosmos DBの接続文字列を設定しておく(Fuction App > 構成)。

SignalRの接続文字列は、[アプリケーション設定]で設定する。名前をAzureSignalRConnectionString としておく。

Cosmos DBの接続文字列は、[接続文字列]で設定する。名前はAzureCosmosDBConnection としておく。データソースの種類はCustomでよい。

開発言語としてJavaを使うため、以下を実行してソースコードを生成しておく。リージョンなどは適宜変更する。

mvn archetype:generate \
-DarchetypeGroupId=com.microsoft.azure \
-DarchetypeArtifactId=azure-functions-archetype \
-DappName={functionAppName} \
-DappRegion={region} \
-DresourceGroup={resourceGroup} \
-DgroupId=com.{functionAppName}.group \
-DartifactId={functionAppName}-functions \
-Dpackage=com.{functionAppName} \
-DinteractiveMode=false

ローカル環境でテストできるよう、以下のコマンドを実行して、local.settings.jsonにAzure Portalで登録した値を反映させておく。

func azure functionapp fetch-app-settings {functionAppName}

上記コマンドを実行すると、Azure上の設定をlocal.settings.jsonに反映できる。以下は実行した結果のlocal.settings.jsonの例(文字列等はマスクしている)。

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "...",
    "FUNCTIONS_WORKER_RUNTIME": "java",
    "FUNCTIONS_EXTENSION_VERSION": "~2",
    "APPINSIGHTS_INSTRUMENTATIONKEY": "...",
    "AzureSignalRConnectionString": "..."
  },
  "ConnectionStrings": {
    "AzureCosmosDBConnection": {
      "ConnectionString": "..."
    }
  },
  "Host": {
    "CORS": "*"
  }
}
negotiate function

クライアントが接続して、WebSocket接続のための情報を入手するためのFunction。これはお作法が決まっているので、Hubだけ変更し、それ以外はサンプルコードをそのまま使う。

Azure Functions における SignalR サービスのバインド
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-signalr-service
SignalR Service bindings for Azure Functions
https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-signalr-service

具体的には以下。

@FunctionName("negotiate")
public SignalRConnectionInfo negotiate(
        @HttpTrigger(name = "req",
                methods = {HttpMethod.POST},
                authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String&gt;&gt; req,
        @SignalRConnectionInfoInput(name = "connectionInfo",
                hubName = "climatehub") SignalRConnectionInfo connectionInfo) {
    return connectionInfo;
}

このFunctionはHTTP Triggerなので、POSTで呼び出すと以下のような応答が返る。

{
    "url": "https://xxxx.service.signalr.net/client/?hub=climatehub",
    "accessToken": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE1NjIwMzU3MTYsImV4cCI6MTU2MjAzNzUxNiwiaWF0IjoxNTYyMDM1NzE2LCJhdWQiOiJodHRwczovL2NsaW1hdGVpbmZvLnNlcnZpY2Uuc2lnbmFsci5uZXQvY2xpZW50Lz9odWI9Y2xpbWF0ZWh1YiJ9.z7WSaTmMGT34D5leeiqfJT4XUUet66eDW7ZMMKpDubk"
}
sendMessage function

こちらは、Cosmos DBの変更フィードをSignalRに投げ込む。そのため、バインドは以下のものを使う。

  • Input : @CosmosDBTrigger
  • Output : @SignalROutput

@CosmosDBTriggerでCosmos DB接続のための設定を追加し、文字列(JSON)として渡ってくる変更フィードを受けるためのArrayListを引数にしている。Functionの戻り値をSignalRへの出力にバインドしている。

@FunctionName("sendMessage")
@SignalROutput(name = "$return", hubName = "climatehub")
public SignalRMessage sendClimateInfo(
        @CosmosDBTrigger(feedPollDelay = 1000,
                name = "climateBinding",
                databaseName = "ClimateDB",
                collectionName = "ClimateContainer",
                leaseCollectionName = "Leases",
                createLeaseCollectionIfNotExists = true,
                connectionStringSetting = "AzureCosmosDBConnection")
                ArrayList<String&gt; climateJSONList,
        final ExecutionContext context) {
    context.getLogger().info(climateJSONList.size() + " record(s) is/are inserted.");
    climateJSONList.stream().forEach(s -&gt; System.err.println("Changed: " + s));
    ArrayList<ClimateInfo&gt; climateInfoArrayList = new ArrayList<&gt;();
    for (String s : climateJSONList) {
        JSONObject json = new JSONObject(s);
        climateInfoArrayList.add(new ClimateInfo(json.getString("device"), json.getString("timestamp"), json.getFloat("humid"), json.getFloat("temp")));
    }
    SignalRMessage message = new SignalRMessage();
    message.target = "climates";
    message.arguments.addAll(climateInfoArrayList);
    return message;
}

Cosmos DBの変更フィードはCosmos DBに格納されているデータがそのまま渡ってくるため、必要な情報だけSignalRに渡すように編集している。

クライアントからの接続時には、Hub名としてmessage.targetに指定しているclimatesを使う(climatehubではないことに注意)。

クライアント

簡単のため、Javaコンソールアプリケーションを使うことにする(本当はJavaScriptのほうがもっと楽だとは思うが)。

public static void main(String... args) {
    connection = HubConnectionBuilder.create(apiBaseUrl).build();

    connection.setServerTimeout(100000);
    connection.start().blockingAwait();

    connection.on("climates", (message) -> {
        System.out.println("[climates] device: " + message.getDevice() + " timestamp: " + message.getTimestamp() + " humid: " + message.getHumid() + " temp: " + message.getTemp());
    }, ClimateInfo.class);
}

URLを指定して接続を作成、開始し、Hub名を指定して着信を待つ。

動作確認

事前にクライアントを複数起動しておき、Cosmos DBにデータを追加していった場合の挙動を確認する。クライアントにはデータが届いていることがわかる。

SignalRのメトリックを見ると、接続クライアント数は2。

そして、流通しているメッセージの件数もトラッキングできていることを確認。

Cosmos DB (Core) のSQLクエリ

これは2019/05/15現在の備忘録であり、今後機能の改廃によって記述内容と一致しなくなる可能性があります。

Azure Cosmos DB 用の SQL 言語リファレンス
https://docs.microsoft.com/ja-jp/azure/cosmos-db/sql-api-query-reference
Azure Cosmos DB の SQL クエリの例
https://docs.microsoft.com/ja-jp/azure/cosmos-db/how-to-sql-query

このエントリで使うデータ

国内の空港のIATAコードや日本語および英語での略称を保持するようなデータベース(Domestic)を使います。具体的には、以下のようなデータが格納されています。idとしてIATAコードを使っています。

[
    ....
    {
        "id": "KMI",
        "ja": "宮崎",
        "en": "Miyazaki"
    },
    {
        "id": "FUJ",
        "ja": "五島福江",
        "en": "Goto Fukue"
    },
    {
        "id": "IKI",
        "ja": "壱岐",
        "en": "Iki"
    },
    {
        "id": "TSJ",
        "ja": "対馬",
        "en": "Tsushima"
    },
    {
        "id": "NGS",
        "ja": "長崎",
        "en": "Nagasaki"
    },
    {
        "id": "KMJ",
        "ja": "熊本",
        "en": "Kumamoto"
    },
    {
        "id": "OIT",
        "ja": "大分",
        "en": "Oita"
    },
    {
        "id": "HSG",
        "ja": "佐賀",
        "en": "Saga"
    },
    {
        "id": "KKJ",
        "ja": "北九州",
        "en": "Kitakyushu"
    },
    {
        "id": "FUK",
        "ja": "福岡",
        "en": "Fukuoka"
    },
    {
        "id": "KCZ",
        "ja": "高知",
        "en": "Kochi"
    },
    {
        "id": "MYJ",
        "ja": "松山",
        "en": "Matsuyama"
    },
    {
        "id": "TKS",
        "ja": "徳島",
        "en": "Tokushima"
    },
    ....
]

全件検索

これは単純にselect * from Airportsでよいですが、Cosmos DB内部で使う要素まで取得してしまうので、取得したい要素を明示したほうが扱いやすいです。また、要素はCase Sensitiveなので注意が必要です。

SELECT D.id, D.ja, D.en from Domestic D
[
    ....
    {
        "id": "KMI",
        "ja": "宮崎",
        "en": "Miyazaki"
    },
    {
        "id": "FUJ",
        "ja": "五島福江",
        "en": "Goto Fukue"
    },
    {
        "id": "IKI",
        "ja": "壱岐",
        "en": "Iki"
    },
    {
        "id": "TSJ",
        "ja": "対馬",
        "en": "Tsushima"
    },
    {
        "id": "NGS",
        "ja": "長崎",
        "en": "Nagasaki"
    },
    {
        "id": "KMJ",
        "ja": "熊本",
        "en": "Kumamoto"
    },
    {
        "id": "OIT",
        "ja": "大分",
        "en": "Oita"
    },
    {
        "id": "HSG",
        "ja": "佐賀",
        "en": "Saga"
    },
    {
        "id": "KKJ",
        "ja": "北九州",
        "en": "Kitakyushu"
    },
    {
        "id": "FUK",
        "ja": "福岡",
        "en": "Fukuoka"
    },
    {
        "id": "KCZ",
        "ja": "高知",
        "en": "Kochi"
    },
    {
        "id": "MYJ",
        "ja": "松山",
        "en": "Matsuyama"
    },
    {
        "id": "TKS",
        "ja": "徳島",
        "en": "Tokushima"
    },
    ....
]

条件指定(完全一致)

where xxx=yyy です。

SELECT D.id, D.ja, D.en from Domestic D where D.id = 'HND'
[
    {
        "id": "HND",
        "ja": "東京(羽田)",
        "en": "Tokyo (Haneda)"
    }
]

包含

where xxx like ‘%yyy%’ は使えません。where contains( xxx, yyy ) です。

SELECT D.id, D.ja, D.en from Domestic D where contains( D.id, 'D')
[
    {
        "id": "HKD",
        "ja": "函館",
        "en": "Hakodate"
    },
    {
        "id": "DNA",
        "ja": "嘉手納",
        "en": "Kadena"
    },
    {
        "id": "OKD",
        "ja": "札幌(丘珠)",
        "en": "Sapporo (Okadama)"
    },
    {
        "id": "HND",
        "ja": "東京(羽田)",
        "en": "Tokyo (Haneda)"
    },
    {
        "id": "SDJ",
        "ja": "仙台",
        "en": "Sendai"
    }
]

前方一致

where xxx like ‘yyy%’ は使えません。where startswith( xxx, yyy ) です。

SELECT D.id, D.ja, D.en from Domestic D where startswith( D.id, 'HN')
[
    {
        "id": "HNA",
        "ja": "花巻",
        "en": "Hanamaki"
    },
    {
        "id": "HND",
        "ja": "東京(羽田)",
        "en": "Tokyo (Haneda)"
    }
]

後方一致

同様に、where xxx like ‘%yyy’ は使えません。where endswith( xxx, yyy ) です。

SELECT D.id, D.ja, D.en from Domestic D where endswith( D.id, 'ND')
[
    {
        "id": "HND",
        "ja": "東京(羽田)",
        "en": "Tokyo (Haneda)"
    }
]

範囲指定

文字列値や数値の範囲指定はANSI SQLと同様、betweenを使います。

SELECT D.id, D.ja, D.en from Domestic D where SUBSTRING(D.id, 0, 1) between 'A' and 'D'
[
    {
        "id": "AOJ",
        "ja": "青森",
        "en": "Aomori"
    },
    {
        "id": "DNA",
        "ja": "嘉手納",
        "en": "Kadena"
    },
    {
        "id": "ASJ",
        "ja": "奄美",
        "en": "Amami"
    },
    {
        "id": "AXT",
        "ja": "秋田",
        "en": "Akita"
    },
    {
        "id": "AKJ",
        "ja": "旭川",
        "en": "Asahikawa"
    },
    {
        "id": "CTS",
        "ja": "札幌(千歳)",
        "en": "Sapporo (Chitose)"
    }
]

候補への合致

1個以上の選択肢に合致するものを抽出する場合は、ANSI SQLと同じくINを使います。

SELECT D.id, D.ja, D.en from Domestic D where D.id in ('HND', 'NRT', 'KOJ')
[
    {
        "id": "KOJ",
        "ja": "鹿児島",
        "en": "Kagoshima"
    },
    {
        "id": "NRT",
        "ja": "東京(成田)",
        "en": "Tokyo (Narita)"
    },
    {
        "id": "HND",
        "ja": "東京(羽田)",
        "en": "Tokyo (Haneda)"
    }
]

Cosmos DB Binding for Azure Functions

このエントリは2019/05/13現在の備忘録であり、以下のドキュメントの内容に対するメモです。将来この機能や設定方法が変わる可能性はあります。対象はFunction 2.xです。また、以下のコード例はいずれもJavaで記述しています。

Azure Functions 2.x の Azure Cosmos DB バインド
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-cosmosdb-v2

Functionsをローカル環境でテストする場合(http://localhost:7071/…)、以下の前提条件を満たしておく必要があります。

Azure Functions バインド拡張機能を登録する
ローカル開発の Azure Functions Core Tools
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-register#local-development-azure-functions-core-tools

このドキュメントには記載がありませんが、実際にはService Fabric SDKも必要です。必ずダウンロードしてインストールしておく必要があります。

Windows で開発環境を準備する(OS X、Linuxでの開発の場合は、下記URLでドロップダウンボックスでLinux、OS Xを選択します)
https://docs.microsoft.com/ja-jp/azure/service-fabric/service-fabric-get-started

あと、Cosmos DBへの接続のための接続文字列は、ローカルテスト環境とAzure上では設定箇所が違うのでご注意を。

  • ローカルテスト環境:local.settings.json
  • 本番環境:Application settings > Connection strings

Trigger

Azure Cosmos DB のトリガーは Azure Cosmos DB 変更フィードを使用して、パーティション間の挿入と更新をリッスンします。 変更フィードは、削除ではなく挿入と更新を発行します。

Azure Functions 2.x の Azure Cosmos DB バインド – トリガー
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-cosmosdb-v2#trigger

変更されたドキュメントは変更順にリストとして出力されます。@CosmosDBTriggerをアノテーションとして付加したFunctionが定期的に変更リストをポーリングしており、リストに追加があれば、そのFunctionsのロジックが動作します。

package cosmos2functions;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.CosmosDBTrigger;
import com.microsoft.azure.functions.annotation.FunctionName;

import java.util.List;

public class Function {
    @FunctionName("cosmosDBMonitor")
    public void cosmosDbLog(
            @CosmosDBTrigger(
                    name = "CosmosDB2Functions",
                    databaseName = "database",
                    collectionName = "Items",
                    leaseCollectionName = "Leases",
                    createLeaseCollectionIfNotExists = true,
                    connectionStringSetting = "AzureCosmosDBConnection")
                    List<String&gt; items,

            final ExecutionContext context
    ) {
        context.getLogger().info(items.size() + " item(s) is/are inserted.");
        if (!items.isEmpty()) {
            items.stream().forEach(s -&gt;context.getLogger().info("Element: " + s));
        }
    }
}

取得できるのは変更された結果のドキュメントです。Stringとして取得してJSONObjectに変換すれば、変更データを取得できます。

なお、アノテーションで設定するプロパティは以下のドキュメントに記載があります。

CosmosDBTrigger Interface
https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.functions.annotation.CosmosDBTrigger?view=azure-java-stable

ドキュメントに記載のないデフォルト値は以下の通りです。

プロパティデフォルト値
name
dataType“”
databaseName
collectionName
leaseConnectionStringSetting“”
leaseCollectionName“”
leaseDatabaseName“”
createLeaseCollectionIfNotExistsfalse
leasesCollectionThroughput-1
leaseCollectionPrefix“”
checkpointInterval-1
checkpointDocumentCount-1
feedPollDelay5000
connectionStringSetting
leaseRenewInterval17000
leaseAcquireInterval13000
leaseExpirationInterval60000
maxItemsPerInvocation-1
startFromBeginningfalse
preferredLocations“”

Input

SQL API を使用して 1 つ以上の Azure Cosmos DB ドキュメントを取得して関数の入力パラメーターに渡します。ドキュメント ID またはクエリ パラメーターは、関数を呼び出したトリガーに基づいて決定することができます。

Azure Functions 2.x の Azure Cosmos DB バインド – 入力
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-cosmosdb-v2#input

動的な条件設定のために、Query ParameterもしくはPath Parameterで渡す値を指定できます。ドキュメント上はPath Parameterをルートデータと表現しています。Cosmos DBからPOJOもしくはStringとしてデータを取得できます。

Query Parameter (+ SQL Query)

Query Parameter(以下の例ではaddress)をFunctionに渡しています。Function内では、指定されたAddressをwhere句に指定するSQL Queryを発行し、結果をPOJOとして取得、HTTP Responseとして返しています。

このFunctionには、 <Function App URL>/api/cosmosDBInputMonitor でアクセスできます。

// Itemというクラスを作成しておく

@FunctionName("cosmosDBInputMonitor")
public HttpResponseMessage cosmosdbQuery(
        @HttpTrigger(
                name = "req",
                methods = {HttpMethod.GET},
                authLevel = AuthorizationLevel.ANONYMOUS)
                HttpRequestMessage<Optional<String&gt;&gt; request,
        @CosmosDBInput(
                name = "CosmosDB2Functions",
                databaseName = "database",
                collectionName = "Items",
                partitionKey = "{Query.address}",
                sqlQuery = "select * from Items r where contains(r.address, {address})",
                connectionStringSetting = "AzureCosmosDBConnection")
                Item[] items,
        final ExecutionContext context
) {
    if (Optional.of(items).isPresent()) {
        return request.createResponseBuilder(HttpStatus.OK)
                .header("Content-Type", "application/json")
                .body(items)
                .build();
    } else {
        return request.createResponseBuilder(HttpStatus.NOT_FOUND)
                      .body("{\"error\":\"Not found\"}")
                      .build();
    }
}

Path Parameter (+ SQL Query)

以下の例はPath Parameterで指定する例です。この例では、

route = items/{address}

と設定しているため、このFunctionには、 <Function App URL>/api/items/{address} でアクセスできます。

// Itemというクラスを作成しておく

@FunctionName("cosmosDBInputMonitor2")
public HttpResponseMessage cosmosdbQuery2(
        @HttpTrigger(
                name = "req",
                methods = {HttpMethod.GET},
                authLevel = AuthorizationLevel.ANONYMOUS,
                route = "items/{address}")
                HttpRequestMessage<Optional<String&gt;&gt; request,
        @CosmosDBInput(
                name = "CosmosDB2Functions",
                databaseName = "database",
                collectionName = "Items",
                sqlQuery = "select * from items r where contains(r.address, {address})",
                connectionStringSetting = "AzureCosmosDBConnection")
                Items[] items,
        final ExecutionContext context
) {
    if (Optional.of(items).isPresent()) {
        return request.createResponseBuilder(HttpStatus.OK)
                .header("Content-Type", "application/json")
                .body(items)
                .build();
    } else {
        return request.createResponseBuilder(HttpStatus.NOT_FOUND)
                      .body("{\"error\":\"Not found\"}")
                      .build();
    }
}

アノテーションで設定するプロパティは以下のドキュメントに記載があります。

CosmosDBInput Interface
https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.functions.annotation.cosmosdbinput?view=azure-java-stable

ドキュメントに記載のないデフォルト値は以下の通りです。

プロパティデフォルト値
name
dataType“”
databaseName
collectionName
id“”
sqlQuery“”
connectionStringSetting
partitionKey“”

Output

Azure Cosmos DB 出力バインドを使用すると、SQL API を使って Azure Cosmos DB データベースに新しいドキュメントを記述できます。

Azure Functions 2.x の Azure Cosmos DB バインド – 入力
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-cosmosdb-v2#output

@CosmosDBOutput アノテーションをつける場合、

  • Functionの戻り値をCosmos DBに格納することを意図している場合は、メソッドに注釈をつける
  • Outputバインディングを使ってCosmos DBに格納することを意図している場合は、OutputBindingクラスインスタンスに注釈をつける

という注意事項があります。

戻り値を格納する

POSTで複数の項目を入れて、Echo backするようなFunctionです。このFunctionの戻り値をCosmos DBに格納しています。

// 戻り値を格納したいので、 @CosmosDBOutputは外
@FunctionName("output1")
@CosmosDBOutput(name = "CosmosDB2Functions",
        databaseName = "database",
        collectionName = "Items",
        connectionStringSetting = "AzureCosmosDBConnection")
public Item[] echo (
        @HttpTrigger(
                name = "req",
                methods = {HttpMethod.POST},
                authLevel = AuthorizationLevel.ANONYMOUS)
                HttpRequestMessage<Optional<Item[]&gt;&gt; request,
        final ExecutionContext context) {
    return request.getBody().get();
}

戻り値としてrequest.getBody()ではなく、request.getBody().get()を指定しているのは、前者の場合、以下のようにvalue要素ごと格納されるからです。

{
  "value": {
    ....
  }
}

OutputBindingを利用して格納する

この場合、メソッドの引数を使ってCosmos DBにデータを格納するため、@CosmosDBOutputは引数outputItemに対して指定していることに注意する必要があります。

// OutputBindingを使うので、@CosmosDBOutputは引数outputItemに指定

@FunctionName("output2")
public HttpResponseMessage echo (
        @HttpTrigger(
                name = "req",
                methods = {HttpMethod.POST},
                authLevel = AuthorizationLevel.ANONYMOUS)
                HttpRequestMessage<Optional<List<Item&gt;&gt;&gt; request,
        @CosmosDBOutput(
                name = "CosmosDB2Functions",
                databaseName = "database",
                collectionName = "Items",
                connectionStringSetting = "AzureCosmosDBConnection")
                OutputBinding<List<Item&gt;&gt; outputItem,
        final ExecutionContext context
) {
    outputItem.setValue(request.getBody().get());
    return request.createResponseBuilder(HttpStatus.CREATED)
                  .header("content-type","application/json")
                  .body(request.getBody().get())
                  .build();
}

アノテーションで設定するプロパティは以下のドキュメントに記載があります。

CosmosDBOutput Interface
https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.functions.annotation.cosmosdboutput?view=azure-java-stable

ドキュメントに記載のないデフォルト値は以下の通りです。

プロパティデフォルト値
name
dataType“”
databaseName
collectionName
createIfNotExistsfalse
connectionStringSetting
partitionKey“”
collectionThroughput-1
useMultipleWriteLocationsfalse
preferredLocations“”