読者です 読者をやめる 読者になる 読者になる

Fire and Forget

書いたら忘れる、開発やプログラミングに関するブログ。最近はScalaやAkkaなど。

Reactive Messaging Patterns with Actor Modelを読んで見る

Reactive Messaging Patterns with Actor Modelを読んで見る

Vaughn Vernon氏の「実践ドメイン駆動設計」を読んだのですが、C#のソースの紹介しかなくて、いまいちScalaでどのように実装してよいか わからなかったので、同著者の「Reactive Messaging Patterns with Actor Model」(RMP本(仮))を買って読んでいます。

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

実践ドメイン駆動設計著者自らが、本のなかで出てきたパターンをAkkaでどのように実装するかを解説してくれていますので、かなり参考になります。
僕はIDDD本の中で出てきたCQRSやイベントソーシング、長期プロセス(サーガ)をどのように実装してよいかわからなかったのですが、この本に助けてもらってなんとか実装できそうです。
他にも、

  • ヘキサゴナルアーキテクチャ、ポート&アダプターってどうするの
  • 腐敗防止層ってどうやるの
  • 境界づけられたコンテキストの統合
  • マイクロサービス
  • 結果整合性
  • トランザクションってどうするの
  • メッセージのルーティング、チャンネル

などがわからない時に参考になると思います。 まだ、つまみ読みしかしていないので、他にもたくさんあると思います。IDDD本の内容は大体カバーしてるんじゃないかな。。

Akkaの紹介、スーパーバイザ、Akka ClusterやAkka Cluster Sharding、Akka Remoteの解説も充実しているのでそれだけでも読む価値があると 思います。

表紙に「Foreword By Jonas Bonér, Founder of the Akka Project」ってあるので、僕なんかの推薦よりよっぽど説得力がありますね。
Jonasさんはこの前の、ScalaMatsuriでも登壇されていましたね。

ちょっと気をつけたほうがよいのが、Akka Persistenceが実験的(experimental)な機能の時にかかれているので、 Persistent Viewの紹介がありますが、今はPersistent Queryに変わっている?ので参考にならないかもしれないです。

この本で学べること

背表紙に書いてあることを雑にまとめると。

  • どのようにしてリアクティブシステムがコア、ミドル、エッジのあらゆる場所で、複雑なものををシンプルに変えるか。
  • アクターやアクターシステムの特徴、Akkaがどのようにパワフルにしてくれるか。
  • 複数の計算資源でスケールするシステムをどのように構築するか。
  • それぞれのアプリケーションや統合の際にチャンネル機構の構成や適したチャンネルをどのように選ぶか。
  • 送信者の意図を明確に運ぶメッセージをどのように構成するか。
  • DDDの文脈で、どのようにプロセスマネージャーを実装するか。
  • メッセージソースとメッセージの宛先をいかにして分離するか。そして、どのようにそのルーターの中に適切なビジネスロジックを統合するか。
  • メッセージの変換がどのようにして、アプリケーションやインテグレーションに適合するか。
  • イベントソーシングを使ったPersistent Actorsの実装、 CQRSを使ったリアクティブビューをどのように実装するか。
事前にあると良い知識

まずは同じ著者(Vaughn Vernon氏)の「実践ドメイン駆動設計」(IDDD本)は読んでおいたほうが良いです。IDDD本を読んでいないとピンと来ないと思います。IDDD本を読んだけど、いまいちScalaでの実装が想像できなかった人はこの本を読むと良いと思います。 Akkaの簡単な解説はこの本に含まれていますが、ScalaやAkkaの知識はあったほうが良いです。 Enterprise Integration Patterns(EIP)(なんで翻訳されていないんだ。。)も度々言及されているのであると良いのかもしれません。僕は読んだことがありません。  

各章の紹介

 読み進めて、気が向いたら書く。
 読書会も企画中です。興味ある方はtwitterでメッセージください。

Akka 2.4.0がリリースされたのでAkka Persistenceを試してみる。

 こんにちは。先月、待望のAkka 2.4.0がリリースされたので、 2.4.0の目玉機能であるAkka Persistenceを試してみました。
 いやー、待ちました。なかなかexperimental(実験的な機能の意味)が外れないのでやきもきしていましたが、やっとakka-persistence-experimentalからakka-persistenceになったので、自信を持ってブログで紹介することが出来るようになりました。リリースお疲れ様です!

Akka 2.4.0についてはこちら↓
Akka 2.4.0 Released! | Akka

Akka Persistenceについてはこちら↓
Persistence — Akka Documentation

今回の記事は、Akkaは知っているけどAkka Persistenceは知らないよという方向けの記事です。
ん?Elixir?知らない子ですね。

Akka Persistenceとは

 Akka Persistenceを使うとActorの内部状態を永続化することが出来るようになります。マシンやJVMがシャットダウンしても、 前回のActorの状態を引き継いだActorを立ち上げることが出来ます。
Akka Persistenceはイベントソーシングの考え方に基づき、Actorの内部状態を直接保存するのではなく、そのActorに起こった一連のイベントを保存し、Actorの復元の際にそのイベントを順番に再生することで内部状態を復元します。

