Akka Persistence: Testing Persistent Actors

Akka persistence provides functionality for persisting the internal state of an actor so it can be recovered when the actor is started, restarted after a shutdown/crash or migrated in a cluster.

The main idea behind Akka persistence is Event Sourcing: the individual changes to an actor's state are persisted as events, rather than the entire state itself (although snapshots of the whole state can be enabled to speed up recovery and/or to allow old events to be deleted to save disk space). All events are persisted in a journal, keyed by the actor's persistenceId. After an actor is started or restarted, its internal state is rebuilt by replaying all of its persisted events.
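To make the replay idea concrete, here is a minimal sketch in plain Scala with no Akka involved. The names Item, CartEvent and CartReplay are hypothetical and exist only for this illustration; the point is that recovery amounts to a left fold of the stored events over an empty state.

```scala
// Hypothetical, simplified model for illustration only (no Akka types).
final case class Item(id: String, quantity: Int)

sealed trait CartEvent
final case class Added(item: Item) extends CartEvent
final case class Updated(item: Item) extends CartEvent
final case class Removed(itemId: String) extends CartEvent

object CartReplay {
  // Apply a single event to the current state and return the new state.
  def applyEvent(state: Seq[Item], event: CartEvent): Seq[Item] = event match {
    case Added(item)     => item +: state
    case Updated(item)   => item +: state.filterNot(_.id == item.id)
    case Removed(itemId) => state.filterNot(_.id == itemId)
  }

  // "Recovery": fold the journal, oldest event first, over an empty state.
  def replay(journal: Seq[CartEvent]): Seq[Item] =
    journal.foldLeft(Seq.empty[Item])(applyEvent)
}
```

For example, replaying Seq(Added(Item("a", 1)), Added(Item("b", 2)), Updated(Item("a", 3)), Removed("b")) yields Seq(Item("a", 3)), no matter how many times the replay is repeated.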

In this post we are going to focus on testing persistent actors. There is more to say about Akka persistence (I might dedicate a blog post to that). Meanwhile, if you want to read more about Event Sourcing and Akka Persistence, check out Martin Fowler's post on Event Sourcing and the official Akka documentation.

Example

Persistent actor implementation

Let's start with a simple implementation of a shopping cart actor. We use PersistentActor to persist our actor's events:

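import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}
import ShoppingCartActor._ // protocol and events from the companion object

class ShoppingCartActor(id: String) extends PersistentActor with ActorLogging {
  // In-memory state; rebuilt from the journal during recovery
  private var state: Seq[ShoppingItem] = Seq.empty
  // Events are stored in the journal under this id
  override def persistenceId: String = id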

  override def receiveCommand: Receive = {
    case AddItemCommand(item) =>
      persist(ItemAdded(item)) { evt =>
        state = applyEvent(evt)
        sender() ! AddItemResponse(item)
      }
    case UpdateItemCommand(item) =>
      persist(ItemUpdated(item)) { evt =>
        state = applyEvent(evt)
        sender() ! UpdateItemResponse(item)
      }
    case RemoveItemCommand(itemId) =>
      persist(ItemRemoved(itemId)) { evt =>
        state = applyEvent(evt)
        sender() ! RemoveItemResponse(itemId)
      }
    case GetItemsRequest =>
      sender() ! GetItemsResponse(state)
  }

  override def receiveRecover: Receive = {
    case evt: ShoppingCartEvent => state = applyEvent(evt)
    case RecoveryCompleted => log.info("Recovery completed!")
  }

  private def applyEvent(shoppingCartEvent: ShoppingCartEvent): Seq[ShoppingItem] = shoppingCartEvent match {
    case ItemAdded(item) => item +: state
    case ItemUpdated(item) => item +: state.filterNot(_.id == item.id)
    case ItemRemoved(itemId) => state.filterNot(_.id == itemId)
  }
}

ShoppingCart's protocol:

object ShoppingCartActor {  
  def props(id: String): Props = Props(new ShoppingCartActor(id))

  //protocol
  case class AddItemCommand(shoppingItem: ShoppingItem)
  case class AddItemResponse(shoppingItem: ShoppingItem)

  case class UpdateItemCommand(shoppingItem: ShoppingItem)
  case class UpdateItemResponse(shoppingItem: ShoppingItem)

  case class RemoveItemCommand(shoppingItemId: String)
  case class RemoveItemResponse(shoppingItemId: String)

  case object GetItemsRequest
  case class GetItemsResponse(items: Seq[ShoppingItem])

  // events
  sealed trait ShoppingCartEvent
  case class ItemAdded(shoppingItem: ShoppingItem) extends ShoppingCartEvent
  case class ItemUpdated(shoppingItem: ShoppingItem) extends ShoppingCartEvent
  case class ItemRemoved(shoppingItemId: String) extends ShoppingCartEvent
}
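ShoppingItem itself is not shown in this post. A minimal sketch that is consistent with how it is used in the tests (four constructor arguments, an id field and a quantity field) could look like the following. The field names name and price are my assumptions; only id and quantity appear in the code.

```scala
// Sketch only: `name` and `price` are assumed field names,
// `id` and `quantity` are the ones used by the actor and the tests.
final case class ShoppingItem(id: String, name: String, price: Double, quantity: Int)
```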

Testing persistent actors

The general pattern for testing persistent actors is pretty straightforward:

  • Create the actor you want to test.
  • Send commands to your actor. The actor will persist the events triggered by these commands.
  • Restart the actor.
  • Test the actor's state. It should be the same as before restarting it.

For testing persistent actors we will use akka-persistence-inmemory. This is a plugin for akka-persistence that stores journal and snapshot data in memory.

Usage is straightforward:

  • Add the dependency to your build.sbt file:

resolvers += Resolver.jcenterRepo

libraryDependencies += "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.3.7"

  • Configure Akka persistence to use the inmemory-journal and inmemory-snapshot-store plugins. Add the following configuration to application.conf:

akka.persistence {  
  journal.plugin = "inmemory-journal"
  snapshot-store.plugin = "inmemory-snapshot-store"
}
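If you would rather keep this configuration next to the tests instead of in application.conf, one option is to build it in code and pass it to the ActorSystem. This is a sketch, assuming the same plugin ids as above; the object name InMemoryPersistenceConfig is hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object, not part of the original code base.
object InMemoryPersistenceConfig {
  // Same settings as the application.conf snippet, with the default
  // configuration (reference.conf, application.conf) as a fallback.
  val config: Config = ConfigFactory.parseString(
    """
      |akka.persistence {
      |  journal.plugin = "inmemory-journal"
      |  snapshot-store.plugin = "inmemory-snapshot-store"
      |}
    """.stripMargin).withFallback(ConfigFactory.load())
}
```

A test can then create its system with ActorSystem("ShoppingCartActorSpec", InMemoryPersistenceConfig.config).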

Triggering receiveRecover

Important: It is not possible to use TestActorRef for testing PersistentActor or any other persistence-provided trait (e.g. AtLeastOnceDelivery), because TestActorRef is synchronous. These traits perform asynchronous tasks in the background in order to handle internal persistence-related events. Therefore, when testing persistent actors you should rely on asynchronous messaging using TestKit.

We can trigger recovery (and therefore receiveRecover) in two ways:

1. Stop the actor by sending a PoisonPill. Next, create another actor with the same persistenceId so it reads from the same journal when recovering. Here are the tests for the first approach:

import akka.actor.{ActorSystem, PoisonPill}  
import akka.testkit.{ImplicitSender, TestKit}  
import com.ted.playground.akka.persistence.shopping.ShoppingCartActor._  
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class ShoppingCartActorSpec  
  extends TestKit(ActorSystem("ShoppingCartActorSpec"))
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ImplicitSender {

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "ShoppingCartActor" should {
    val shoppingItem = ShoppingItem("sku-000001", "Cheap headphones", 42.25, 2)

    "add an item to the shopping cart and preserve it after restart" in {
      val shoppingCartId = "sc-000001"
      val shoppingCartActor = system.actorOf(ShoppingCartActor.props(shoppingCartId))

      shoppingCartActor ! AddItemCommand(shoppingItem)
      expectMsg(AddItemResponse(shoppingItem))

      shoppingCartActor ! PoisonPill

      // creating a new actor with the same persistence id
      val shoppingCartActor2 = system.actorOf(ShoppingCartActor.props(shoppingCartId))

      shoppingCartActor2 ! GetItemsRequest

      expectMsg(GetItemsResponse(Seq(shoppingItem)))
    }

    "update an existing item to the shopping cart and preserve the changes after restart" in {
      val shoppingCartId = "sc-000002"
      val shoppingCartActor = system.actorOf(ShoppingCartActor.props(shoppingCartId))
      val updatedShoppingItem = shoppingItem.copy(quantity = 5)

      shoppingCartActor ! AddItemCommand(shoppingItem)
      expectMsg(AddItemResponse(shoppingItem))
      shoppingCartActor ! UpdateItemCommand(updatedShoppingItem)
      expectMsg(UpdateItemResponse(updatedShoppingItem))

      shoppingCartActor ! PoisonPill

      // creating a new actor with the same persistence id
      val shoppingCartActor2 = system.actorOf(ShoppingCartActor.props(shoppingCartId))
      shoppingCartActor2 ! GetItemsRequest

      expectMsg(GetItemsResponse(Seq(updatedShoppingItem)))
    }

    "remove an existing item from the shopping cart and preserve the changes after restart" in {
      val shoppingCartId = "sc-000003"
      val shoppingCartActor = system.actorOf(ShoppingCartActor.props(shoppingCartId))

      shoppingCartActor ! AddItemCommand(shoppingItem)
      expectMsg(AddItemResponse(shoppingItem))
      shoppingCartActor ! RemoveItemCommand(shoppingItem.id)
      expectMsg(RemoveItemResponse(shoppingItem.id))

      shoppingCartActor ! PoisonPill

      // creating a new actor with the same persistence id
      val shoppingCartActor2 = system.actorOf(ShoppingCartActor.props(shoppingCartId))
      shoppingCartActor2 ! GetItemsRequest

      expectMsg(GetItemsResponse(Seq.empty))
    }
  }
}
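One possible refinement of the tests above: after sending PoisonPill, the second actor may be created while the first one is still stopping. If you want to be sure that only one actor with a given persistenceId is alive at a time, you can watch the first actor and wait for its termination. The helper below is a sketch; the trait name ActorStopSupport and the method stopAndAwaitTermination are hypothetical, while watch and expectTerminated come from TestKit.

```scala
import akka.actor.{ActorRef, PoisonPill}
import akka.testkit.TestKit

// Hypothetical test helper, not part of the original code base.
// The self-type gives access to TestKit's watch and expectTerminated.
trait ActorStopSupport { this: TestKit =>

  // Stops the actor and blocks until the Terminated message arrives
  // (or fails the test if it does not arrive within the default timeout).
  def stopAndAwaitTermination(actor: ActorRef): Unit = {
    watch(actor)
    actor ! PoisonPill
    expectTerminated(actor)
  }
}
```

With this mixed into the spec, shoppingCartActor ! PoisonPill becomes stopAndAwaitTermination(shoppingCartActor).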

2. The second approach is slightly different: restart the actor by throwing an exception. Under the default supervisor strategy, an actor that throws an Exception is restarted, so the same actor (with the same persistenceId) recovers and we don't have to create another one. To keep this test-only behaviour out of the actor itself, we mix in a small utility trait called RestartableActor, which adds it for us.
Note: We use this trait only in tests, not in the actor implementation code. Here is what RestartableActor looks like:

import akka.persistence.PersistentActor  
import com.ted.playground.akka.persistence.fixtures.RestartableActor._

trait RestartableActor extends PersistentActor {

  abstract override def receiveCommand = super.receiveCommand orElse {
    case RestartActor => throw RestartActorException
  }
}

object RestartableActor {  
  case object RestartActor

  private object RestartActorException extends Exception
}

It adds handling for a RestartActor command which causes RestartActorException to be thrown.
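RestartableActor relies on Scala's stackable trait pattern: abstract override lets a trait call super on a member that is still abstract in its parent, and the call is resolved against whatever class the trait is mixed into. Here is a minimal sketch of the same mechanism in plain Scala; Handler, Greeter and Restartable are hypothetical names used only for this illustration.

```scala
// Hypothetical stand-ins for illustration only (no Akka types).
trait Handler {
  def receive: PartialFunction[Any, String]
}

class Greeter extends Handler {
  override def receive: PartialFunction[Any, String] = {
    case "hello" => "hi"
  }
}

// `abstract override` allows calling super.receive although Handler
// declares it abstract; the concrete implementation is supplied by
// the class this trait is mixed into.
trait Restartable extends Handler {
  abstract override def receive: PartialFunction[Any, String] =
    super.receive orElse {
      case "restart" => "restarting"
    }
}

object StackableDemo {
  // Greeter's cases are tried first, then the case added by Restartable.
  val handler: Handler = new Greeter with Restartable
}
```

Because the original handler comes first in the orElse chain, the extra case only fires for messages the wrapped class does not handle itself, which is why mixing the trait in does not change the behaviour under test.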

The tests that use the newly created trait:

import akka.actor.{ActorSystem, Props}  
import akka.testkit.{ImplicitSender, TestKit}  
import com.ted.playground.akka.persistence.fixtures.RestartableActor  
import com.ted.playground.akka.persistence.fixtures.RestartableActor.RestartActor  
import com.ted.playground.akka.persistence.shopping.ShoppingCartActor._  
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class ShoppingCartActorSpec2  
  extends TestKit(ActorSystem("ShoppingCartActorSpec2"))
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ImplicitSender {

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "ShoppingCartActor" should {
    val shoppingItem = ShoppingItem("sku-000001", "Cheap headphones", 42.25, 2)

    "add an item to the shopping cart and preserve it after restart" in {
      val shoppingCartActor = system.actorOf(Props(new ShoppingCartActor("sc-000001") with RestartableActor))

      shoppingCartActor ! AddItemCommand(shoppingItem)
      expectMsg(AddItemResponse(shoppingItem))

      shoppingCartActor ! RestartActor
      shoppingCartActor ! GetItemsRequest

      expectMsg(GetItemsResponse(Seq(shoppingItem)))
    }

    "update an existing item to the shopping cart and preserve the changes after restart" in {
      val shoppingCartActor = system.actorOf(Props(new ShoppingCartActor("sc-000002") with RestartableActor))
      val updatedShoppingItem = shoppingItem.copy(quantity = 5)

      shoppingCartActor ! AddItemCommand(shoppingItem)
      expectMsg(AddItemResponse(shoppingItem))
      shoppingCartActor ! UpdateItemCommand(updatedShoppingItem)
      expectMsg(UpdateItemResponse(updatedShoppingItem))

      shoppingCartActor ! RestartActor
      shoppingCartActor ! GetItemsRequest

      expectMsg(GetItemsResponse(Seq(updatedShoppingItem)))
    }

    "remove an existing item from the shopping cart and preserve the changes after restart" in {
      val shoppingCartActor = system.actorOf(Props(new ShoppingCartActor("sc-000003") with RestartableActor))

      shoppingCartActor ! AddItemCommand(shoppingItem)
      expectMsg(AddItemResponse(shoppingItem))
      shoppingCartActor ! RemoveItemCommand(shoppingItem.id)
      expectMsg(RemoveItemResponse(shoppingItem.id))

      shoppingCartActor ! RestartActor
      shoppingCartActor ! GetItemsRequest

      expectMsg(GetItemsResponse(Seq.empty))
    }
  }
}

Conclusion

In this post I have shown how persistent actors can be tested. You can use either of the two approaches described above; it's up to you. If you use a different approach, please share it in the comment section!

The full code is available on GitHub.

Happy persisting!

Tudor Zgureanu

Crafting software at Cake Solutions, Generalist and Scala Enthusiast. Passionate about Functional Programming and Distributed Systems. Professional Scrum Master certified (PSM I).

Manchester, United Kingdom
