Fire and Forget

書いたら忘れる、開発やプログラミングに関するブログ。最近はScalaやAkkaなど。

Akka 2.4.0がリリースされたのでAkka Persistenceを試してみる。

 こんにちは。先月、待望のAkka 2.4.0がリリースされたので、 2.4.0の目玉機能であるAkka Persistenceを試してみました。
 いやー、待ちました。なかなかexperimental(実験的な機能の意味)が外れないのでやきもきしていましたが、やっとakka-persistence-experimentalからakka-persistenceになったので、自信を持ってブログで紹介することが出来るようになりました。リリースお疲れ様です!

Akka 2.4.0についてはこちら↓
Akka 2.4.0 Released! | Akka

Akka Persistenceについてはこちら↓
Persistence — Akka Documentation

今回の記事は、Akkaは知っているけどAkka Persistenceは知らないよという方向けの記事です。
ん?Elixir?知らない子ですね。

Akka Persistenceとは

 Akka Persistenceを使うとActorの内部状態を永続化することが出来るようになります。マシンやJVMがシャットダウンしても、 前回のActorの状態を引き継いだActorを立ち上げることが出来ます。
Akka Persistenceはイベントソーシングの考え方に基づき、Actorの内部状態を直接保存するのではなく、そのActorに起こった一連のイベントを保存し、Actorの復元の際にそのイベントを順番に再生することで内部状態を復元します。

イベントソーシングを使うことによって、

  • 履歴管理が不要
  • 溜まったイベントをデータ解析に使える
  • イベントは基本的には追記のみなのでパフォーマンスが出やすい
  • ActorのID(persistenceId)に基づいて水平分割できるのでスケールしやすい
  • ORMが不要
  • スキーマの概念がないので、イベントの種類を増やすことでモデルを簡単に拡張できる。
    と、様々な特徴があります。

 イベントソーシングはしばしば、コマンドクエリ責務分離(CQRS: Command Query Responsibility Segregation)と併用されて、 コマンドによって引き起こされたドメインイベントをイベントソーシングで保存して(コマンドサイド)、 Actorの内部状態を好きなように、DBに投影して、クエリに応答する(クエリサイド)。といった使われ方をされます。

 今回は、イベントソーシングやCQRSについては紹介しきれませんが、 Akka Persistenceの簡単な紹介から、イベントソーシングやCQRSについて興味を持って頂けたら幸いです。

はじめての永続化

 全ソースはこちら

 Akka Persistenceを使うためには libraryDependenciesに
"com.typesafe.akka" %% "akka-persistence" % "2.4.0" を追加します。

例として、単純に0から1ずつカウントアップして、カウントした値を保持するActorを作ってみます。

class CountUpActor extends PersistentActor with ActorLogging {

  var count: Int = 0

  override def persistenceId: String = self.path.name

  override def receiveCommand: Receive = {
    case c: CountUp =>
      log.info("receive command {}", c)
      persist(Increased(c.count)) {
        event =>
          count += event.diff
          log.info("current count {}", count)
      }
  }

  override def receiveRecover: Receive = {
    case e: Increased =>
      log.info("receive recover {}", e)
      count += e.diff
  }
}

object CountUpActor {
  def props = Props[CountUpActor]

  case class CountUp(count: Int)

  case class Increased(diff: Int)

}

 PersistentActorを継承しています。
def receiveCommand で、CountUpコマンドを受け取ってpersist(Increased(c.count)) で永続化しています。
ポイントは永続化しているのは、内部状態(count)ではなく、 増加した(Increased)というイベントを永続化しているところです。 また、永続化した後に count += event.diff と内部状態を更新しているので、 状態は変わったのに、永続化されていないといったことは起こりません。

永続化したイベントは次回Actorの起動時に、 def receiveRecover でハンドルできます。 ここでは復元したイベントをもとにcountを増加させて、前回終了時のcountの値を復元しています。

実行

 さっそく動かしてみましょう。Mainクラスはこんな感じにCountUpコマンドを3回送っています。

object ApplicationMain extends App {
  val system = ActorSystem("MyActorSystem")

  val actor = system.actorOf(CountUpActor.props, "c1")

  actor ! CountUp(1)
  actor ! CountUp(1)
  actor ! CountUp(1)

  Await.result(system.whenTerminated, Duration.Inf)
}

一回目の実行結果

[info] Running com.example.ApplicationMain
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 1
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 2
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 3

二回目の実行結果

[info] Running com.example.ApplicationMain
[info][akka://MyActorSystem/user/c1] receive recover Increased(1)
[info][akka://MyActorSystem/user/c1] receive recover Increased(1)
[info][akka://MyActorSystem/user/c1] receive recover Increased(1)
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 4
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 5
[info][akka://MyActorSystem/user/c1] receive command CountUp(1)
[info][akka://MyActorSystem/user/c1] current count 6

 一回目の実行で countが3まで増加しています。
二回目の実行で receive recover Increased(1) が3回続き、1回目の実行で保存したイベントが復元されているのがわかります。 その後、CoutUpコマンドを3回受け取り、正しく、合計6までカウントすることができています。

Persistence Plugin

 今回の例ではイベントの保存先はLevelDBを使っていますが、LevelDB以外にも使うことができて、自分で開発したプラグインや他の方が開発したプラグインをつかって 様々なDB(永続化機構)にイベントを保存することができます。 http://akka.io/community/#plugins-to-akka-persistence
 テスト時は標準で準備されているインメモリプラグインを使うと良いと思います。
設定はapplication.confに書きます。
akka.persistence.journal.plugin="akka.persistence.journal.inmem"

おわりに

 今回は、Akka Persistenceの簡単な紹介でしたが、

  • イベントストアの肥大化対策(スナップショット)
  • 分散環境(Cluster Sharding)
  • クエリサイド( akka-persistence-query)
  • メッセージの再送(At-Least-Once Delivery)
  • PersistentFSM
  • ドメイン駆動設計との絡み

など、まだまだAkkaやAkka Persistenceの奥深さを味わえるトピックスがあるので、 気が向いたら紹介したいと思います。

Akka やAkka Persistenceを知りたい場合は、公式ドキュメント以上に良い情報はないと思うので そちらを読めば良いと思います。

書籍であれば、

Effective Akka

Effective Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

 CQRSやイベントソーシングについては、以下の書籍が参考になりました。 特に.NETのエンタープライズアプリケーションアーキテクチャ第2版はCQRSやイベントソーシングの説明は分かりやすかったです。.NETとタイトルに付いているので目に止まらない方もいると思いますが、良書だと思います。

実践ドメイン駆動設計 (Object Oriented Selection)

実践ドメイン駆動設計 (Object Oriented Selection)

.NETのエンタープライズアプリケーションアーキテクチャ 第2版 (マイクロソフト公式解説書)

.NETのエンタープライズアプリケーションアーキテクチャ 第2版 (マイクロソフト公式解説書)