イベントソーシングを使うことによって、

  • 履歴管理が不要
  • 溜まったイベントをデータ解析に使える
  • イベントは基本的には追記のみなのでパフォーマンスが出やすい
  • ActorのID(persistenceId)に基づいて水平分割できるのでスケールしやすい
  • ORMが不要
  • スキーマの概念がないので、イベントの種類を増やすことでモデルを簡単に拡張できる。
    と、様々な特徴があります。

 イベントソーシングはしばしば、コマンドクエリ責務分離(CQRS: Command Query Responsibility Segregation)と併用されて、 コマンドによって引き起こされたドメインイベントをイベントソーシングで保存して(コマンドサイド)、 Actorの内部状態を好きなように、DBに投影して、クエリに応答する(クエリサイド)。といった使われ方をされます。

 今回は、イベントソーシングやCQRSについては紹介しきれませんが、 Akka Persistenceの簡単な紹介から、イベントソーシングやCQRSについて興味を持って頂けたら幸いです。

はじめての永続化

 全ソースはこちら

 Akka Persistenceを使うためには libraryDependenciesに
"com.typesafe.akka" %% "akka-persistence" % "2.4.0" を追加します。

例として、単純に0から1ずつカウントアップして、カウントした値を保持するActorを作ってみます。

class CountUpActor extends PersistentActor with ActorLogging {

  var count: Int = 0

  override def persistenceId: String = self.path.name

  override def receiveCommand: Receive = {
    case c: CountUp =>
      log.info("receive command {}", c)
      persist(Increased(c.count)) {
        event =>
          count += event.diff
          log.info("current count {}", count)
      }
  }

  override def receiveRecover: Receive = {
    case e: Increased =>
      log.info("receive recover {}", e)
      count += e.diff
  }
}

object CountUpActor {
  def props = Props[CountUpActor]

  case class CountUp(count: Int)

  case class Increased(diff: Int)

}

 PersistentActorを継承しています。
def receiveCommand で、CountUpコマンドを受け取ってpersist(Increased(c.count)) で永続化しています。
ポイントは永続化しているのは、内部状態(count)ではなく、 増加した(Increased)というイベントを永続化しているところです。 また、永続化した後に count += event.diff と内部状態を更新しているので、 状態は変わったのに、永続化されていないといったことは起こりません。

永続化したイベントは次回Actorの起動時に、 def receiveRecover でハンドルできます。 ここでは復元したイベントをもとにcountを増加させて、前回終了時のcountの値を復元しています。

実行

 さっそく動かしてみましょう。Mainクラスはこんな感じにCountUpコマンドを3回送っています。

object ApplicationMain extends App {
  val system = ActorSystem("MyActorSystem")

  val actor = system.actorOf(CountUpActor.props, "c1")

  actor ! CountUp(1)
  actor ! CountUp(1)
  actor ! CountUp(1)

  Await.result(system.whenTerminated, Duration.Inf)
}

一回目の実行結果

[info] Running com.example.ApplicationMain
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 1
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 2
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 3

二回目の実行結果

[info] Running com.example.ApplicationMain
[info][akka://MyActorSystem/user/c1] receive recover Increased(1)
[info][akka://MyActorSystem/user/c1] receive recover Increased(1)
[info][akka://MyActorSystem/user/c1] receive recover Increased(1)
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 4
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 5
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 6

 一回目の実行で countが3まで増加しています。
二回目の実行で receive recover Increased(1) が3回続き、1回目の実行で保存したイベントが復元されているのがわかります。 その後、CoutUpコマンドを3回受け取り、正しく、合計6までカウントすることができています。

Persistence Plugin

 今回の例ではイベントの保存先はLevelDBを使っていますが、LevelDB以外にも使うことができて、自分で開発したプラグインや他の方が開発したプラグインをつかって 様々なDB(永続化機構)にイベントを保存することができます。 http://akka.io/community/#plugins-to-akka-persistence
 テスト時は標準で準備されているインメモリプラグインを使うと良いと思います。
設定はapplication.confに書きます。
akka.persistence.journal.plugin="akka.persistence.journal.inmem"

おわりに

 今回は、Akka Persistenceの簡単な紹介でしたが、

  • イベントストアの肥大化対策(スナップショット)
  • 分散環境(Cluster Sharding)
  • クエリサイド( akka-persistence-query)
  • メッセージの再送(At-Least-Once Delivery)
  • PersistentFSM
  • ドメイン駆動設計との絡み

など、まだまだAkkaやAkka Persistenceの奥深さを味わえるトピックスがあるので、 気が向いたら紹介したいと思います。

Akka やAkka Persistenceを知りたい場合は、公式ドキュメント以上に良い情報はないと思うので そちらを読めば良いと思います。

書籍であれば、

Effective Akka

Effective Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

 CQRSやイベントソーシングについては、以下の書籍が参考になりました。 特に.NETのエンタープライズアプリケーションアーキテクチャ第2版はCQRSやイベントソーシングの説明は分かりやすかったです。.NETとタイトルに付いているので目に止まらない方もいると思いますが、良書だと思います。

実践ドメイン駆動設計 (Object Oriented Selection)

実践ドメイン駆動設計 (Object Oriented Selection)

.NETのエンタープライズアプリケーションアーキテクチャ 第2版 (マイクロソフト公式解説書)

.NETのエンタープライズアプリケーションアーキテクチャ 第2版 (マイクロソフト公式解説書)