Azure DatabricksでAzure Cache for Redisにデータを流し込む

このエントリは2019/12/25現在の情報に基づいています。将来の機能追加・変更に伴って記載内容との乖離が発生する可能性があります。

Key-Valueストアとして利用可能なAzureのサービスを考えると、Cosmos DBやTable Storageなどが候補に挙がるが、もちろんAzure Cache for Redisも利用可能である。Azure Cache for Redisはメモリ上にデータを保持するため、高速な読み書きができるだけではなく、ストレージへ永続化したり、ストレージからのインポート・エクスポートを利用できたりするのはすでに以下のエントリで記載している通り。

Azure Cache for Redis
https://logico-jp.io/2019/10/29/azure-cache-for-redis/

Databricksでデータを流し込む

ユースケースと背景

リクエストに付加されたクエリパラメータに従い、RDBMSに対してクエリを発行し、そのレスポンスを返すREST APIがあるものとする。クエリを処理するRDBMSのパフォーマンスが(チューニングをしたとしても)頭打ちになってしまったため、事前にクエリ結果を取得し、キャッシュに載せておくことを考えた。

なお、REST APIからは当該RDBMSを参照するのみで、更新することはないが、RDBMSのデータ更新は60分ごとに発生するため、定期的なバッチ処理としてクエリ結果を作成しておく必要があった。

ソリューション例

上記のような場合、RDBMSとアプリケーション(API)間にCacheを挟むのはある意味王道ではある。In-memory Data Gridと称されるものであれば、RDBMSのデータを保持しておき、クエリロジックを仕込んでおいて結果を返してもらう仕組みも取ることが出来るが、今回はただのCacheとしてRedisを使うことにする。利用するツールは以下の通り(完全にお手盛りではある)。

  • キャッシュ:Azure Cache for Redis
  • RDBMS:SQL Database(何でもよいが、たまたま手近にあるのがこれだったので)
  • データ処理:Azure Databricks

流れは、SQL DatabaseからデータをDatabricksにロードし、Spark SQLでごにょごにょ操作し、結果をRedisに書き込む、というシンプルなもの。注意点は、Spark SQLを使って作成しているDataFrame/DataSetからRDDに変換する際、単純にRDDを作成すると、

org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

になってしまい、SparkContext#toRedisKV() が期待する

org.apache.spark.rdd.RDD[(String, String)]

の形にはならない。そのため、RDD作成時に

rdd.map(row=>(row.getString(0), row.getString(1)))

のようにmapで整形しておく必要がある。

あとはドキュメント等に記載の通りにやればOK。以下はその例。ここでは、_idとvalという列で構成されているDataSetを作り、それをDataFrame、RDDに変換している。

val tempRDD = customerAddress_DS.join(customer_DS, "CustomerID")
                          .join(address_DS, "AddressID")
                          .withColumn("_id", concat(lit("address="), $"AddressID", lit("&customer="), $"CustomerID"))
                          .withColumn("val", to_json(struct($"AddressID", $"AddressLine1", $"CustomerID", $"CompanyName", $"AddressType")))
                          .select("_id", "val")
                          .repartitionByRange($"_id").toDF().rdd.map(row=>(row.getString(0), row.getString(1)))
val redisServerDnsAddress = "Redisのホスト"
val redisPortNumber = (Redisのポート番号、数値として指定)
val redisPassword = "Redisへの接続パスワード"
val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))
val sc = SparkSession
                .builder()
                .appName("MyApp")
                .master("local[*]")
                .config("spark.redis.host", redisServerDnsAddress)
                .config("spark.redis.port", redisPortNumber)
                .getOrCreate().sparkContext
sc.toRedisKV(tempRDD)(redisConfig)

やはりメモリアクセスだと速い、というのを実感。以前Cosmos DBでやってみたときは、事前にスループットを十分に割り当てないとHTTP 429が返ることがあったが、今回はエラーもなく無事に成功。

Azure Databricksの集計結果をCosmos DB(MongoDB API)に書き込む
https://logico-jp.io/2019/09/04/write-data-to-cosmos-db-mongodb-api-using-azure-databricks/

とはいえ、メモリなのでお財布にはちょっと厳しい。

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中