日別アーカイブ: 2019年9月4日

Azure Databricksの集計結果をCosmos DB(MongoDB API)に書き込む

このエントリは2019/09/02現在の情報に基づくもので、将来の機能追加や廃止に伴い、記載内容との乖離が発生する可能性があります。

Cosmos DBには種々のAPIがあり、例えばMongoDBやCassandraなどを使っているアプリケーションをCosmos DBに置き換える場合、接続文字列を置き換えるだけで移行できる、ということになっているが、果たしてそれがApache Sparkの場合にでも言えるのかを確認した。

結論から言うと、確かにMongoDB Connector for Apache Sparkを設定し直すことで対応できた。以下に記録を残す。

Databricksで実施する処理

Azure SQL DatabaseのAdventureworksデータベース内にあるSalesLT.CustomerAddress、SalesLT.Address、SalesLT.CustomerをJoinし、_idとして、CustomerIDとAddressIDを組み合わせた文字列を生成する。単なるJoinなので、本来ならDatabase内で完結すべきなのは承知の上で、テスト目的で実施した。

準備するもの

  • Cosmos DB
    • MongoDB APIを選択
  • Azure Databricks
    • 今回は5.5 LTS (Apache Spark 2.4.3 / Scala 2.11) を選択
  • MongoDB Connector for Apache Spark
    • Maven Repositoryからorg.mongodb.spark:mongo-spark-connector_2.11の2.4.1を選択し、ライブラリとして登録しておく
    • Scalaのバージョンと整合しておく必要がある
  • Azure Databricks Notebook
    • 今回は動作確認のため、Notebookを使用。
    • 今回、言語はScalaを利用
  • サンプルデータソース
    • ファイルでも何でもよいが、今回はAzure SQL Databaseのサンプルデータベース(Adventureworks)を使用

Databricksにデータを取り込み、DataSetを生成

JDBCドライバを使ってAzure DatabricksでSQL Databaseからデータを取り込む。DataSetを作成する場合はCase Classが必要なので、こちらも作成しておく。

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
// Case Classes
case class Address(AddressID: Int, AddressLine1: String, AddressLine2: String, City: String, StateProvince: String, CountryRegion: String, PostalCode: String, rowguid: String, ModifiedDate: String)
case class Customer(CustomerID: Int, NameStyle: Boolean, Title: String, FirstName: String, MiddleName: String, LastName: String, Suffix: String, CompanyName: String, SalesPerson: String, EmailAddress: String, Phone: String, PasswordHash: String, PasswordSalt: String, rowguid: String, ModifiedDate: String)
case class CustomerAddress(CustomerID: Int, AddressID: Int, AddressType: String, rowguid: String, ModifiedDate: String)
// Create DataSets and load data to them
val customerAddress_DS = spark.read.jdbc(jdbcUrl, "SalesLT.CustomerAddress", connectionProperties).as[CustomerAddress]
val address_DS = spark.read.jdbc(jdbcUrl, "SalesLT.Address", connectionProperties).as[Address]
val customer_DS = spark.read.jdbc(jdbcUrl, "SalesLT.Customer", connectionProperties).as[Customer]

DataSet間でJoinしDataFrameを生成

各DataSetをJoinする。このとき、列名を===でつなぐJoinだと同じ名称の列ができてしまうので、列名だけを指定するようにしている。

val tempDS = customerAddress_DS.join(customer_DS, "CustomerID")
                          .join(address_DS, "AddressID")
                          .withColumn("_id", concat($"CustomerID", lit("-"), $"AddressID"))
                          .select("_id", "AddressID", "AddressLine1", "CustomerID", "CompanyName", "AddressType")

この結果、以下の列を持つDataFrameができあがる。

  • _id:string
  • AddressID:integer
  • AddressLine1:string
  • CustomerID:integer
  • CompanyName:string
  • AddressType:string

SQLで記述すると、以下のような処理を実行している。

select B.CustomerID || '-' || A.AddressID as _id,
          A.AddressID as AddressID,
          A.AddressLine1 as AddressLine1,
          B.CustomerID as CustomerID,
          B.CompanyName as CompanyName,
          C.AddressType as AddressType
  from Address A, Customer B, CustomerAddress C
where A.AddressID = C.AddressID and C.CustomerID = B.CustomerID

Cosmos DB (MongoDB API) に格納

Connectorを使うためのお作法に則って、構成を作成し、saveメソッドを呼び出す。

val writeConfigMap = Map(
    "database" -> "{database name}", 
    "collection" -> "{collection name}", 
    "uri" -> "mongodb://...", 
    "writeConcern.w" -> "majority")
val writeConfig: WriteConfig = WriteConfig(writeConfigMap, Some(WriteConfig(sc)))
MongoSpark.save(tempDF.write.mode(SaveMode.Overwrite), writeConfig)

注意すべきこと

Cosmos DB側のスループット設定が不足していると、最後の書き込みでエラーが発生することがある(HTTP 429)。これは書き込むデータ量に依存するので、単純な推奨値はない。そのため、適切なスループットの割り当てや、整合性の変更などを考慮する必要がある。もしどうしてもスループットが足りない場合は、何らかのスロットリング機構が必要である。