このエントリは2019/09/02現在の情報に基づくもので、将来の機能追加や廃止に伴い、記載内容との乖離が発生する可能性があります。
Cosmos DBには種々のAPIがあり、例えばMongoDBやCassandraなどを使っているアプリケーションをCosmos DBに置き換える場合、接続文字列を置き換えるだけで移行できる、ということになっているが、果たしてそれがApache Sparkの場合にでも言えるのかを確認した。
結論から言うと、確かにMongoDB Connector for Apache Sparkを設定し直すことで対応できた。以下に記録を残す。
Databricksで実施する処理
Azure SQL DatabaseのAdventureworksデータベース内にあるSalesLT.CustomerAddress、SalesLT.Address、SalesLT.CustomerをJoinし、_idとして、CustomerIDとAddressIDを組み合わせた文字列を生成する。単なるJoinなので、本来ならDatabase内で完結すべきなのは承知の上で、テスト目的で実施した。
準備するもの
- Cosmos DB
- MongoDB APIを選択
- Azure Databricks
- 今回は5.5 LTS (Apache Spark 2.4.3 / Scala 2.11) を選択

- MongoDB Connector for Apache Spark
- Maven Repositoryからorg.mongodb.spark:mongo-spark-connector_2.11の2.4.1を選択し、ライブラリとして登録しておく
- Scalaのバージョンと整合しておく必要がある

- Azure Databricks Notebook
- 今回は動作確認のため、Notebookを使用。
- 今回、言語はScalaを利用
- サンプルデータソース
- ファイルでも何でもよいが、今回はAzure SQL Databaseのサンプルデータベース(Adventureworks)を使用
Databricksにデータを取り込み、DataSetを生成
JDBCドライバを使ってAzure DatabricksでSQL Databaseからデータを取り込む。DataSetを作成する場合はCase Classが必要なので、こちらも作成しておく。
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
// Case Classes
case class Address(AddressID: Int, AddressLine1: String, AddressLine2: String, City: String, StateProvince: String, CountryRegion: String, PostalCode: String, rowguid: String, ModifiedDate: String)
case class Customer(CustomerID: Int, NameStyle: Boolean, Title: String, FirstName: String, MiddleName: String, LastName: String, Suffix: String, CompanyName: String, SalesPerson: String, EmailAddress: String, Phone: String, PasswordHash: String, PasswordSalt: String, rowguid: String, ModifiedDate: String)
case class CustomerAddress(CustomerID: Int, AddressID: Int, AddressType: String, rowguid: String, ModifiedDate: String)
// Create DataSets and load data to them
val customerAddress_DS = spark.read.jdbc(jdbcUrl, "SalesLT.CustomerAddress", connectionProperties).as[CustomerAddress]
val address_DS = spark.read.jdbc(jdbcUrl, "SalesLT.Address", connectionProperties).as[Address]
val customer_DS = spark.read.jdbc(jdbcUrl, "SalesLT.Customer", connectionProperties).as[Customer]
DataSet間でJoinしDataFrameを生成
各DataSetをJoinする。このとき、列名を===でつなぐJoinだと同じ名称の列ができてしまうので、列名だけを指定するようにしている。
val tempDS = customerAddress_DS.join(customer_DS, "CustomerID")
.join(address_DS, "AddressID")
.withColumn("_id", concat($"CustomerID", lit("-"), $"AddressID"))
.select("_id", "AddressID", "AddressLine1", "CustomerID", "CompanyName", "AddressType")
この結果、以下の列を持つDataFrameができあがる。
- _id:string
- AddressID:integer
- AddressLine1:string
- CustomerID:integer
- CompanyName:string
- AddressType:string
SQLで記述すると、以下のような処理を実行している。
select B.CustomerID || '-' || A.AddressID as _id,
A.AddressID as AddressID,
A.AddressLine1 as AddressLine1,
B.CustomerID as CustomerID,
B.CompanyName as CompanyName,
C.AddressType as AddressType
from Address A, Customer B, CustomerAddress C
where A.AddressID = C.AddressID and C.CustomerID = B.CustomerID
Cosmos DB (MongoDB API) に格納
Connectorを使うためのお作法に則って、構成を作成し、saveメソッドを呼び出す。
val writeConfigMap = Map(
"database" -> "{database name}",
"collection" -> "{collection name}",
"uri" -> "mongodb://...",
"writeConcern.w" -> "majority")
val writeConfig: WriteConfig = WriteConfig(writeConfigMap, Some(WriteConfig(sc)))
MongoSpark.save(tempDF.write.mode(SaveMode.Overwrite), writeConfig)
注意すべきこと
Cosmos DB側のスループット設定が不足していると、最後の書き込みでエラーが発生することがある(HTTP 429)。これは書き込むデータ量に依存するので、単純な推奨値はない。そのため、適切なスループットの割り当てや、整合性の変更などを考慮する必要がある。もしどうしてもスループットが足りない場合は、何らかのスロットリング機構が必要である。