6/19(日)「JJUG CCC Spring 2022」がオンラインにて開催され、GMOインターネットグループはセッションスポンサーとして協賛・登壇しました!日本最大のJavaコミュニティイベントとして、毎年2回、春と秋に開催されている「JJUG CCC」。異なる分野で活躍するJava技術者が集まる場として、Java関連の技術や事例に関するセッションが行われ、全セッションの総視聴者数は約900名に上りました。 登壇レポートvol.02では、GMOインターネットグループのテックリードである、成瀬允宣によるセッション「イベントソーシング入門 in Java――イベントストーミングから Akka persistence を使った CQRS+ES 実装まで」をまとめたレポートをお届けします。イベントについて:https://developers.gmo.jp/19185/
登壇者
GMOインターネットグループテックリード 成瀬 允宣(@nrslib)
セッション概要
常に変化するビジネス環境に迅速に対応できるよう、いま「マイクロサービス」を取り入れてサービスを開発する企業が増えています。
従来は、モノリシック(一枚板)なアーキテクチャでひとつのデータストアに「アカウント機能」、「支払い機能」、「オーダー機能」など様々な機能を保持してサービスを提供していました。それに対してマイクロサービスは「アカウント機能」、「支払い機能」など、個別に開発された小さなサービスを組み合わせて、それぞれがデータストアを持ち、ひとつのサービスを提供しています。
変化に強くて柔軟性の高いマイクロサービスですが、システム分割によりDB連結(JOIN)ができなくなるなど弊害もあります。
そこへのアプローチに、サービス毎に分かれている個別のデータストアから必要な情報を受け取ってJOINができるデータベースをつくるコマンドクエリレスポンシビリティセグリゲーション(CQRS)という手法があります。
さらにイベントのデータを保存するデータストア「イベントジャーナル」をつくり、DB全体で過去の状況を再現できる「イベントソーシング(ES)」を行うことで、マイクロサービスの課題を解決して、サービス開発のスケーラビリティにつなげています。
本セッションは、そんないま注目されている「Event Sourcing(イベントソーシング)」の理解から、イベント設計のモデリング手法「Event Storming(イベントストーミング)」の実践、リアクティブシステムを構築できるツールキット「Akka」と「Akka persistence」を使った「CQRS+ES」実装までを学べる、イベントソーシング入門です。
セッション全編
https://www.youtube.com/watch?v=Bt_kXHlNL98
イベントソーシングとは?
【従来のデータベース】
現在のスナップショットのデータを保持しているのが「State Sourcing(ステートソーシング)」です。
【イベントソーシング】
それに対して「イベントソーシング」は、イベントを保存したデータストアです。「event_data」にはシリアライズ化されたデータが入っており、「Event type」で1行づつイベント毎にデータを保存しています。
開発者間で記述したソースコードの変更履歴を記録することができる「Git」をイメージして頂くと分かりやすいと思います。システムを差分で管理してイベント履歴を保持していき、最終的に全ての履歴を再生することで、最新データ(=State(ステート))を手に入れることができます。
イベントソーシングの利点
Pub / Sub パターンがつくりやすい
Gitのような差分管理システムで履歴が完全に残るため、ひとつのデータソースから、各自が履歴となるイベントから端末上に必要なファイル(現在のスナップショット)をつくるといった、それぞれの開発者が読み取るためのView(ビュー)をつくっています。
同じことがシステムでも出来ます。各システムは、ひとつのイベントの履歴やイベントを保持したデータストアからイベントを受け取り、それぞれに必要なビューをつくることができます。データストアには最初から現在までの完全な履歴があるのでリプレイしてそれぞれのシステムにとって欲しいデータをつくることも可能となります。
このイベントデータを収めたデータストアを「ジャーナル」、読み取り側のデータを「リードモデル」と呼んでいます。リードモデルを作るにあたって「ジャーナル」に修正が起こらないこと、これはイベントソーシングの大きなメリットのひとつです。「ジャーナル」にはイベントを保存するシステムがあります。そのため新しいシステムをつくるにあたっても、このイベントを保存するシステムに影響は起こりません。
イベントを保存する側を「Publisher」、そのイベントを購読する側を「Subscriber」(クライアント)として、ジャーナルからリードモデルを作って、それぞれシステムを作っている開発チームがいます。
「Subscriber」は、あくまでジャーナルのデータを使っているだけです「Publisher」は、「Subscriber」のためにデータを修正する必要はありません
「Subscriber」が欲しがるすべての履歴が「Publisher」に残っています。イベントソーシングはPub/Subシステムを作りやすくするのです。
◎Pub/Subパターンとは
イベント駆動型プログラミングのデザインパターン。Publisher(発行者)が発行したイベントをBroker(仲介者)が取りまとめ、Subscriber(購読者)に伝達します。イベントはBrokerが管理しているため、PublisherとSubscriber同士は疎結合となります。
マイクロサービスの課題解決につながる
複数の規模の小さなサービスを組み合わせてひとつの大きなアプリケーションを構成している分散・疎結合なアーキテクチャ、それが「マイクロサービス」です。
システム毎にデータストアが分離しているため、クライアントがマイクロサービスのデータを使う際に課題になるのがSQLのJOINができないことです。そこでイベントソーシングは、Pub/Subシステムで、各ジャーナルのイベントをクライアントが購読して自分(クライアント)にとって都合の良いリードモデルを作ることが可能になります。
Pub/Subシステムを利用して、クライアントがイベントジャーナルを自由に活用することで、DB連結の「JOIN」できないといった問題も解決できます。マイクロサービスの利点である柔軟性の高い開発に貢献して、よりスケーラブルにサービス開発を行うことが可能になるのです。
実践!イベント設計「イベントストーミング」
イベントソーシングで、どのように「ドメインのイベント」を定義して保存するべきか。そこで使えるのが「Event Storming(イベントストーミング)」という手法です。
これはAlberto Brandolini 氏が提唱した付箋紙を使ったワークショップ形式のモデリング手法。付箋にドメインで発生するイベントを書き出して、タイムラインに沿って配置することでビジネスプロセスの全体像も明らかにすることできます。
上記は、最終的な完成図になります。実装では、この図がそのままコードに落ちます。ツールはオンラインホワイトボード「Miro」を使用します。今回のお題はECシステム、「ECで注文すると、どういうことが起こるか?」です。イベントストーミングはこうしたユースケースを軸に行うのがお勧めです。最初のきっかけである「商品を注文した」からスタートしましょう。
手順1:思い付くイベントを書き出す(動画 7:00~)
https://www.youtube.com/watch?v=Bt_kXHlNL98&t=420s
ECで注文後に起きるイベントをオレンジの付箋で書き出します。すぐ議論する内容ではない、けれど確定までに処理する情報(=留意点)は、「ホットスポット」という紅色の付箋で記述していきます。
手順2:イベントを時系列に並べる(動画 8:55~)
https://www.youtube.com/watch?v=Bt_kXHlNL98&t=535s
「商品の注文をした」→「注文受付メール送信」のように、イベント(オレンジ)を発生する時系列順に並べていきます。
手順3:カラーパズルを並べる(動画 11:00~)
https://youtu.be/Bt_kXHlNL98?t=660
次にイベント(オレンジ)以外の、カラーパネルを並べていきます。 それぞれ色によって意味づけされています。 カラーパズルにはルールがあります。
ルール(1):下図の矢印の順番で色をつながなければいけない
ルール(2): 「イベント」(オレンジ)には必ず、「コマンド」(意味:そもそものきっかけ)(青) と、「Aggregates」(黄色)または「システム」(ピンク)が必要(矢印の順番)
下記では「商品を注文する」(青)→「Aggregates」(黄色)→「注文した」(オレンジ)とカラーパネルをつなげました。この3つは密着させるように配置しましょう。
次のステップ「注文受付メールが送信された」(オレンジ)ですが、ルール①の通り、「イベント」(オレンジ)→「イベント」(オレンジ)は繋げられません。
そこで「コマンド」(青)を追加。「イベント」(オレンジ)から「コマンド」に行くために、「ポリシー」(紫)か「システム」(ピンク)、「ReadModel」(黄緑)の、いずれかを通ります。
よく使用されるのが「ポリシー」(紫)です。「ポリシー」は、「自動的に」「いつでも」という「暗黙の方針(明示的な合意がない場合)」という意味付けです。これで「商品を注文した」(イベント)から→「注文受付メールが送信された」(イベント)までをつなげることができました。
進めていくと複数のイベントをつくる条件分岐も起こります。定められた順番に沿ってカラーパネルでつなげて、完成を目指しましょう。
◎ 成瀬式!地球儀について
システムとは関係ないが現実で起こりえる曖昧な部分を包み込んでくれるのが、この「地球儀」です。これは成瀬式ですが、よく使用するので作ることをおススメします!
手順4:「集約」(Aggregates)を埋めていく(動画 19:20~)
https://youtu.be/Bt_kXHlNL98?t=1160
発生するイベントをカラーパネルでつなげました。現在、黄色はすべて「Aggregates」になっているかと思います。ここに何が入るのか? ヒントは目的語を見てみてください。
コマンドは「商品を注文する」なので、「Aggregates」(黄色)には「注文」を入れました。この言葉は後で変更することができます。そのため、参加メンバーで合意がとれている言葉を自由に入れて大丈夫です。
そして同じ意味を持つものを、まとめていきます。ビューモデルに関しては、システム的には関係ないため先を繋げません。また外部システムも一旦、置いておく意味で、自分のシステムにつなげておきましょう。
◎ 成瀬式!集約について
上記の図は保存して残しておくことをおススメします!システム開発では、開発者やドメイン知識を持っているエキスパートたちなど、それぞれ持っている知識には差があります。上記の図は、それ自体がワークフローになっており、その知識がマージ(まとめられたもの)されたものなので、すごく重要な図になります。図のコピーを作成して新たにメンテナンスしていきましょう。
こうして集約(Aggregates)の振り分けが完了しました。
手順5:コンテキスト毎に分ける(動画 21:27~)
https://youtu.be/Bt_kXHlNL98?t=1287
「イベントストーミング」もいよいよ終盤になりました。最後にコンテキストを分けていきます。コンテキストについては説明が難しいため「請求系」「通知系」と…ざっくりした認識でイメージできれば、まずは大丈夫です。これが「イベントストーミング」の一連の流れになります。
Akka persistenceを使ったCQRS+ES実装
https://youtu.be/Bt_kXHlNL98?t=1585
それでは出来上がった図からシステムを作っていきます。イベントソーシングをサポートしたライブラリを使えば、イベントストーミングの図がそのままシステムになります。
リアクティブシステムを構築できるツールキット「Akka」とそのプラグイン「Akka persistence」を使い、イベントソーシング(ES)を実装していきます。AkkaはScalaのライブラリなのでJavaでも動かせます。
コードはAkkaビギナー向けに変えておりGitHubでも公開しています。
【GitHub コードはこちら】
「注文をする」から順番にコードを見てみましょう。
コマンド
コマンド部分で最初に用意すべきオブジェクトはCommandRequestというインターフェースです。
public interface OrderAggregateProtocol {
sealed interface CommandRequest {
OrderId orderId();
}
}
この図(商品を注文する)のコマンドに従って実装していきます。今回だとCreateOrderRequestになります。
Record構文で、フィールドは全て不変なクラスを使っています。
public interface OrderAggregateProtocol {
sealed interface CommandRequest {
OrderId orderId();
}
/* Create Order */
record CreateOrderRequest(
OrderId orderId,
AccountId accountId,
OrderDetail detail,
ActorRef<CreateOrderReply> replyTo) implements CommandRequest {
}
sealed interface CreateOrderReply {}
record CreateOrderSucceeded(OrderId orderId)
implements CreateOrderReply {}
record CreateOrderFailed(OrderId orderId, OrderError error)
implements CreateOrderReply {}
}
コマンドを実装したCreateOrderRequest。以下のID、オブジェクトを持っています。コマンド送信元に返答を送るため、replyToという返答を伝えるオブジェクトがフィールドに定義されています。「Akka」はアクタープログラミングなので、オブジェクトにメッセージを送る(replyToにメッセージする)ことで戻り値を返すのと同じことを実現しています。
/* Create Order */
record CreateOrderRequest(
OrderId orderId,
AccountId accountId,
OrderDetail detail,
ActorRef<CreateOrderReply> replyTo) implements CommandRequest {
}
オブジェクト指向プログラミングとの対比を取ると、メソッドの呼び出しがメッセージ、戻り値がリプライメッセージとなります。リプライは、今回の例でいえばCreateOrderSuceededとCreateOrderFalledが該当していて、それぞれ成功と失敗を示しています。
sealed interface CreateOrderReply {}
record CreateOrderSucceeded(OrderId orderId)
implements CreateOrderReply {}
record CreateOrderFailed(OrderId orderId, OrderError error)
implements CreateOrderReply {}
イベント
イベントもインターフェースを用意して実装していきます。マークアップインターフェース(下記、青枠①)はAkka Javaの都合上、実装しています。動画ではドメインにおけるイベントなので DomainEventという基底オブジェクトを宣言することも提案されていました。
青枠①
public interface OrderEvents {
sealed interface Event extends CborSerializable {
OrderId orderId();
}
}
イベントを実装したOrderCreated(オーダーが作成されました)オブジェクトは、以下のデータを保持しています。
record OrderCreated(OrderId orderId, AccountId accountId, OrderDetail detail)
implements Event {
}
集約(Aggregates)
最後に、集約(Aggregates)です。
集約(Aggregates)はこのようなコードになっています。
public record Order(OrderId orderId, AccountId accountId, State state) implements AggregateRoot {
public static Either<OrderError, OrderEvents.OrderCreated> create(
OrderId orderId,
AccountId accountId,
OrderDetail detail) {
return Either.right(new OrderEvents.OrderCreated(orderId, accountId, detail));
}
public static Order applyEvent(OrderEvents.OrderCreated event) {
return new Order(event.orderId(), event.accountId(), State.CREATED);
}
public Order applyEvent(OrderEvents.Event event) {
if (event instanceof OrderEvents.OrderCreated) {
return new Order(orderId, accountId, State.CREATED);
}
throw new IllegalStateException("Unexpected event: " + event.getClass().getName());
}
public enum State { NONE, CREATED }
}
作成するときはEither型ですが、右側の値を正しいものとするので、メソッドが正しく実行された場合にはOrderEvents.OrderCreated(オーダーを作成した)というイベントを返却しています。
public record Order(OrderId orderId, AccountId accountId, State state) implements AggregateRoot {
public static Either<OrderError, OrderEvents.OrderCreated> create(
OrderId orderId,
AccountId accountId,
OrderDetail detail) {
return Either.right(new OrderEvents.OrderCreated(orderId, accountId, detail));
}
ロジックも、right (OrderCreatedを作成)して返却しているのが分かるかと思います。今回のケースでは現段階で失敗が発生するケースがなかったため、常にEither.rightメソッドで結果を返却して成功としています。エラーが起きる場合はleftでエラーを返しましょう。
以下が、そのイベントを適用する部分です。イベントを受け取り、それにしたがって処理を行います。
public Order applyEvent(OrderEvents.Event event) {
if (event instanceof OrderEvents.OrderCreated) {
return new Order(orderId, accountId, State.CREATED);
}
throw new IllegalStateException("Unexpected event: " + event.getClass().getName());
}
record型は値のミューテーションができないため、インスタンスを再生成しています。「Createdイベントを受け取り→stateをCREATEDにしたオブジェクトを作成する」イメージです。
if (event instanceof OrderEvents.OrderCreated) {
return new Order(orderId, accountId, State.CREATED);
}
コマンド、イベント、集約(Aggregates) を、まとめるオブジェクト
ここまでのコマンド、イベント、集約(Aggregates)をまとめるオブジェクトを解説していきます。コマンドを受け取って、そのコマンドを注文に渡して、イベントを保存するというオブジェクトです。
実装は、以下の通りです。
public class OrderAggregate extends EventSourcedBehavior<OrderAggregateProtocol.CommandRequest, OrderEvents.Event, Order> {
---/* 集約(Aggregates)が空の時 */
@Override
public Order emptyState() {
return null;
}
/* コマンドを受け取る時 */
@Override
public CommandHandler<OrderAggregateProtocol.CommandRequest, OrderEvents.Event, Order> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(OrderAggregateProtocol.CreateOrderRequest.class, (__, command) -> {
var result = Order.create(command.orderId(), command.accountId(), command.detail());
if (result.isRight()) {
return Effect().persist(result.right().value())
.thenReply(command.replyTo(), order ->
new OrderAggregateProtocol.CreateOrderSucceeded(
order.orderId())
);
} else {
return Effect().none()
.thenReply(command.replyTo(), ___ ->
new OrderAggregateProtocol.CreateOrderFailed(
command.orderId(),
result.left().value()));
}
})
.build();
}
/* イベントが保存される時 */
@Override
public EventHandler<Order, OrderEvents.Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(OrderEvents.OrderCreated.class, (__, event) -> {
var order = Order.applyEvent(event);
return order;
})
.onAnyEvent((order, event) -> order.applyEvent(event));
}
}
それぞれ実行される処理
上のコードでは「コマンドを受け取る時」「イベントが保存される時」「集約(Aggregates)が空の時」と3つに別れているのが理解できると思います。以下で、それぞれ実行される処理を見てみましょう。
コマンドを受け取る時
@Override
public CommandHandler<OrderAggregateProtocol.CommandRequest, OrderEvents.Event, Order> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(OrderAggregateProtocol.CreateOrderRequest.class, (__, command) -> {
var result = Order.create(command.orderId(), command.accountId(), command.detail());
if (result.isRight()) {
return Effect().persist(result.right().value())
.thenReply(command.replyTo(), order ->
new OrderAggregateProtocol.CreateOrderSucceeded(
order.orderId())
);
} else {
return Effect().none()
.thenReply(command.replyTo(), ___ ->
new OrderAggregateProtocol.CreateOrderFailed(
command.orderId(),
result.left().value()));
}
})
.build();
}
着目すべきはonCommandの部分です。第一引数にOrderAggregateProtocol.CreateOrderRequest.classを受け取っています。このコードは OrderAggregateProtocol.CreateOrderRequestメッセージを受け取ったら、第二引数のラムダ式(青枠②)を実行するという意味になります。
青枠②
.onCommand(OrderAggregateProtocol.CreateOrderRequest.class, (__, command) -> {
var result = Order.create(command.orderId(), command.accountId(), command.detail());
if (result.isRight()) {
return Effect().persist(result.right().value())
.thenReply(command.replyTo(), order ->
new OrderAggregateProtocol.CreateOrderSucceeded(
order.orderId())
);
今回のケースではまずコマンドのデータを利用してOrder.createメソッドを呼び出しています。Order.createメソッドは次のコードになります。
<Order.createメソッド>
public static Either<OrderError, OrderEvents.OrderCreated> create(
OrderId orderId,
AccountId accountId,
OrderDetail detail) {
return Either.right(new OrderEvents.OrderCreated(orderId, accountId, detail));
}
Order.createメソッドの結果のイベントはEffect.persistメソッドによって永続化されます。永続化が成功すると次のイベントハンドラへ移行します。
return Effect().persist(result.right().value())
.thenReply(command.replyTo(), order ->
new OrderAggregateProtocol.CancelOrderDueToDiscontinuationSucceeded(
order.orderId()))
イベントが保存された時
見るべき場所は青枠③です。オーダーが作成された時、このイベントを受け取ったら(OrderEvents.OrderCreated.class)、ラムダ(_, event)-> {…}を実行します。eventにはOrder Createdイベントのオブジェクトがバインドされてきます。
青枠③
@Override
public EventHandler<Order, OrderEvents.Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(OrderEvents.OrderCreated.class, (__, event) -> {
var order = Order.applyEvent(event);
return order;
})
.onAnyEvent((order, event) -> order.applyEvent(event));
}
ラムダで、Order.applyEventにeventを渡して、オーダー(order)を生成するメソッドを実行しています。
var order = Order.applyEvent(event);
以下がapplyEventメソッドになります。 applyEvent を実行するとOrderが作成され、インスタンス化されて返却されます。
public static Order applyEvent(OrderEvents.OrderCreated event) {
return new Order(event.orderId(), event.accountId(), State.CREATED);
}
返却された値を最新のstateとして扱い、リターン(return)するといった形になっています。
.onEvent(OrderEvents.OrderCreated.class, (__, event) -> {
var order = Order.applyEvent(event);
return order;
})
集約(Aggregates)
@Override
public Order emptyState() {
return null;
}
今回のケースでは空の状態をnullとしています。以上が、 コマンド、イベント、集約(Aggregates)をまとめるオブジェクト の解説になります。セッションでは「コマンド定義からコーディングまでの動き」も紹介しています。そちらも併せて、ご覧ください。
イベントソーシングの原理
ロジックを実行する側が「AggregateAdaptor」(Adaptor)にコマンドをリクエストします「AggregateAdaptor」がOrderのcreateメソッドを呼び出し、イベントを受け取ります(2.)で作成したイベントをpersistしますデータストアにOrderCreatedのイベントが保存されます(2.)のイベントをOrderに引き渡し適用することでフィールドの値をミューテーションしますオーダーのステート=中身=データがOrderCreatedのデータなどにより書き換えられます
イベントソーシングの機能
イベントを保存する以外に、イベントソーシングは下記の機能も持っています。
リプレイ機能
過去のイベントを読み出し、アグリゲートを再構築する機能です。これにより最新のステートを取得できます。
スナップショット機能
アグリゲートのスナップショットを取り、インスタンスを再構築するためにイベントを何度も再生するという処理を効率化させることができます。スナップショットの実行処理についてはセッション内で詳しく解説しています。(動画 37:08~)
まとめ
イベントソーシングはシステムをスケーラブルにしますマイクロサービスの課題解決につなげる手法のひとつですデータストアが分かれてしまうので、JOINができないという課題をイベントソーシングでPub/Subにすることで解決につながりますドメインをモデリングするのは、イベントストーミングが有効ですイベントストーミングの図を使って「Akka」で実装できます検証した限り(他のマイクロサービス課題も含めて)トータルで、学習コストが少ないのが「Akka」でした
Q&A
質問1:スナップショットについて
主催注文のように終了が明確なものだとスナップショットは取りやすいですが、終了がない終了までが長い場合はどうするか。変更履歴の数でカウントもできますか?
スナップショットを取るかどうか判定するコードにはシーケンスナンバーがあって、この値は何個目のイベントか?をカウントできる数になります。セッション内ではイベントの型によってスナップショットを取得する話をしてきましたが、そのシーケンスナンバーを使って何個目のイベントか?という判定も入れることができます。成瀬
質問2:開発における問題点
主催イベントソーシングでの開発する際、開発量やデータ量の増加など問題点はありますか?
データ量は増えます。本来、イベントソーシングで出来たジャーナルデータと、読み込みのリードモデルを用意するため、1チームで管理していると重複してデータが存在しているように感じます。開発量に関しては、個人的にあまり意識していないです。覚えることも多いため認知負荷はかかりましたがコード自体はクラスをいくつか用意して、それらを使うので、普段の記述と特段、変わらないという印象です。成瀬
質問3:イベントデータのリカバリーについて
主催間違ったデータが出来てしまった場合、リカバリーは難しいですか?
イベントの中身がおかしいとリード側でエラーを発生することがあります。その場合、元となるイベント側でエラーが起きないように修正する処理が必要になります。シリアライズ化されたデータを修正することになるのでツールが必要になります。また、リード側に問題がない場合、「エラーを治すための修正をしました」というイベントを入れる方法もあります。打消しのイベントですね、Gitのリバートと同じイメージです。成瀬
さいごに
今回、セッションに参加された方の中には「イベントソーシング初心者でも考え方や、メリットを理解できて良かった」という話や、「コードが分かりやすかった」という声などが寄せられました。
成瀬氏も「海外の資料を読んで挑んだ」と話していた「イベントストーミング」については初めて知った方が多かったようです。「イベントストーミングはユースケースの整理にも良さそう」 との声もありました。
本セッションはノーカット版として成瀬の個人チャンネル「なるセミ」でも配信していますので、そちらも是非ご覧ください。 今回、お伝え出来なかったPub/Subの詳しい実装については、また次の機会をお楽しみに。「JJUG CCC Spring 2022」ご参加いただき、誠にありがとうございました。
連載記事はこちら
https://developers.gmo.jp/19420/