イベントドリブンアーキテクチャとkafkaの利用、メッセージバスとのやり取りやメッセージパターンの重要性
はじめに
近年、耐障害性があり、高可用なメッセージバスに支えられたイベントドリブンアーキテクチャが人気を集めています。これにより、複数のワーカーで負荷を共有したり、非常に大きな負荷に対応できるように拡張したり、障害が発生したときに、障害のあるプロセッサーで完了しなかったメッセージを再生したりするようなシステムを設計することができます。
我々R3も、まさにこのような理由から”KAFKA”を利用したいと考えました。ビジネスプロセスの核となるこのようなやり取りを処理するためのライブラリは、どのように書けばいいのでしょうか?ライブラリの一部としてどのようなパターンが必要なのでしょうか?ライブラリはどのように構成されるべきでしょうか?なぜこのような概念をライブラリに抽象化するのでしょうか?
もし、N人の開発者がメッセージバスとやりとりする必要があり、彼らが使える共有ライブラリがなければ、そのプロジェクトはN個のソリューションを作る羽目に陥り、必然的にカオスな状況に陥るでしょう。開発者がメッセージバスの低レベルのやりとりについて考えなくても済むように、課題を一箇所で解決することは理にかなっています。
ポーリング、オフセットのコミット、バスへの出力、失敗したメッセージの再生などの操作について毎回心配することは、新しいコンポーネントを追加する度に多くの摩擦を引き起こしてしまうでしょう。これらの概念が別のライブラリに抽象化されていれば、開発者はバスからの入力を処理し、ある種の出力を生成するコードを書くことに集中できます。開発者の時間は最大限効率化され、バスのインタラクションを誰が実装するのがベストかといった論争が減ることを期待できます。
メッセージパターンとは何か?
さて、メッセージパターンとメッセージバスインタラクションを区別することは重要です。バスインタラクションは低レベル関数の呼び出しを意味し、ポーリングなどの何らかの操作をバスに実行させるものです。メッセージパターンはより高いレベルの概念で、システムがどのようにバスと相互作用させたいかという”セマンティクス”をカプセル化したものです。
以下は、メッセージパターンという言葉で処理されるべき事柄のリストです。
- 失敗した入力は何回リトライすべきか?
- データ処理装置はバスの先頭から読み込むべきか、それとも最新のオフセットから読み込むべきか?
- プロセッサは複数の入力から読み出す必要があるのか、それとも1つの入力から読み出せばいいのか?
- プロセッサは出力を許可しているか?
- 出力に対する応答は期待できるか?
これらの質問に対する答えは、ユースケースによって異なるため、それぞれの答えが異なるメッセージパターンになっていきます。
そして、これらの質問は、バスに依存しないということにお気づきでしょう。これらは、あなたのコンポーネントがどのように動作する必要があるかというストーリーを教えてくれるのです。この記事の後半で、私たちがR3で考えた具体的なメッセージパターンを説明します。
構造はどうあるべきか?
メッセージパターンライブラリの構造を正しく理解することは、あなたが考えているよりも少し複雑です。利用可能にしたいパターンを一覧化したAPIと、そのAPIを実現する実装モジュールのようなものでしょうか?そうではないのです。
もしあなたが、次のようなことをしたいのならどうですか?
- Kafkaのようなリソース集約的なプロセスを実行するほどリソースがないので、ローカルでシンプルに実行したい。
- お気に入りのバスは非決定論的動作をするにもかかわらず、テストは決定論的でシンプルなコンポーネントしか存在しない。
- AWSコストを指数関数的に増加させたくはないのだが、CI環境上でビルドを実行したい!
上記を実現したい場合、Kafkaを通常のデータベースやインメモリ(In-Memory)ソリューションなどの別のメッセージバスに交換できるようにする必要があります。その際、定義された各パターンに対して、バスが同じように動作することが望まれます。
これを行う最良の方法は、コンポーネント側から見た動作をカプセル化したメッセージパターンをAPIとして用意するのとは別に、バスとのやり取りを処理するメッセージバスAPIを別に用意する方法です。
図1に示した通り、これは、自分の環境で使われているメッセージバスを変更したい場合、システムの動作を変えずに変更できることを意味します。このような構造であれば、メッセージバスAPIの実行時依存性を変更することで、Kafkaにさせたいことをインメモリソリューションに変更することが可能になります。
メッセージングパターン
さて、プロジェクトを構造化し、バスとのやりとりのセマンティクスをパターンから分離する方法がわかったところで、低レベルの実装はどうすべきでしょう?
R3では、本番環境のメッセージバスがKafkaになることが分かっていたので、Kafkaの用語と概念を多く使ってメッセージパターンライブラリをモデル化しました。Kafkaを知る開発者にとっては、今回構築したライブラリで使用されている用語を推測しやすくなっていると思います。
バスの核となるのは、何らかの方法でコレクション(トピック/Topic)に整理されたデータパケット(レコード/Record)と、並列処理のためのメカニズム(パーティション/Partition)です。Kafkaの汎用メッセージパターンライブラリをモデル化したとはいえ、その概念はほとんどのメッセージバスに適用可能な形をしています。
メッセージバスに送受信されるすべてのデータパケットは、キー(Key)と値(Value)のペアであるレコード(Record)と呼ばれるものでラップされます。レコードは、レコードの集合体であるトピック(Topic)に関連づけられます。これらのレコードは公開することもできますし、特定のパターン(処理装置)を介して新しいトピックへ渡すことができます。渡し方についても「パターンの中で消費する」、「パターンの中で処理する」、「別のパターンに向けて出力する」といったさまざまなパターンがあり得ます。複数存在することが普通のパターンのインスタンスは、それぞれトピックを処理するよう準備されますが、実際には、インスタンスごとに異なるパーティションから読み込むことになります。次に、構築した各パターンについて説明します。
Durableパターン
Durableパターンは、耐久性のあるメッセージキューを実装しており、トピックからレコードを古いレコードから順に消費していきます。Durableパターンのポイントは以下の通りです。
・レコードは順番にバッチ処理される。
・処理される個々のレコードは、さまざまなタイプのゼロまたは複数の新しいレコードを生成することができ、それらは1つまたは複数のトピックに書き戻される。
・出力レコードは、一塊のトランザクションの一部としてトピックに書き戻される。
・消費、処理、および生成のループは、アトミックに実行される。レコードは、新しいレコードを出力するトランザクションと同じソーストピックで消費されたものとしてマークされる。
このDurableパターンは、その名前が示すように、各メッセージが”間違いなく一度だけ”正常に処理されることを意図しています。出力レコードがメッセージバスに書き戻せなかった場合、コンシューマは以前のポーリング位置にリセットされ、再度消費/処理を試むようになっています。。
State and Eventパターン
このメッセージパターンはDurableパターンと非常によく似ています。イベントトピック(Event Topic)からレコードを読み込んで処理し、出力があればアトミックにメッセージバスに書き戻します。違いは、イベントにも”状 態(State)”を持たせることができる点です。あるイベントの状態は別のトピック(ステートトピック/State Topic)に保存されますが、常に同じキーを持っています。イベントがイベントトピックから消費されるとき、サブスクリプションは与えられたキーに対する最新の状態を(訳注:ステートトピックから)取得し、これらの値の両方をプロセッサに渡します。
このパターンは、メッセージバスにデータを保存したい場合に最適です。ステートトピックはデータストアとして機能し、それに対する更新はイベントトピックを介してトリガーされることができます。データを保存するために本物のDBが必要な場合もあるかもしれません。しかし、環境に別のインフラを追加する代わりに、メッセージバスを使用することができるのであれば、それを使用するのではないでしょうか?ビルドエンジニアはあなたに感謝することでしょう。
State and Eventパターンは、2つの異なるトピックを追跡する必要があるため、おそらく最も複雑なパターンです。イベントトピックのパーティションがイベントコンシューマに割り当てられると、ステートトピックの同じパーティションがステートコンシューマに手動で割り当てられ、イベントが消費される前にそのトピックパーティションからのすべてのレコードが読み込まれなければなりません。これにより、サブスクリプションは、与えられたイベントに対する最新の状態を持つことが保証されます。
PubSubパターン
PubSubパターンは、口に出して言うのは難しいですが、非常にシンプルです。プロデューサーはトピック経由でサブスクライバーにレコードを送り、パターンがアクティブでなければ、レコードを受け取ることはありません。
PubSubパターンは、まず最初に、新しいレコードの消費を開始します。パターンが始まる前にトピックに対して生成されたレコードは一切処理されません。PubSub処理系からレコードが出力されることはなく、したがってトピックに書き戻されることもありません。このパターンでは、なんらかの保証はほぼ用意されません。エラーが発生した場合、いくつかのレコードが見落とされる可能性があります。このパターンは、信頼性のある配信が必要とされていない場合や、古いメッセージを無視すべき場合に適しています。また、プロデューサが再送を実装し、コンシューマがしばらくの間オフラインになっている場合に、処理が必要な大量のバックログを引き起こす可能性があります。このパターンでは、新しいメッセージだけをピックアップすることで、この問題を軽減しています。
例えば、他のコンポーネントが稼働しているかをチェックするためのハートビートメカニズムを考えてみましょう。この時、ずっと前に送られてきたハートビートに意味がないのは明らかでしょう。なぜなら、ハートビートはずっと前に送られてきたかもしれないが、そのコンポーネントはダウンしているかもしれないからです。
Compacted subscriptionパターン(コンパクト化購読パターン)
Compacted subscriptionパターンは、トピックからレコードを消費し、アウトプットを生成しないパターンです。サブスクリプションの起動時に、トピックに存在するすべてのレコードが、最も古いレコードから順に読み込まれます。トピックのすべてのパーティションがコンシューマーに割り当てられ、レコードの取りこぼしがないようにします。そして、各キーの最新値のインメモリーマップが構築され、これをスナップショットと呼びます。このマップはコンパクト化されたプロセッサに渡され、処理されることになります。
起動時以降にトピックから消費された新しいレコード(訳注:これまでにあったキーと、それに対する新しい値のセットであることが想定されます)は個別に読み込まれ、同じキーの前の値とともにプロセッサに渡されます。このパターンは、メッセージバス上のデータを「キーと値」のペアとして保存し、キーの最新値のみが重要視される場合に有効です。設定関連のデータはこのパターンを使う例の一つになります。
RPCパターン
RPCパターンは、トピックの特定のパーティションから、メッセージバスにレコードを送信することを可能にするものです。別のプロセス(RPC Receiver Subscription)がこのレコードを受け取って処理し、別に用意される応答トピック(Response Topic)の同じパーティションで応答レコードを送り返すことが可能になります。
このしくみにより、通常のリモートプロシージャコール(RPC)のように、リモートプロセスに対して要求を出し、応答を返すことができます。
送信側にはリクエストの進行状況を追跡するためのfutureが与えられますが、このパターンのメッセージ配送は信頼性がありません。受信側は送信側トピックの最新のメッセージから読み取るので、受信側でダウンタイムが発生すると、その間に送信されたメッセージは処理されないことになります。送信側は、リクエストに失敗した場合に再試行する方法を用意しておく必要があります。
メッセージバスは通常、データのストリームを処理するために使用され、RPCパターンには当然ながら適していません。しかし、Cordaのデプロイメントエコシステムにおける他の全ての通信はメッセージバスを介して行われることから、このパターンにもメッセージバスを使用することにしました。これはRPCコールがKafkaのようなメッセージバスが提供する、高可用性やスケーラビリティといった利点を享受できることも意味します。
しかし、このパターンでメッセージバスを利用することには課題もあります。送信者が同じパーティションにレコードを出力し、正しいパーティションとトピックで応答を待っていることを確認しなければなりません。
おわりに
社内のほとんどの開発者がメッセージバスのようなコンポーネントとやり取りするコアなビジネスロジックを書く必要がある場合、使いやすいAPIの背後にあるこのロジックを抽象化することは理にかなっています。早い段階から開発者が必要とする様々なユースケースを考え、パターンを特定し、開発者のニーズに対応するために時間をかけて使用、拡張できるAPIを構築することが重要です。開発者がメッセージバスをどのように使いたいかをすべて考えることは不可能ですが、新しいパターンは常に追加することができます。
メッセージバスをAPIの後ろに隠すことは、異なるメッセージバスを異なるユースケースに使用できることを意味します。Kafkaのようなプロダクションレベルのメッセージバスを、HSQLのようなインメモリデータベースに切り替えて、ローカル開発のユニットテストで使用できるようになります。また、将来的に、お客様がKafka以外のバスを使いたい場合、パターンが明確に定義されているので、新しいメッセージバス実装をより簡単に追加することができるようになります。
将来的には、ユースケースに応じて技術スタックを使い分ける機会も出てくるかもしれません。Redisのようなキーバリューストアは、コンパクト化されたパターンのメッセージバスよりも、大きなデータオブジェクトに適しているかもしれません。また、State and eventパターンにおけるステートの保存には、従来のデータベースの方が適しているかもしれません。
いずれにせよ、現時点では、メッセージバスを用いてすべての目標を達成し、その使用方法をパターンAPIの背後に隠すことで、将来の変更に備えることが重要だと考えています。
<ご質問・ご要望の例>
- Corda Portalの記事について質問したい
- ブロックチェーンを活用した新規事業を相談したい
- 企業でのブロックチェーン活用方法を教えて欲しい 等々
SBI R3 Japan エンジニアリング部長
書籍出してます:https://amzn.asia/d/c0V31Vd
趣味:サッカー、ガンプラ、ドライブ、キャンプ