Networking I/O with Virtual Threads – Under the hood

原文はこちら。
The original article was written by Chris Hegarty (Consulting Member of Technical Staff, Networking Lead of the Java Platform Group at Oracle).
https://inside.java/2021/05/10/networking-io-with-virtual-threads/

Project Loomは、Javaプラットフォーム上で使いやすく、高スループットの軽量な並行処理と新しいプログラミングモデルをサポートするためのJava VMの機能とAPIを提供することを意図しています。

Project Loom wiki
https://wiki.openjdk.java.net/display/loom/Main

これにより、多くの興味あるエキサイティングな展望がもたらされますが、その一つが、ネットワークとやり取りするコードの簡素化です。今日のサーバーは、サポートできるスレッド数よりもはるかに多くのオープンソケット接続を処理できるため、チャンスと課題の両方が生まれています。

残念ながら、ネットワークと対話するスケーラブルなコードを書くのは簡単ではありません。同期型APIを使用では、それ以上はスケールしないという閾値があります。なぜなら、このようなAPIはI/O操作を行う際にブロックすることがあり、操作の準備が整うまで(例えば、利用可能なデータがないときにソケットからデータを読み取ろうとする場合など)スレッドが拘束されてしまうからです。スレッドは(現在のところ)Javaプラットフォームにおいて高価なリソースであり、I/O操作が完了待機に縛られるにはコストがかかりすぎます。この制限を回避するために、私たちは一般的に非同期I/Oやリアクティブフレームワークを利用することが多いでしょう。これらのフレームワークを使って、I/O操作でスレッドを拘束するのではなく、I/O操作が完了したときや準備ができたときにコールバックやイベント通知を使用するコードを作成できるからです。

非同期APIやノンブロッキングAPIは、同期APIに比べて扱いが難しいのですが、その理由の一つは、人間にとって不自然なコード構成になってしまうからです。同期APIは、ほとんどの場合簡単に扱えます。コードは書きやすく、読みやすく、デバッグも簡単です(スタックトレースが意味をなします!)。しかし、先ほど説明したように、同期APIを使ったコードは非同期APIのようにはスケールしません。このため、私たちは悪い選択を迫られることになります。より簡単な同期型のコードを選んでスケールしないことを受け入れるか、よりスケーラブルな非同期型のコードを選んでその複雑さに対処するかです。これは素晴らしい選択ではありません。Project Loomの魅力的な価値提案の一つは、このような選択をしなくて済むようにすることです。つまり、同期コードがスケーリングできるようにするというものです。

この記事では、JavaプラットフォームのNetworking APIが、仮想スレッド上で呼び出されたときに、どのように動作するかを見ていきます。詳細については実装上の問題であり、コードを書く際に知る必要はありません。しかし、ボンネットの中でどのように動作しているかを理解することは興味深いことであり、わからないままだと、先ほどの難しい選択をしなければならなくなるかもしれない疑問への回答の助けになるかもしれません。

Virtual Threads

先に進む前に、Project Loomの新しい種類のスレッドである仮想スレッドについて少し知っておく必要があります。

仮想スレッドとは、OSではなく、Java仮想マシンがスケジューリングするユーザーモードのスレッドです。仮想スレッドが必要とするリソースは少なく、1つのJava仮想マシンで数百万の仮想スレッドをサポートすることもあります。仮想スレッドは、I/O操作が完了するのを待つなど、ブロックされている時間が多いタスクを実行するのに適しています。

プラットフォーム・スレッド(現在のJavaプラットフォームでおなじみのスレッド)は、通常、OSがスケジューリングするカーネル・スレッドに1対1でマッピングされます。プラットフォーム・スレッドは通常、OSがメンテナンスする大きなスタックやその他のリソースを持ちます。

仮想スレッドは、通常、キャリアスレッドとして使用される少数のプラットフォームスレッドを使います。仮想スレッドで実行されるコードは、通常、基盤となるキャリアスレッドを意識することはありません。ロック操作やI/O操作は、キャリアスレッドをある仮想スレッドから別の仮想スレッドに再スケジューリングするスケジューリングポイントです。仮想スレッドを保留できますが、保留するとスケジューリングできなくなります。 仮想スレッドを保留解除すると、再びスケジューリングできるようになります。

Networking APIs

Javaプラットフォームには、大きく分けて2種類のネットワーキングAPIがあります。

  1. 非同期型 – AsynchronousServerSocketChannel や AsynchronousSocketChannel
  2. 同期型 – java.netの Socket / ServerSocket / DatagramSocket、 java.nio.channels SocketChannel / ServerSocketChannel / DatagramChannel

最初のカテゴリーである非同期型は、最初にI/O操作を開始したスレッドとは別のスレッドで、後のどこかのタイミングで完了するものです。定義上、これらのAPIはブロッキングシステムコールを発生させないため、仮想スレッドで実行しても特別な処理は必要ありません。

