原文はこちら。
The original article was written by Andreas Müller (creator of SwiftMQ, CEO of swiftmq.com and CTO of flowdirector.io).
https://medium.com/@am_73687/porting-from-nashorn-how-to-handle-multi-threading-in-graal-js-957e359b7df5
既存のJavaScriptのコードをNashornからGraalVMに移行するのは困難な場合があります。この記事ではFlow Directorが生成するSwiftMQ Streamスクリプトを移行する際に発生したマルチスレッドの問題を取り上げます。
Nashorn
https://docs.oracle.com/javase/10/nashorn/introduction.htm#JSNUG136
GraalVM
https://www.graalvm.org/
SwiftMQ Stream
https://www.swiftmq.com/docs/docs/ce/swiftlets/streams/
Flow Director
https://www.flowdirector.io/

JavaScript is single-threaded
非同期コールバックをJavaクラスに登録し、同じJavaScriptの実行中にこのコールバックを呼び出すと、イベントキュー内でコールバックの処理をすぐにスケジュールしたとしても、マルチスレッドアクセスになります。Nashornはこれをチェックしませんが、GraalVMでは例外をスローするため、このようなコールバックを持つスクリプトはGraalVM上では動作しません。
The asynchronous Callback Problem
以下はRabbitMQチャネルに対して非同期メッセージリスナーを登録する例です。
var MYCONSUMER = Java.extend(Java.type("com.rabbitmq.client.DefaultConsumer"), { handleDelivery: function (consumerTag, envelope, properties, body) { // Add it to an event queue (skipped) } }); this.channel.basicConsume(this.declareOk.getQueue(), false, this.consumerTag, new MYCONSUMER(this.channel));
このコードでは、以下の処理を実施しています。
- メッセージを受け取る
- (このスクリプト内の他のコンポーネントとの同期アクセスを確実にするために使っている)イベントキューに、その受け取ったメッセージを入れる
そのため、RabbitMQスレッドから呼び出された非同期関数呼び出しhandleDeliveryを除き、全てはこのイベントキューの外で動作しています。
これはNashornでは動作しますが、GraalVMでは動作しません。その理由は、GraalVMがPolyglot Contextを持つ任意のJavaScriptコードをラップし、コード実行前に Context.enter() を、実行後に Context.leave() を呼び出すことで、アクセスをチェックするからです。Context.enter() 以後でマルチスレッドアクセスを検知すると、例外をスローします。
GraalVMの開発者はこの問題をよく認識しており、この問題の解決方法に関する推奨事項まで公表しているのですが、今回はそれは役に立ちませんでした。なぜなら、非同期コールバックの登録を含めて、今回の場合はJavaScriptのスクリプトから全てを開始するためです。そのため、イベントキューに非同期呼び出しを入れるために、我々のクラスとともに使うJavaライブラリをラップすることは選択できませんでした。
Multi-threaded Java ←→JavaScript language interoperability in GraalVM
https://medium.com/graalvm/multi-threaded-java-javascript-language-interoperability-in-graalvm-2f19c1f9c37b
How to solve it
Javaスレッドからの呼び出しとJavaScriptコールバックの呼び出しの間に入ってイベントキューから呼び出すにはどうすればいいのでしょうか?
そこでJavaのProxyクラスを使います。
Class Proxy
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/reflect/Proxy.html
https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/lang/reflect/Proxy.html
これを使えば、動的にソースオブジェクトの全インターフェースを実装し、そのインスタンスとして振る舞い、メソッド呼び出しの傍受をするオブジェクトを動的に作成できます。JavaScriptのコールバックではなく、プロキシがコールバックとして登録し、JavaScriptへの呼び出しが起こる前にその呼び出しを傍受できるため、このContext.enter()およびContext.leave()の呼び出しを回避し、同期イベントキューから呼び出しを実行できます。
The AsyncProxy
以下は今回使ったコールバックを傍受するproxyクラスです。
public class AsyncProxy implements java.lang.reflect.InvocationHandler { private Object obj; private Stream stream; private int hashcode; private String toString; private AsyncProxy(Stream stream, Object obj) { this.stream = stream; this.obj = obj; this.hashcode = obj.hashCode(); this.toString = obj.toString(); } public static Object newInstance(Stream stream, String interfaceClassName, Object obj) throws Exception { return java.lang.reflect.Proxy.newProxyInstance( stream.getStreamCtx().classLoader, new Class[]{ stream .getStreamCtx() .classLoader .loadClass(interfaceClassName) }, new AsyncProxy(stream, obj)); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // Exec it on our event queue stream.getStreamCtx().streamProcessor .dispatch(new POExecute(null, () -> { try { method.invoke(obj, args); } catch (Exception e) { stream.getStreamCtx().logStackTrace(e); } })); return null; } @Override public int hashCode() { return hashcode; } @Override public boolean equals(Object obj) { return obj.equals(this.obj); } @Override public String toString() { return toString; } }
重要な箇所は以下です。
new Class[]{ stream .getStreamCtx() .classLoader .loadClass(interfaceClassName) }
このProxyクラスを使う典型的な例は、オブジェクトからインターフェースの名前を取得する場合ですが、その場合、JavaScriptアダプタクラスを使うためうまくいきません。直接元のJavaインターフェースクラスをロードしてプロキシを作成する場合のみうまくいきます。今回は、プロキシは直接登録クラスに意図的に配置しています。
Connect it to JS
SwiftMQ Streamsでは、各JavaScriptには事前定義済みの環境があります。その中にStream Javaクラスがあります。これはスクリプティング・コンテキストに登録され、stream変数を使ってJavaScriptのスクリプトからアクセス可能なクラスです。
SwiftMQ Streams
https://www.swiftmq.com/docs/docs/ce/swiftlets/streams
Class Stream
https://www.swiftmq.com/docs/javadoc/com/swiftmq/impl/streams/Stream.html
Variable ‘stream’
https://www.swiftmq.com/docs/docs/ce/swiftlets/streams/#stream
そのため、任意のオブジェクトを非同期としてマークし、プロキシでラップするメソッドを追加しました。
public Object async(String interfaceClassName, Object callback) throws Exception { return AsyncProxy.newInstance(this, interfaceClassName, callback); }
JavaScript側では、これを使ってコールバックをラップします。
this.channel.basicConsume(this.declareOk.getQueue(), false, this.consumerTag, stream.async( "com.rabbitmq.client.Consumer", new MYCONSUMER(this.channel) ));
これでおしまいです。
このサンプルはGitHubのリポジトリにあります。マルチスレッド化が成功しますように。
graaljs-concurrency-problem
https://github.com/iitsoftware/graaljs-concurrency-problem