Azure Blob Storageでの並行処理

このエントリは2020/12/24現在の情報に基づいています。将来の機能追加や変更に伴い、記載内容との乖離が発生する可能性があります。

この頃、とある界隈では、Blob Storageでの書き込み後の読み取り整合性が何やら話題になっているらしい。その話題とは以下のようなもの。

Blob storageにデータを格納したり変更するメソッドを呼び出してデータを永続化してから、参照のためのメソッドの呼出しが可能になるまで時間を要することがあったが、それが改善され、書き込み後すぐに参照できるようになった。

つまり、それまでは結果整合性だけを担保していたが、このたび書き込み後の読み取り整合性を担保するようになった、ということらしい。実に素晴らしい。

さて翻ってAzureではどうなのか?詳細は以下に記載がある通りで、以前から結果整合性だけでなく、Snapshot isolationを使い読み取り一貫性を保証(どこかのDatabaseの売り文句のよう)しており、Optimistic concurrency、Pessimistic concurrency、Last writer wins(最終書込者優先)の3種類の並行処理のモデルが利用可能である。

BLOB ストレージ内でのコンカレンシーの管理 / Managing Concurrency in Blob storage
https://docs.microsoft.com/azure/storage/blobs/concurrency-manage

選択できるということは、Blobおよびコンテナーへのアクセスを管理するにあたり、採用するモデルを指定する必要がある。選択できるのはOptimistic concurrency、Pessimistic concurrencyのいずれか。明示的に指定しない場合はLast writer wins(最終書込者優先)。

で、上記ページのサンプルは例によって.NET SDKのみなので、同等のサンプルをJava SDKで書いてみる。Java SDKの使い方は以下を参照。

クイック スタート:Java v12 SDK で BLOB を管理する / Quickstart: Manage blobs with Java v12 SDK
https://docs.microsoft.com/azure/storage/blobs/storage-quickstart-blobs-java

依存関係は、2020/12/24現在以下のよう。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-storage-blob</artifactId>
    <version>12.10.0-beta.1</version>
</dependency>

事前にやっておくこと

Blob storageを作成しておかねばならないので、Azure CLIでもAzure Portalでもよいので作成しておく。CLIでやるならこんな感じ。

# パラメータの設定
RG=blob-rg
LOCATION=japaneast
STORAGEACCOUNT=blob4logicojp
CONTAINER=logico

# リソースグループの作成
az group create -n $RG -l $LOCATION

# Storage accountの作成
az storage account create \
    -g $RG \
    -n $STORAGEACCOUNT \
    -l $LOCATION \
    --sku Standard_ZRS \
    --encryption-services blob

# 後で必要なので、リソースIDを取得しておく
id=$(az storage account show \
    -g $RG \
    -n $STORAGEACCOUNT \
    --query id -o tsv)

# ユーザーにContributor権限を付与(Portalだと勝手に付与されるが、CLIでやる場合には明示的な付与が必要)
az ad signed-in-user show --query objectId -o tsv | az role assignment create \
    --role "Storage Blob Data Contributor" \
    --assignee @- \
    --scope $id

# Containerの作成
az storage container create \
    --account-name $STORAGEACCOUNT \
    -n $CONTAINER \
    --auth-mode login

Optimistic Concurrency

バージョンを含むETagを使って問い合わせる。3回目の更新では、対象のオブジェクトが更新されているため、ETagの値が変化している。そのため、1回目のETagと一致する前提で更新しようとすると、HTTP 412(Precondition Failed、必須条件の失敗)として例外が発生する。以下の例ではBlobClient#uploadWithResponse()を使っているが、Responseは例外(BlobStorageException)で取得できるので、素のBlobClient#upload()を使ってもよい。