もう一つのカテゴリーである同期型は、仮想スレッドで実行したときの動作という点で、より興味深いものです。このカテゴリーには、ノンブロッキングモードで構成可能なNIOチャンネルがあります。このようなチャンネルは通常、SelectorのようなI/Oイベント通知メカニズムに登録され、ブロッキングシステムコールを実行しません。非同期ネットワークAPIと同様に、I/O操作がブロッキングシステムコールを呼び出さず、通常はSelectorに任されるため、仮想スレッドで実行する場合には特別な処理は必要ありません。このため、残るのはjava.netSocketの種類とブロッキングモードに設定されているNIOチャンネルです。これらが仮想スレッドでどのように動作するかを見てみましょう。

同期APIのセマンティクスでは、一度開始されたI/O操作は、呼び出しスレッド(calling thread)で完了または失敗した後に、呼び出し元に制御が戻されることになっています。しかし、I/O操作が「準備できていない」場合、例えば、ソケットから読み取るデータがない場合はどうでしょうか?

Synchronous Blocking APIs

同期型のネットワーキングJava APIは、仮想スレッドで実行されると、基盤となるネイティブ・ソケットをノンブロッキング・モードに切り替えます。Javaコードから呼び出されたI/O操作がすぐに完了しない場合(ネイティブソケットがEAGAIN(not ready/would block)を返す場合)、配下のネイティブソケットはJVM全体のイベント通知メカニズム(Poller)に登録され、仮想スレッドを保留します。基礎となるI/O操作の準備が整うと(Pollerにイベントが到着すると)、仮想スレッドを保留解除して、配下のソケット操作を再試行します。

例を挙げて詳しく見てみましょう。retrieveURLsメソッドは、指定された複数のURLをダウンロードし、そのレスポンスを返します。

// Tuple of URL and response bytes
record URLData (URL url, byte[] response) { }

List<URLData> retrieveURLs(URL... urls) throws Exception {
  try (var executor = Executors.newVirtualThreadExecutor()) {
    var tasks = Arrays.stream(urls)
            .map(url -> (Callable<URLData>)() -> getURL(url))
            .toList();
    return executor.submit(tasks)
            .filter(Future::isCompletedNormally)
            .map(Future::join)
            .toList();
  }
}

retrieveURLsメソッドが(各URLに対して1個の)タスクのリストを作成し、これらをexecutorに送信し、結果を待ちます。executorは各タスクごとに新しい仮想スレッドを起動し、getURLを呼び出します。簡単にするために、正常に完了したタスクのみが返されるものとします。

このgetURLメソッドは同期型のURLConnection APIを使ってレスポンスを得るように明確に記載されています。

URLData getURL(URL url) throws IOException {
  try (InputStream in = url.openStream()) {
    return new URLData(url, in.readAllBytes());
  }
}

readAllBytesメソッドは同期型の一括読み取り操作でレスポンスの全てのバイトを読み取ります。このメソッドの内部では、最終的にjava.netのソケット入力ストリームのお読み取りメソッドに到達します。

retrieveURLsを使ってHTTP URLをダウンロードする小さなプログラムを実行すると、HTTPサーバーが完全なレスポンスを提供しない場合、以下のようにスレッドの状態を検査できます。

$ java Main & echo $!
89215
$ jcmd 89215 JavaThread.dump threads.txt
Created /Users/chegar/threads.txt

threads.txtには、テストプログラムのメインスレッドとともに、通常のシステムスレッドと読み取り操作でブロックされた仮想スレッドが表示されています。注意:仮想スレッドは明示的に割り当てられない限り名前を持たないため、無名です。

$ cat threads.txt
...
"<unnamed>" #15 virtual
  java.base/java.lang.Continuation.yield(Continuation.java:402)
  java.base/java.lang.VirtualThread.yieldContinuation(VirtualThread.java:367)
  java.base/java.lang.VirtualThread.park(VirtualThread.java:534)
  java.base/java.lang.System$2.parkVirtualThread(System.java:2370)
  java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:60)
  java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:184)
  java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:212)
  java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:320)
  java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:356)
  java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:807)
  java.base/java.net.Socket$SocketInputStream.read(Socket.java:988)
  java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255)
  java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:310)
  java.base/java.io.BufferedInputStream.lockedRead(BufferedInputStream.java:382)
  java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:361)
  java.base/sun.net.www.MeteredStream.read(MeteredStream.java:141)
  java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
  java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3648)
  java.base/java.io.InputStream.readNBytes(InputStream.java:409)
  java.base/java.io.InputStream.readAllBytes(InputStream.java:346)
  Main.getURL(Main.java:24)
  Main.lambda$retrieveURLs$0(Main.java:13)
  java.base/java.util.concurrent.FutureTask.run(FutureTask.java:268)
  java.base/java.util.concurrent.ThreadExecutor$TaskRunner.run(ThreadExecutor.java:385)
  java.base/java.lang.VirtualThread.run(VirtualThread.java:295)
  java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:172)
  java.base/java.lang.Continuation.enter0(Continuation.java:372)
  java.base/java.lang.Continuation.enter(Continuation.java:365)

