CordaのFlowフレームワークとQuasar fiberの基本概念、及びCordaのFlowの実行メカニズムと状態遷移について学ぶことができます。
はじめに
CordaのFlowは、KotlinまたはJavaを用いて、分岐のない一連のないプログラムとして記述できます。しかし、実行時には多くの分岐や補完的な動作が発生します。例えば・・
- Checkpointでの状態保存(障害耐性確保の為)
- SubFlowの実行
- 他のノードへのメッセージ送信とその後の受信待機
- 非同期呼び出しと、その結果待ち
等です。
Cordaは、Quasar fiber をベースにしたState Machine上で動作することで、こうした分岐/補完動作を実現しています。
この記事は、こうした動作の概要を説明することが目的です。動作を理解することで、Cordaノードの運用を行うにあたっての基礎になると考えます。
注意事項
- 概念の説明を中心に行っていて、エラー処理を無視しています。Cordaのステートマシン/フローフレームワーク/ P2Pメッセージングに実際に変更を加える場合は、もう少し詳細を理解する必要があります。技術的詳細についてはCordaのドキュメントサイトをご覧ください。
- 現在の実装に基づいて説明していますが、将来変更される可能性があります。CorDappを作成する場合は、公式のドキュメントとAPIに基づいて設計してください。ドキュメントを理解する助けとなる事がこの記事の目的であって、技術的な代替手段を検討するためのものではありません。
Quasarとは?
Quasarは、Javaバイトコードの巧妙な書き換えと例外を利用して、継続(contiuation、情報処理用語でプログラムの実行に関して、ある時点で中断され、評価されていない残りのプログラムを再度実行する機能を指す)を提供するJavaライブラリです。
Quasarにおけるもっとも重要な概念はfiber
です。fiber
とは、プログラムコードにおけるスレッドのようなものですが、fiber
は自らを一時停止できます。その時点で、その計算状態を保存することができます。保存された状態を元に戻し、そこから再び計算を始めることができます。
さて、この機能を実現する仕組みをまずは学びましょう。
ユーザーは,中断可能であるすべての関数に対して@Suspendable
というアノテーションをつける必要があります。起動時とJITコンパイルの直前に、Quasarは,@Suspendable
メソッドのバイトコードを書き換えます。具体的には、throws SuspendExecution
を関数定義に追加し,全てのcatchブロックに、SuspendExecution
を再スローするコードを追加します。この例外をユーザーコードでキャッチさせないためです。
fiber
を中断するために、プログラムはQuasarの特定の関数(例えば parkAndSerialize
)を呼び出します。この関数はSuspendExecution
を投げ、投げられたこの例外はすべてのユーザーコードを通過し、Quasarによってキャッチされます。Quasarは、コールスタックを抽出し、シリアライズして保存します。バイトコードインストゥルメンテーション(byte code instrumentation)は、コールスタック上の関数が使用する変数、ローカルオブジェクト、スレッドが使用しているローカルストレージ等の値を全て確保でき、fiber
はこのひと固まりのスレッドを再構成することができます。また、これらすべてをメモリに保存した後、CPUは現在の実行スレッドを解放するので、CPUは他の何かを実行するためにこの実行スレッドを利用可能になります。
ファイバーを復元すると、コールスタックが新しいスレッドで再作成され、すべての値が記録されたとおりに関数に渡され、スレッドのローカルストレージが復元され、停止したところから実行が続行されます。
これを機能させるには、中断するスタック内のすべての関数に@Suspendable
アノテーション付けて、クエーサーがそれらを計測できるようにすることが重要です。そうしないと、例外がキャッチされるか、パラメーターが正しくキャプチャされず、fiber
が正しく中断されません。これは、CorDappsのエラーの一般的な原因(transaction context missing
エラーなど)であり、スタックのどの関数に注釈が欠けているかを把握するのは非常に難しい場合があります。
CordaのFlowはQuasarをラップし、それを使用して、CorDappのflow logic実装に含まれるKotlinまたはJavaのコードを、一時停止可能なコードに変換します。一時停止可能だと指定した場所をcheckpointと呼び、システムは任意のcheckpointからコードを再実行できる事が保証されます。
Flowの状態遷移
さて、FlowがCheckPointで中止&再開できる仕組みを見てきたところで、次にFlowの状態遷移についてみていきましょう。FlowはQuasarによる一時停止機能がある事を前提に、次のような状態遷移を想定しています。
まず、すべてのフローはPending
状態から始まります。次にrunning
に移行します。最も簡単なケースでは、最終的にコードの最後に到達してsuccess
に移行し、結果が返されます。コード途中で指定され、保存されていたCheckPointは全て消去されます。
https://www.corda.net/wp-content/uploads/2020/02/the-state-machine.png
エラーが発生した場合はErroed
に移行し、FlowHospitalが管理することになります。FlowHospitalでは3つの可能性があります。
①一時的なエラーの場合、Pending
に戻って再実行されます。この時、Flowは最初から実行されるのではなく、記録された最後のチェックポイントから再試行されます。
②エラーが致命的である場合、フローは失敗Failed
として終了します。エラーが呼び出し元に返され、すべてのCheckPointが削除されます。
③エラーを単純に再試行することはできないが、失敗させることができない場合(たとえば、元帳の整合性が失われる可能性がある場合など)、Flowはheld
状態に移行します。hold
状態の解消には、次のような人間の介入が必要です。
③-1エラーの原因となった状態を修正する。
③-2最後のCheckPointから手動でフローを再開する。(現状、これはNodeの再起動によって実現します。将来のリリースでRPCによる再開が実装される予定です)
Flowのライフサイクル
Flowのライフサイクルを考えるとき、Cordaノードを「ユーザースペース」と「カーネルスペース」に分けて考えると便利です。「ユーザースペース」は、ユーザーが提供するFlow logicを実行し、「カーネルスペース」は、ユーザーの提供するコードを実行するFlowフレームワークです。
典型的なフローはあるスレッドで実行が開始され、ネットワーク上の他のノードに送付され、帰ってきたレスポンスを受けて再開するという流れをたどります。この流れの詳細を確認していきましょう。
1. Flowの開始
RPCを介してFlowがキックされると、StartFlow
がカーネルスペースで呼び出されます。これにより、Flowを開始するイベントがスケジュールされます(RPC処理とメッセージのデシリアライズ処理は別のスレッドプールで行われることに注意してください)CPUの実行スレッドが使用可能になるとすぐに、メッセージの処理を開始し、FlowはPending
からRunning
に移行します。この作業には以下の作業も含まれます。
- スレッドを取得する。
- データベース接続を取得して(fiberを保存するための)トランザクションを開く。
- CorDappから要求されたFlowLogicをインスタンス化し、必要なパラメータを渡す。
- FlowLogicのrun関数を呼び出す。
2. 実行と一時停止
次に、ユーザーコードは、一時停止が必要なポイントに到達するまで実行されます。一時停止の最も一般的な条件は次のとおりです。
- メッセージを送信した場合
FlowLogic.sleep()
の呼び出し。つまり、明示的に一定時間Flowを一時停止する場合- Flow非同期API(
FlowLogic.await()
)の使用
ここでは、Flowが他の一つのノードとsendAndReceive()
を用いて通信する。この関数はカーネル空間を呼び出し、次のことを行います。
①ペイロードをシリアル化して、この関数に渡します。②メッセージを送信キューに入れると、P2Pメッセージングシステムによってピックアップされます。(P2PメッセージングシステムArtemisは、それだけで一つの記事が書けるほどのものです。ここでは、このシステムは頑健で、正確に1度だけメッセージを送信することができることだけを述べます。)このシステムによって、メッセージが正しく配信され(いずれ)返信を受け取ることができると想定できます。
③次に、Quasarの
parkAndSerialize
関数を呼び出して、前節で示した通りコールスタック他の情報を記録します。④ノードはこのFlowに割り当てられたリソースの中で、Quasarが無視してしまう部分の後処理をする必要もあります。典型的には、開いているデータベーストランザクションをコミットして、データベース接続を解放する必要があります.他のリソース(ロックなど)も同様に処理する必要があります。これはCordaコードで行われます。
⑤Cordaはまた、Checkpointでのコピーをデータベースに保存するため、中断されたフローも再起動することができます。
・Kryoシリアライザーを使用するので、スタック上で見つかったものはすべてシリアル化できます。これは、RPCおよびP2P通信で使用されるシリアライザやStatesをデータベースに保存する際に使われるシリアライザとは異なる仕組みです。通信用やState用はその内容に制約や制限が必要ですが、CheckPoint用シリアライザは、すべてを受け入れる必要があります。一方で、データ形式や構造の変化に対処する必要はありません。
・シリアルの際に無視する必要のあるもの(主にノードやState Machineそのものなどで、デシリアライズ処理時にも当然にメモリ上に存在するモノ)のリストがあります。
・特定のクラス(Cordaサービスなど)は
SerializeAsToken
インターフェースを実装していて、クラスインスタンスの代わりにトークン化したデータだけが記録されます。これらのクラスは、デシリアライズ処理の際にインスタンスを取得するメカニズムを別途指定する必要があります。特にシングルトンクラスをシリアライズする時に役立ちます。⑥その後、
fiber
はRunning
からSuspended
に移行し、CPUの実行スレッドは他のタスクを実行できるようになります。
3. メッセージ受信と再開
送信したメッセージに対する応答メッセージが到着すると、P2Pメッセージングシステムは、fiber
へ再起動を促すメッセージをイベントキューに置きます。CPUの実行スレッドが使用可能になると、中断されたfiber
の状態遷移が次ようにして再開します。
①コールスタックと変数が再作成されます
②リソースが再取得されます(例えば,DB接続およびトランザクションが開かれます)
③受信したメッセージはデシリアライズされます
④受信メッセージは、Flowが再びSuspended
するか、終了したときにのみP2Pメッセージングシステム上から削除されます。何らかのエラーに伴うフェイルオーバーが起きた場合、フローはメッセージ送信直後のチェックポイントから再実行され、P2Pメッセージングキューからメッセージを再受信します。
⑤次に、ユーザー空間のFlowLogicが、sendAndReceive
を叩いた時点の状態で呼び出され、メッセージの内容を関数呼び出しの戻り値として渡します。
4. 実行完了
FlowLogicが最後まで正常に完了すると、制御はカーネル空間に戻り、次の処理が行われます。
①データベーストランザクションがコミットされます
②このフローのすべてのCheckPointがデータベースから削除されます。
③P2Pメッセージングシステムに保存されていた未処理のメッセージはキューから削除されます。
④関数戻り値はRPCプールに渡されます。シリアル化されて呼び出し元のクライアントに送り返すことになります。
⑤データベース接続が解放され、fiber
が終了し、CPUの実行スレッドが解放されます。
含意
この上記の実装は、CorDappsの開発者にいくつかの影響を与えます
- (別途、生のJDBC接続を張りに行っても)通常のACIDデータベースロジックをCorDappsで単純に使用することはできません。
FlowLogin中で、DBを単純にコミットすることはできません(FlowLogic.sleep(1.millis)は妥当で現実的な回避策です)一時停止により、想定外のDBコミットが発生する可能性があります。
Flowが失敗した場合、DBトランザクションはロールバックされます。使用しているJDBC接続で別途何かを行った場合、それは失われます。再試行すると、フローは最後のメッセージから再開されるため、DBトランザクションが再発生します。
DBトランザクションをロールバックすることはできません。FlowLogic内はFlowベースのトランザクション内にいるためです。SQLコードを直接記述して何か問題が発生し、ロールバックした場合、元帳の整合性が脅かされる可能性があります
- Flowのすべての部分が少なくとも1回実行されます。ただし、問題が発生した場合、最後のCheckpointからエラーが発生したポイントの間の部分が再実行されます。Flowは、ロールバックを介して発生するすべてのDBトランザクションをクリアしてクリーンに再実行できるようにしますが、サードパーティコンポーネントとの他のやり取りは、べき等であるか、複数回の呼び出しを処理できる必要があります。
@Suspendable
アノテーションは、非常に注意深く使用なければなりません。@Suspendable
ではない関数から、@Suspendable
な関数を呼ぶことはできません。(逆は問題はありません。@Suspendable
な関数からデータベース操作などの@Suspendable
ではない関数を呼び出すことは可能です。)- バイトコードのスキャン/再読み込みは、かなり時間がかかります。サスペンド操作は安価ではありません。
- バイトコードインストゥルメンテーション(byte code instrumentation)がうまくできないコードを書いてしまう事があります。(例えば、Kotlinの
.map{}
操作内で@Suspendable
関数を使用する等)この場合、再生成されるバイトコードが無効化され、JVMが実行時にあきらめる可能性があります。これはKotlin(https://youtrack.jetbrains.com/issue/KT-19251)の既知の問題であり、この問題が発生した場合、以下のようなコンパイルオプションを指定することで改善できます。
compileKotlin {
kotlinOptions {
freeCompilerArgs = ["-Xnormalize-constructor-calls=enable"]
}
}
<ご質問・ご要望の例>
- Corda Portalの記事について質問したい
- ブロックチェーンを活用した新規事業を相談したい
- 企業でのブロックチェーン活用方法を教えて欲しい 等々
SBI R3 Japan エンジニアリング部長
書籍出してます:https://amzn.asia/d/c0V31Vd
趣味:サッカー、ガンプラ、ドライブ、キャンプ