Corda 5のP2P通信レイヤーアーキテクチャの進化、その実装の詳細、及び新しいP2Pレイヤーの試用方法
はじめに
本記事では、Corda 5がどのように、1.マルチテナント化、2,水平スケーリング(horizontal scalability)、3.高可用性(high availability)を実現しているか知ることができます。
本記事は、主に3つの構成に分かれています。
- Corda 5のP2P通信レイヤーアーキテクチャの概要
- R3社の初期評価で得られたP2Pレイヤーに関する各種測定結果
- 新しいP2Pレイヤーを試す方法についての説明
アーキテクチャ概要
Corda 5では、ノードは一つのプロセスで稼働するのではなく、複数のワーカープロセスで構成されるワーカークラスターによって動作します。ワーカークラスタ上には、複数のアプリケーションネットワークに属する複数のIDを配置することが可能となります。
そして、以下の図に示す通り、P2Pレイヤーは複数のIDがどのワーカークラスタに存在するかに関わらずメッセージ交換可能にしてくれます。
P2Pレイヤーは、”リンクマネージャ(link manager)”と”P2P ゲートウェイ(P2P gateway)”からなる2つの主要なワーカーで構成されています。
”リンクマネージャ”は通常、内部ネットワークに存在し、他のクラスタの”リンクマネージャ”と相互に認証されたend-to-endの通信セッションを確立し、安全で信頼性の高い方法でメッセージを転送する役割を担います。宛先のIDが同じクラスタ内に配置されている場合、”リンクマネージャ”はメッセージを内部転送します。”P2P ゲートウェイ”はインターネットとの境界に配置され、他のクラスタの”P2P ゲートウェイ”とTLS接続を確立し、HTTPSでメッセージを送受信する役割を担います。
”リンクマネージャ”と”P2P ゲートウェイ”はメッセージ バスを介して通信します。メッセージ バスは、デプロイのモードに応じてさまざまな形式を取ることができます。P2Pプレビュー版では、”リンクマネージャ”と”P2P ゲートウェイ”が別々のワーカープロセスとして実行され、メッセージバスがApache Kafkaである完全分散配置モードで提供されます。
図に示す通り、メッセージ転送に使用されるKafka トピックに、P2Pコンポーネントからの「extra housekeeping topics」という追加のトピックセットが存在することが分かります。例えば、未配信のメッセージを追跡したり、end-to-endのセッションとそれを配置する”リンクマネージャ”のインスタンスとの間のマッピングを維持するために、「extra housekeeping topics」が使用されます。
P2Pレイヤーを利用したい上位のアプリケーションコンポーネントは、トピックセットを介して前述の処理を依頼することができます。他のIDからメッセージを受信したり (p2p.in) 逆に送信したり(p2p.out)することが可能です。こうした上位コンポーネントの一つとして、Flowワーカーがあります。またmembershipワーカーという例も存在します。またmembershipワーカーは、P2Pレイヤーを使ってアプリケーションネットワークにIDを登録し、さまざまなネットワーク参加者に自身のネットワーク情報を共有します。
Corda 5のクラスタは複数のIDを配置することができ、同じまたは異なるアプリケーションネットワークに属することができます。IDは、ネットワークの他のメンバーと通信する前に、対応するアプリケーションネットワークのメンバーシップグループマネージャー(MGM, membership group manager)に登録します。この時、membershipワーカーはIDの登録やクラスタ内に最新のメンバーシップ情報を共有する役割を担います。
”リンクマネージャ”と”P2Pゲートウェイ”はTLSハンドシェイクやend-to-endのセッション確立の間、署名に関する処理を実行する必要があります。この署名は、”クリプトワーカー(crypto worker)”によって実行されます。”クリプトワーカー”は、暗号鍵を改ざんできない方法で保存するhardware security module devices (HSMs)と通信するなどして、安全な方法で署名を行うことが可能です。”リンクマネージャ”と”P2Pゲートウェイ”は同じメッセージバスを経由して、”クリプトワーカー”に署名を要求します。
上記で説明したコンポーネントの中には、まだ完全に開発・統合されていないものも存在します。したがって、P2Pプレビュー版ではP2Pレイヤー単体で使用できるようにするための以下のような代替機能が存在します。
- ”Flow ワーカー”の代わりに、”app-simulator”というシンプルなアプリケーションによってP2Pレイヤーを用いたメッセージの送受信を可能に
- ”membershipワーカー”の代わりに、MGMを用いることなくアプリケーションネットワークを構築し、クラスタ内にローカルにメンバーを追加できる機能を提供
- ”crypto ワーカー”の代わりに、”リンクマネージャ”と”P2Pゲートウェイ”が内部で署名を行います。そのために、Kafkaクラスタ内の暗号鍵をアップロードするためのツールを提供します。
現在、上記の代替機能でおぎなっている各種機能は、現在開発段階にあります。次のマイルストーンに向けて利用できるよう準備中です。
初期評価
P2Pレイヤーの初期バージョンの開発終了後、私たちは開発したP2Pレイヤーの、水平スケーリング(horizontal scalability)と高可用性(high availability)に対する評価を行いました。このセクションでは、我々のアセスメント評価の結果を皆様と共有します。又、ご自身でも評価を行い、是非その結果を我々R3と共有していただきたいと考えております。
(訳注:以下の1ブロックは、Kubernetesのサービス利用法について書かれており、文意の理解には特段必要が無いので引用形式に改変しています。)
まず最初に、Kubernetesを使ってクラウド上に2つのCordaクラスタのデプロイメントを作成し、それぞれのクラスタに1つのIDが配置されている状態にしました。次に、各ワーカータイプのインスタンスを1種類のステートフルセットにグループ化しました。(つまり、全ての”リンクマネージャ”には1つのステートフルセット、”P2Pゲートウェイ”には別のステートフルセットが存在しています。)また、ゲートウェイ用にヘッドレスサービスを作成し、その DNS 名に対して TLS 証明書を紐づけました。
分析を簡単にするために、最初のクラスタ(送信側クラスタとします)から2番目のクラスタ(受信側クラスタとします)に向かってトラフィックを発生させます。各クラスタ上で、ワーカー数を増やし、システムで観測されるスループットの数を記録することで、水平スケーリングを評価しました。また、システムの様々なP2Pコンポーネントに障害を発生させ、メッセージ配信の観点からその障害による影響を観測することで、高可用性を評価しました。
構成したトポロジーを以下の図にまとめます。Kubernetesのリソース要求を使って、以下を割り当てました。
・各P2P podに最低でもCPU1つと2GBを割り当てました。
・Kafka broker pod 1つにつき、最低でもCPU1つと4GBを割り当てました。
実験終了後、記録されたPostgreSQLデータベースのデータを使用して、以下の検証を実施しました。
- 送信メッセージと受信メッセージを照合を行い、メッセージロスがないことを確認
- システムのスループット(1秒あたりの配信メッセージ数)とメッセージの配信遅延(メッセージの送信時点と受信時点の間の経過時間)を分析しました。(この分析のためににKafkaにてtime window aggregationを実施しました。)
可用性の評価結果
各クラスタ毎に以下の構成を取りました。
- 3台のKafka broker、1台のZookeeper node
- 3つのリンクマネージャと3つのP2Pゲートウェイ
- 送信側クラスタにSENDER モードのシミュレータ
- 受信側クラスタにRECEIVER モードのシミュレータ
- 受信側クラスタにDB_SINK モードのシミュレータ
測定手順は以下に示す通りです。
1. 送信側クラスタから受信側クラスタに向けたトラフィックを app-simulator で生成(トラフィックのメッセージサイズは1KB。このメッセージを10メッセージずつバッチ送信し、バッチ間の遅延を100msにし、トータルで10,000メッセージを送信。)
2. 30秒間待機
3. P2Pコンポーネントの障害をエミュレートするため、P2Pコンポーネントに関連するステートフルセットを1つずつ縮小。
4. app-simulatorから送信された10,000メッセージが受信側クラスタに全て揃うまで待機。
以下の図はいくつかの想定される障害ごとに実験した結果です。
縦軸は、メッセージのend-to-endの遅延(単位:秒)です。つまり、メッセージを送信してから相手側で正常に受信されるまでの時間に相当します。横軸は、実験での経過時間であり、障害が発生したときを0としています。
図の青い点は個々のメッセージの待ち時間(遅延)を表し、オレンジの線は、待ち時間の99パーセンタイルを指し、黄色の線は、待ち時間の中央値を指します。統計値のタイムウィンドウは10秒に設定しています。このように、個々のデータポイントと集計統計を組み合わせることで、平均、最良、最悪のケースにおけるシステムの挙動をよく理解することができます。
このシナリオでは、設定した一定時間が経過後、故障したリンクマネージャはKafkaからのデータ処理を停止します。すると、Brokerは故障したリンクマネージャを識別し、そのパーティションを正常動作している他のリンクマネージャに送信します。
このプロセスをKafkaではリバランスと呼ばれます。結果として、残ったリンクマネージャは故障したリンクマネージャから処理の終わっていないメッセージをピックアップし、システムを復旧させます。この動作を行っている間は、送信メッセージの一部が一時的に遅延することが予想されます。この際、(訳注:障害発生と判定するための)タイムアウトを設定することができます。この設定値は(一時的なネットワーク問題の発生時に)、Kafkaの迅速な回復と不要なリバランスの発生との間のトレードオフをコントロールします。
上の図は、1つ前のシナリオと同様に、P2Pゲートウェイに障害が発生した場合、リバランスが発生し、残った正常なP2Pゲートウェイが故障したゲートウェイが担っていたパーティションをピックアップします。この場合、リンクマネージャは利用可能なP2Pゲートウェイ間でメッセージのロードバランスをとるので、障害が発生したP2Pゲートウェイに送信されたメッセージはしばらくしてから再送信され、正常なP2Pゲートウェイのいずれかに到達することになるので、最初のパターンより早い回復が期待できます。
このシナリオでは、障害が発生した受信側のP2P ゲートウェイとの接続を確立していた送信側P2P ゲートウェイは、他の正常な受信側P2Pゲートウェイのいずれかに切り替わります。
前述した通り、リンクマネージャは互いにend-to-endのセッションを保ちます。リンクマネージャが予期せずクラッシュすると、セッションのためにメモリに保存されていた(一時使用)暗号鍵が紛失します。
リンクマネージャは障害検出メカニズムを備えているため、最終的には失われたセッションを特定し、新しいセッションの確立を開始します。また、リンクマネージャは通信先のID一つ一つに対して、冗長なセッションを一つ維持しており、失われたセッションを回復し、フェイルオーバーできるようになっています。これが、このシナリオにおいて、再送されるメッセージの量が急速に減少している理由です。(訳注:一定数のメッセージは冗長で用意されたセッションを使ってすぐに再送が完了することになります)
利用可能な複数のセッションが存在する中で、どのセッションを選択するかは、ハートビート(セッション相手が動作していることの確認)が直近でとれているものがより健全であるという前提で、健全なものをより多く選択する仕組みが用意されています。全てのシナリオに共通する傾向として、障害はシステムの一部にしか影響しない、かつ、その影響は障害サイズに比例するということです。これは高可用性アーキテクチャに必要とされる特性です。逆にいうと、個々の障害の影響範囲を最小に抑えるため、実行するインスタンスの数を増やすことが必要になります。
水平スケーリング性能の評価
この実験では、以下のセットアップをし、基本的にクラスタの容量をセットアップ毎に倍々に増大させています。
- Setup A: Kafka broker3台、リンクマネージャ1つ、P2Pゲートウェイ1つ、RECEIVER /DB_SINKモードのSimulator1つ
- Setup B: Kafka broker3台、リンクマネージャ2つ、P2Pゲートウェイ2つ、RECEIVER /DB_SINKモードのSimulator2つ
- Setup C: Kafka broker3台、リンクマネージャ4つ、P2Pゲートウェイ4つ、RECEIVER /DB_SINKモードのSimulator4つ
- Setup D: Kafka broker7台、リンクマネージャ8つ、P2Pゲートウェイ8つ、RECEIVER /DB_SINKモードのSimulator8つ
- Setup E: Kafka broker13台、リンクマネージャ16つ、P2Pゲートウェイ16つ、RECEIVER /DB_SINKモードのSimulator16つ
この実験は、(timewindowとして設定した10秒間の)平均待ち時間が1秒を超え、システムが過負荷になるまで、負荷を増加させるという形を取りました。過負荷になるまでに観測された最大スループットを、このセットアップで達成された最大スループットとして選択しています。
負荷発生のパラメータに関しては、10KBのメッセージを一定数まとめて生成し、一括生成完了後、次の生成までの間隔を50ミリ秒に設定しました。また、負荷増大のためにメッセージの個数と送信者インスタンスの数を増やしました。
下図は、上記の各設定における最大スループットを示しています。このように、システムは水平方向に拡張でき、クラスタサイズを大きくすることで、より高いスループット(1秒あたりのメッセージ転送数で測定)を達成することができます。
P2Pレイヤーの導入と使用方法
ここでは、P2P クラスタをセットアップしてクラスタ間でメッセージを交換する方法について説明します。
必要なアーティファクトのビルド
最初の手順は必要なアーティファクトを全て構築することです。まず、必要なすべてのリポジトリをローカルにクローンし、ここで説明するようにコンポジットビルド(CompositeBuild)を有効にしていることを確認します。全てのリポジトリのp2p-previewタグをチェックアウトし、安定版を入手していることを確認してください。その後、以下に示すリンク先にある README ファイルに記載されている説明に従って、必要なアーティファクトをビルドします。
- リンクマネージャーのワーカー.
- P2Pゲートウェイのワーカー
- app-simulator.
- kafka-setup tool. (これを利用して、Kafkaに想定される構成で必要なtopicを生成することができます。
- The P2P-setup tool. (アプリケーションネットワークの作成、メンバー追加、各種設定やクラスタに対応する鍵の公開使用されます。
- The fake-ca tool. (単純にテスト目的で自動生成された認証局からTLS証明書を生成するために使用されます。このツールの代わりに実用に供されている認証局から認証を取得することもできます。)
P2Pクラスタのセットアップ
全てのアーティファクトのビルド後、以下の手順でP2Pクラスタをセットアップします。
1. Kafkaのクラスタをデプロイします。これはどのような方法でもかまいません。ここでは、「kafka-cluster:9092」というアドレスで接続待機中であるKafkaのクラスターがあると仮定します。
2. クラスタに必要なトピックを作成します。Kafka セットアップツールおよび必要なすべての構成を含むサンプルファイルを使用して作成することができます。例えば、
java -jar -Dbootstrap.servers=kafka-cluster:9092 corda-kafka-setup.jar — topic p2p-kafka-setup-example.conf
を実行すれば作成が完了します。
パーティションの数、レプリケーションファクターは必要に応じて設定ファイルで調整できますが、その他の設定はそのままにしてください。それぞれのトピックに対するパーティションの数は、デプロイ予定のワーカーインスタンスの数以上である必要があります。
3. リンクマネージャー ワーカーをデプロイします。例えば、
java -jar corda-p2p-gateway.jar -k=kafka-cluster:9092 -i=1
を実行します。-iを変更すればインスタンスのデプロイ数を調節できます。
4. P2Pゲートウェイワーカーをデプロイします。例えば、
java -jar corda-p2p-gateway.jar -k=kafka-cluster:9092 -i=1
を実行します。同様に、複数インスタンスをデプロイできます。
5. P2P-setup toolを用いて、KafkaのリンクマネージャワーカーとP2Pゲートウェイワーカーの設定をアップロードします。リンクマネージャには必須のパラメータは存在しません(オプションで存在しますが全て適切なデフォルト値が設定されています) P2Pゲートウェイには必須パラメータが存在します。 — helpオプションを使うと利用可能な全ての設定オプションを見ることができます。両方のワーカーで利用可能なすべてのデフォルトを使用して設定を公開するコマンドの例は次のとおりです。
java -jar corda-p2p-setup.jar -k kafka-cluster:9092 config-link-manager
java -jar corda-p2p-setup.jar -k kafka-cluster:9092 config-gateway --port=8085
この時点で、クラスタのワーカーは正常に稼働を開始しているはずです。
次のステップは、クラスターに ID を作成し、アプリケーションネットワークを作成し、そのネットワークに ID を追加するために必要な作業になります。
1.このクラスターでホストされている各IDについて、PEM形式のセッションキーペアを作成します。これを行う1つの方法として、以下に示すようにopensslツールを使用します。
openssl ecparam -out alice_session_key_private.pem -name prime256v1 -genkey
openssl ec -in alice_session_key_private.pem -pubout -out alice_session_key_public.pem
2. TLS キーを作成し、認証局(CA)から PEM 形式の TLS 証明書を取得します。証明書のドメインは、ゲートウェイのIPアドレスと紐付けされるように設定されていることを確認する必要があります。よく知られた信頼できるCAを使用して証明書を取得するか、次の方法でfake-caツールを使用することができます。
java -jar corda-fake-ca.jar -m tls_folder/ create-ca
java -jar corda-fake-ca.jar -m tls_folder/ create-certificate www.alice.net
このコマンドは、認証局を作成し、そのルート証明書をローカルファイルシステムに保存し、さらにその同じ認証局からTLSキーペアとwww.alice.net のTLS証明書を作成します。
3. p2p-setup ツールを使用して、セッション鍵と TLS 鍵をクラスタにアップロードします。実際にセッションキーをアップロードするには、次のような内容のファイル(alice_session_key.json)を作成します。
{
"keysFile": "alice_session_key_private.pem",
"tenantId": "alice",
"publishAlias": "alice-session-key"
}
次のコマンドを使用して、クラスターへこのファイルをアップロードします。
java -jar corda-p2p-setup.jar -k kafka-cluster:9092 add-key-pair alice_session_key.json
tenantIdは鍵を分離して保存するために使用され、各鍵に指定する値は、後で新しいID(tlsTenantId、sessionKeyTenantId)を作成する際に指定するテナントIDと一致させる必要があります。
TLS鍵のアップロードも上記と同じ手順で行う必要があります。
4. p2p-setup ツールを使用して、アプリケーションネットワークを作成します。以下のような内容のファイル(group.json)を作成する必要があります。
{
"groupId": "group-1",
"data": {
"networkType": "CORDA_5",
"protocolModes": ["AUTHENTICATED_ENCRYPTION"],
"trustRootCertificatesFiles":["tls_folder/ca/root-certificate.pem"]
}
}
よく知られた信頼できるCAを使用する場合、代わりにそのルート証明書を含めることができます。その後、以下のコマンドによりネットワークを作成することができます。
java -jar corda-p2p-setup.jar -k kafka-cluster:9092 add-group group.json
5. p2p-setup ツールを使用して、クラスタ上でローカルにホストされた ID を作成します。そのためには、以下のような内容のファイル(alice_identity.json)を作成する必要があります。
{
"x500name": "O=Alice, L=London, C=GB",
"groupId": "group-1",
"data": {
"tlsTenantId": "<tenant-id-used-for-tls>",
"sessionKeyTenantId": "alice",
"tlsCertificatesFiles":["tls_folder/www.alice.net/certificate.pem"],
"publicSessionKeyFile": "alice_session_key_public.pem"
}
}
次のコマンドによりネットワークにこのファイルを公開します。
java -jar corda-p2p-setup.jar -k kafka-cluster:9092 add-identity alice_identity.json
6. IDをネットワークのメンバーとして追加します。以下のような内容のファイル(alice_member.json)を作成する必要があります。
{
"groupId": "group-1"
"x500name": "O=Alice, L=London, C=GB",
"data": {
"publicSessionKeyFile": "alice_session_key_public.pem"。
"address": "http://www.alice.net:8085"
}
}
次のコマンドによりネットワークにこのファイルを公開します。
java -jar corda-p2p-setup.jar -k kafka-cluster:9092 add-member alice_member.json
addressフィールドの DNS 名はゲートウェイの IP アドレスに紐づく必要があり、ポート はゲートウェイのポートに一致させる必要があります。
注:このネットワークのメンバーであるすべてのIDについてステップ6を繰り返す必要があります。(つまり、他のクラスタでホストされているメンバーについてもステップ6を行う必要があります。)
ようやくこの時点で、完全に動作する p2p クラスタと、その中で単一の ID がホストされている状態が実現できています。
必要であれば、同じプロセスを繰り返すことで、さらに多くの p2p クラスタをセットアップすることができます。
他のクラスタへのメッセージ送信
クラスタ間でメッセージを送信するには、app-simulator アプリケーションを使用します。このアプリは、送受信されたメッセージに関するメタデータを PostgreSQL データベースに送信し、さらに分析するように設定することができます。このデータベースは、ここで指定したように、このデータを追跡するためのいくつかのテーブルを持つ必要があります。ここでは、app-simulator-db:5432 で接続を待ち受けるデータベースをデプロイしていると仮定しています。
クラスタで受信したメッセージを処理するには、app-simulatorをRECEIVERモードで実行する必要があります。そのためには、以下のような内容の設定ファイル(simulator_receiver.conf)を作成します。
{
parallelClients: 1
simulatorMode: "RECEIVER"
}
そして、この設定ファイルを使って、次のコマンドによりアプリシミュレータを起動することができます。
java -jar corda-app-simulator.jar -k <receiver-kafka-cluster-address> --simulator-config simulator_receiver.conf
受信したメッセージのメタデータをPostgreSQLのデータベースに転送するには、app-simulatorをDB_SINKモードで動作させる必要があります。そのためには、以下のような構成の設定ファイル(simulator_sink.conf)を作成します。
{
dbParams: {
username: "<db-username>",
password: "<db-password>",
host: "app-simulator-db:5432",
db: "<db>"
},
parallelClients: 1
simulatorMode: "DB_SINK"
}
そして、この設定ファイルを使って、次のコマンドによりアプリシミュレータを起動することができます。
java -jar corda-app-simulator.jar -k <receiver-kafka-cluster-address> --simulator-config simulator_sink.conf
あるクラスタからメッセージを送信するには、アプリシミュレータをSENDERモードで実行する必要があります。そのためには、以下のような構造の設定ファイル(simulator_sender.conf)を作成します。
{
dbParams: {
username: "<db-username>",
password: "<db-password>",
host: "app-simulator-db:5432",
db: "<db>"
},
parallelClients: 1,
simulatorMode: "SENDER",
loadGenerationParams: {
peerX500Name: "<destination-identity-x500-name>",
peerGroupId: "group-1",
ourX500Name: "O=Alice, L=London, C=GB",
ourGroupId: "group-1",
loadGenerationType: "ONE_OFF",
batchSize: 10,
totalNumberOfMessages: 10
interBatchDelay: 0ms,
messageSizeBytes: 10000
}
}
そして、この設定ファイルを使って、次のコマンドによりアプリシミュレータを起動することができます。
java -jar corda-app-simulator.jar -k <sender-kafka-cluster-address> --simulator-config simulator_sender.conf
これらのステップを踏んだ後、データベース上でタイムスタンプと共に送受信された全てのメッセージを検査することができます。
Corda 5のP2Pレイヤーのこの最初のプレビューが、私たちと同じようにあなたを興奮させることを願っています。もちろん、これはほんの始まりに過ぎません。私たちは、足りない部分の開発を完了させ、すでに確認されているいくつかの部分をさらに改善するために努力を続けていきます。ご期待ください。
Milan Khan、Richard Brown、Yiftach Kaplan、William Vigor、そしてこの記事へのフィードバックを下さったNadine Quin、Emily Bowe、Kat Bakerに感謝します。
<ご質問・ご要望の例>
- Corda Portalの記事について質問したい
- ブロックチェーンを活用した新規事業を相談したい
- 企業でのブロックチェーン活用方法を教えて欲しい 等々
SBI R3 Japan エンジニアリング部長
書籍出してます:https://amzn.asia/d/c0V31Vd
趣味:サッカー、ガンプラ、ドライブ、キャンプ