スタックフレームを下から順に見ていきましょう。まず、仮想スレッドのセットアップに関連するいくつかのフレームが見えます。continuation(継続)は仮想スレッドが内部的に採用するVMメカニズムです。これらは、executorサービスが作成した新しいスレッドに対応しています。次は、retrieveURLgetURLを呼び出すテストプログラムに対応するフレームが表示されます。3つ目は、HTTPプロトコルハンドラーと、最終的にはソケット入力ストリーム実装であるreadメソッドに対応するフレームが表示されます。最後に、これらのフレームを追ってスタックを上に移動すると、仮想スレッドが保留されたことがわかります。これは、サーバーが完全なレスポンスを送信していないため、ソケットから読み取るのに十分なデータがないことから予想されることです。しかし、ソケットにデータが到着した場合、どのようにして仮想スレッドの保留を解除するのでしょうか?

thread.txtの他のシステムスレッドをもう少しよく見てみると、以下のようです。

"Read-Poller" #16
  java.base@17-internal/sun.nio.ch.KQueue.poll(Native Method)
  java.base@17-internal/sun.nio.ch.KQueuePoller.poll(KQueuePoller.java:65)
  java.base@17-internal/sun.nio.ch.Poller.poll(Poller.java:195)
  java.base@17-internal/sun.nio.ch.Poller.lambda$startPollerThread$0(Poller.java:65)
  java.base@17-internal/sun.nio.ch.Poller$$Lambda$14/0x00000008010579c0.run(Unknown Source)
  java.base@17-internal/java.lang.Thread.run(Thread.java:1522)
  java.base@17-internal/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:161)

このスレッドは、JVM全体のread pollerです。根本的には、このスレッドが基本的なイベントループを実行し、仮想スレッドで呼び出されたときにすぐには準備ができない同期ネットワークの読み取り、接続、および受け入れの操作をすべて監視します。I/O操作の準備が整うと、pollerに通知され、その後、保留された適切な仮想スレッドの保留を解除します。書き込み操作には、これと同等のWrite-Pollerがあります。

上記のスタックトレースはmacOS上でテストプログラムを実行した際に取得されたもので、macOS上のpoller実装であるkqueueに関連するスタックフレームが表示されています。Linuxではpollerとしてepollを、Windowsではwepoll(Winsock用のAncillary Function Driverでepollに似たAPIを提供する)を使用しています。

kqueue
https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
epoll
https://man7.org/linux/man-pages/man7/epoll.7.html
wepoll – epoll for Windows
https://github.com/piscisaureus/wepoll

pollerは、ファイル記述子と仮想スレッドのマップを管理します。ファイル記述子がpollerに登録されると、そのファイル記述子のマップにエントリが追加され、その値として登録スレッドが追加されます。pollerのイベントループは、イベントで起こされると、イベントのファイル記述子を使って対応する仮想スレッドを検索し、そのスレッドの保留を解除します。

Scaling

目を凝らすと、上記の動作は、NIOチャンネルやセレクタを利用した現在のスケーラブルなコードと、それほど大きな違いはなく、多くのサーバーサイドフレームワークやライブラリで見られるものです。仮想スレッドと異なるのは、開発者に公開されるプログラミングモデルです。前者は、ユーザーコードがイベントループを実装し、I/O境界を跨ぐアプリケーションロジックを維持しなければならないという、より複雑なモデルを公開しているのに対し、後者は、タスクのスケジューリングとI/O境界を跨ぐコンテキストのメンテナンスをJavaプラットフォームが処理するという、よりシンプルでわかりやすいプログラミングモデルを公開しています。

仮想スレッドのスケジューリングに使用されるデフォルトのスケジューラは、このジョブに適した fork-join work-stealing scheduler です。I/O操作の準備ができたかどうかを監視するために使用されるネイティブのイベント通知メカニズムは、OSが提供する最新かつ効率的なメカニズムです。仮想スレッドは、Java VMによるcontinuation(継続)のサポートの上で構築されています。そのため、同期型のJavaネットワークAPIは、より複雑な非同期型やノンブロッキング型のコード構成と同等の拡張性を持っています。

Conclusion

Javaの同期型ネットワークAPIは、Project Loomに向けて、JEP 353とJEP 373で再実装されました。

JEP 353: Reimplement the Legacy Socket API
https://openjdk.java.net/jeps/353
JEP 373: Reimplement the Legacy DatagramSocket API
https://openjdk.java.net/jeps/373

仮想スレッドで実行する場合、I/O操作がすぐに完了しないと、仮想スレッドは保留になります。I/Oが準備完了になると、仮想スレッドは保留解除されます。この実装では、Java VMとCoreライブラリのいくつかの機能を使用して、現在の非同期コードやノンブロッキングコードと比較しても遜色のない、スケーラブルで効率的な代替手段を提供しています。

ぜひLoomの早期アクセスビルドをお試しください。利用体験を是非お聞かせください。フィードバックはloom-devメーリングリストにお願いします。

Project Loom Early-Access Builds
https://jdk.java.net/loom/
loom-devメーリングリスト
https://mail.openjdk.java.net/mailman/listinfo/loom-dev

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Google フォト

Google アカウントを使ってコメントしています。 ログアウト /  変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中