// Optimistic
void optimistic(BlobClient blobClient) {
try {
// 1st
String blobContents1 = "First update. Overwrite blob if it exists.";
byte[] byteArray = blobContents1.getBytes(StandardCharsets.UTF_8);
InputStream targetStream = new ByteArrayInputStream(byteArray);
BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(targetStream, byteArray.length);
Response<BlockBlobItem> blobItemResponse = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
System.out.println("Content: " + blobContents1);
System.out.println("Status: " + blobItemResponse.getStatusCode());
System.out.println("ETag: " + blobItemResponse.getValue().getETag());
String originalEtag = blobItemResponse.getValue().getETag();
// 2nd
String blobContents2 = "Second update overwrites first update.";
byteArray = blobContents2.getBytes(StandardCharsets.UTF_8);
targetStream = new ByteArrayInputStream(byteArray);
blobParallelUploadOptions = new BlobParallelUploadOptions(targetStream, byteArray.length);
blobItemResponse = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
System.out.println("Content: " + blobContents2);
System.out.println("Status: " + blobItemResponse.getStatusCode());
System.out.println("ETag: " + blobItemResponse.getValue().getETag());
// 3rd ETagが一致する場合に上書き(書き換わっているから失敗する)
String blobContents3 = "Third update. If-Match condition set to original ETag.";
byteArray = blobContents3.getBytes(StandardCharsets.UTF_8);
targetStream = new ByteArrayInputStream(byteArray);
BlobRequestConditions blobRequestConditions = new BlobRequestConditions().setIfMatch(originalEtag);
blobParallelUploadOptions = new BlobParallelUploadOptions(targetStream, byteArray.length)
.setRequestConditions(blobRequestConditions);
blobItemResponse = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
System.out.println("Content: " + blobContents3);
System.out.println("Status: " + blobItemResponse.getStatusCode());
System.out.println("ETag: " + blobItemResponse.getValue().getETag());
}
catch(BlobStorageException e) {
if(e.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
System.out.println("Precondition failure as expected. Blob's ETag does not match ETag provided.");
}
else {
e.printStackTrace();
}
}
}

Pessimistic Concurrency

リース (Lease) 、つまりロックを取得して問い合わせる。3回目の更新ではリースを取得せずに更新しようとしているために、HTTP 412(Precondition Failed、必須条件の失敗)として例外が発生する。また、リースの有効期間は15秒なので、それを超えた時点で以前取得したLease IDを使うと、HTTP 409 (Conflict)としてやはり例外が発生する。

// Pessimistic
void pessimistic(BlobClient blobClient) {
BlobLeaseClient blobLeaseClient = new BlobLeaseClientBuilder().blobClient(blobClient).buildClient();
try {
// 1st
String blobContents1 = "First update. Overwrite blob if it exists.";
byte[] byteArray = blobContents1.getBytes(StandardCharsets.UTF_8);
InputStream targetStream = new ByteArrayInputStream(byteArray);
BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(targetStream, byteArray.length);
Response<BlockBlobItem> blobItemResponse = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
System.out.println("Content: " + blobContents1);
System.out.println("Status: " + blobItemResponse.getStatusCode());
System.out.println("ETag: " + blobItemResponse.getValue().getETag());
// Leaseを取得
String leaseId = blobLeaseClient.acquireLease(15);
System.out.println("LeaseId: " + leaseId);
// 2nd Lease IDを取得しているので成功するはず
String blobContents2 = "Second update. Lease ID provided on request.";
byteArray = blobContents2.getBytes(StandardCharsets.UTF_8);
targetStream = new ByteArrayInputStream(byteArray);
BlobRequestConditions blobRequestConditions = new BlobRequestConditions().setLeaseId(leaseId);
blobParallelUploadOptions = new BlobParallelUploadOptions(targetStream, byteArray.length).setRequestConditions(blobRequestConditions);
blobItemResponse = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
System.out.println("Content: " + blobContents2);
System.out.println("Status: " + blobItemResponse.getStatusCode());
System.out.println("ETag: " + blobItemResponse.getValue().getETag());
// 3rd Lease IDを取得していないので失敗する
String blobContents3 = "Third update. No lease ID provided.";
byteArray = blobContents3.getBytes(StandardCharsets.UTF_8);
targetStream = new ByteArrayInputStream(byteArray);
blobParallelUploadOptions = new BlobParallelUploadOptions(targetStream, byteArray.length);
blobItemResponse = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
System.out.println("Content: " + blobContents3);
System.out.println("Status: " + blobItemResponse.getStatusCode());
System.out.println("ETag: " + blobItemResponse.getValue().getETag());
}
catch(BlobStorageException e) {
if(e.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
System.out.println("Precondition failure as expected. The lease ID was not provided.");
}
else {
e.printStackTrace();
}
}
finally {
blobLeaseClient.releaseLease();
}
}

サンプルコードは以下から。

https://github.com/anishi1222/managing-concurrency-in-blob-storage

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Google フォト

Google アカウントを使ってコメントしています。 ログアウト /  変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中