Akka Persistence: Testing Persistent Actors
Akka persistence provides functionality for persisting the internal state of an actor so it can be recovered when the actor is started, restarted after a shutdown/crash or migrated in a cluster.
The main idea behind Akka persistence is Event Sourcing, where the individual changes to an actor are persisted as opposed to persisting the entire state itself (although snapshotting of the whole state can be enabled to speed-up recovery and/or allow for event deletion to save disk space). All the events are persisted in a journal, keyed by the actor's persistenceId
. After actor is started/restarted its internal state is rebuilt by replaying all of its persisted events.
In this post we are going to focus on testing persistent actors. There is more to say about Akka persistence (I might dedicate a blog post for that). Meanwhile, if you want to read more about Event Sourcing and Akka Persistence check out Martin Fowler's post on Event Sourcing and the official Akka documentation.
Example
Persistent actor implementation
Let's start with a simple implementation of a shopping cart actor. We use PersistentActor
to persist our actor's events:
class ShoppingCartActor(id: String) extends PersistentActor with ActorLogging {
private var state: Seq[ShoppingItem] = Seq.empty
override def persistenceId: String = id
override def receiveCommand: Receive = {
case AddItemCommand(item) =>
persist(ItemAdded(item)) { evt =>
state = applyEvent(evt)
sender() ! AddItemResponse(item)
}
case UpdateItemCommand(item) =>
persist(ItemUpdated(item)) { evt =>
state = applyEvent(evt)
sender() ! UpdateItemResponse(item)
}
case RemoveItemCommand(itemId) =>
persist(ItemRemoved(itemId)) { evt =>
state = applyEvent(evt)
sender() ! RemoveItemResponse(itemId)
}
case GetItemsRequest =>
sender() ! GetItemsResponse(state)
}
override def receiveRecover: Receive = {
case evt: ShoppingCartEvent => state = applyEvent(evt)
case RecoveryCompleted => log.info("Recovery completed!")
}
private def applyEvent(shoppingCartEvent: ShoppingCartEvent): Seq[ShoppingItem] = shoppingCartEvent match {
case ItemAdded(item) => item +: state
case ItemUpdated(item) => item +: state.filterNot(_.id == item.id)
case ItemRemoved(itemId) => state.filterNot(_.id == itemId)
}
}
ShoppingCart's protocol:
object ShoppingCartActor {
def props(id: String): Props = Props(new ShoppingCartActor(id))
//protocol
case class AddItemCommand(shoppingItem: ShoppingItem)
case class AddItemResponse(shoppingItem: ShoppingItem)
case class UpdateItemCommand(shoppingItem: ShoppingItem)
case class UpdateItemResponse(shoppingItem: ShoppingItem)
case class RemoveItemCommand(shoppingItemId: String)
case class RemoveItemResponse(shoppingItemId: String)
case object GetItemsRequest
case class GetItemsResponse(items: Seq[ShoppingItem])
// events
sealed trait ShoppingCartEvent
case class ItemAdded(shoppingItem: ShoppingItem) extends ShoppingCartEvent
case class ItemUpdated(shoppingItem: ShoppingItem) extends ShoppingCartEvent
case class ItemRemoved(shoppingItemId: String) extends ShoppingCartEvent
}
Testing persistent actors
The general pattern for testing persistent actors is pretty straightforward:
- Create the actor you want to test.
- Send commands to your actor. The actor will persist the events triggered by these commands.
- Restart the actor.
- Test the actor's state. It should be the same as before restarting it.
For testing persistent actors we will use akka-persistence-inmemory. This is a plugin for akka-persistence that stores journal and snapshot data in memory.
Usage is straightforward:
- Add the dependency to your
build.sbt
file:
resolvers += Resolver.jcenterRepo
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.3.7"
- Configure Akka persistence to use the
inmemory-journal
andinmemory-snapshot-store
plugins. Add the following configuration toapplication.conf
:
akka.persistence {
journal.plugin = "inmemory-journal"
snapshot-store.plugin = "inmemory-snapshot-store"
}
Triggering receiveRecover:
Important: It is not possible to use TestActorRef
for testing PersistentActor
or any other persistence provided classes (e.g. AtLeastOnceDelivery
) due to its synchronous nature. These traits perform asynchronous tasks in background in order to handle internal persistence related events. Therefore, when testing persistent actors you should rely on asynchronous messaging using TestKit
.
We will achieve this in two ways:
1. Stop the actor by sending a PoisonPill
. Next, create another actor with the same persistenceId
so it reads from the same journal when recovering.
Here are the tests for the first approach:
import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import com.ted.playground.akka.persistence.shopping.ShoppingCartActor._
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class ShoppingCartActorSpec
extends TestKit(ActorSystem("ShoppingCartActorSpec"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ImplicitSender {
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
"ShoppingCartActor" should {
val shoppingItem = ShoppingItem("sku-000001", "Cheap headphones", 42.25, 2)
"add an item to the shopping cart and preserve it after restart" in {
val shoppingCartId = "sc-000001"
val shoppingCartActor = system.actorOf(ShoppingCartActor.props(shoppingCartId))
shoppingCartActor ! AddItemCommand(shoppingItem)
expectMsg(AddItemResponse(shoppingItem))
shoppingCartActor ! PoisonPill
// creating a new actor with the same persistence id
val shoppingCartActor2 = system.actorOf(ShoppingCartActor.props(shoppingCartId))
shoppingCartActor2 ! GetItemsRequest
expectMsg(GetItemsResponse(Seq(shoppingItem)))
}
"update an existing item to the shopping cart and preserve the changes after restart" in {
val shoppingCartId = "sc-000002"
val shoppingCartActor = system.actorOf(ShoppingCartActor.props(shoppingCartId))
val updatedShoppingItem = shoppingItem.copy(quantity = 5)
shoppingCartActor ! AddItemCommand(shoppingItem)
expectMsg(AddItemResponse(shoppingItem))
shoppingCartActor ! UpdateItemCommand(updatedShoppingItem)
expectMsg(UpdateItemResponse(updatedShoppingItem))
shoppingCartActor ! PoisonPill
// creating a new actor with the same persistence id
val shoppingCartActor2 = system.actorOf(ShoppingCartActor.props(shoppingCartId))
shoppingCartActor2 ! GetItemsRequest
expectMsg(GetItemsResponse(Seq(updatedShoppingItem)))
}
"remove an existing item from the shopping cart and preserve the changes after restart" in {
val shoppingCartId = "sc-000003"
val shoppingCartActor = system.actorOf(ShoppingCartActor.props(shoppingCartId))
shoppingCartActor ! AddItemCommand(shoppingItem)
expectMsg(AddItemResponse(shoppingItem))
shoppingCartActor ! RemoveItemCommand(shoppingItem.id)
expectMsg(RemoveItemResponse(shoppingItem.id))
shoppingCartActor ! PoisonPill
// creating a new actor with the same persistence id
val shoppingCartActor2 = system.actorOf(ShoppingCartActor.props(shoppingCartId))
shoppingCartActor2 ! GetItemsRequest
expectMsg(GetItemsResponse(Seq.empty))
}
}
}
2. The second approach is slightly different. Restart the actor by throwing an exception. According to the default supervisor strategy the actor will be restarted. Thus, the persistenceId
will be the same and we won't have to create another actor.
To avoid any redundant functionality (read as mess) we won't add this behaviour directly into the actor. Instead we are going to mix in a small utility trait (called RestartableActor
) which will add this behaviour for us.
Note: We use this trait only in tests, not in the actor implementation code.
Here is what RestartableActor
looks like:
import akka.persistence.PersistentActor
import com.ted.playground.akka.persistence.fixtures.RestartableActor._
trait RestartableActor extends PersistentActor {
abstract override def receiveCommand = super.receiveCommand orElse {
case RestartActor => throw RestartActorException
}
}
object RestartableActor {
case object RestartActor
private object RestartActorException extends Exception
}
It adds handling for a RestartActor
command which causes RestartActorException
to be thrown.
The tests that use the newly created trait:
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import com.ted.playground.akka.persistence.fixtures.RestartableActor
import com.ted.playground.akka.persistence.fixtures.RestartableActor.RestartActor
import com.ted.playground.akka.persistence.shopping.ShoppingCartActor._
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class ShoppingCartActorSpec2
extends TestKit(ActorSystem("ShoppingCartActorSpec2"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ImplicitSender {
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
"ShoppingCartActor" should {
val shoppingItem = ShoppingItem("sku-000001", "Cheap headphones", 42.25, 2)
"add an item to the shopping cart and preserve it after restart" in {
val shoppingCartActor = system.actorOf(Props(new ShoppingCartActor("sc-000001") with RestartableActor))
shoppingCartActor ! AddItemCommand(shoppingItem)
expectMsg(AddItemResponse(shoppingItem))
shoppingCartActor ! RestartActor
shoppingCartActor ! GetItemsRequest
expectMsg(GetItemsResponse(Seq(shoppingItem)))
}
"update an existing item to the shopping cart and preserve the changes after restart" in {
val shoppingCartActor = system.actorOf(Props(new ShoppingCartActor("sc-000002") with RestartableActor))
val updatedShoppingItem = shoppingItem.copy(quantity = 5)
shoppingCartActor ! AddItemCommand(shoppingItem)
expectMsg(AddItemResponse(shoppingItem))
shoppingCartActor ! UpdateItemCommand(updatedShoppingItem)
expectMsg(UpdateItemResponse(updatedShoppingItem))
shoppingCartActor ! RestartActor
shoppingCartActor ! GetItemsRequest
expectMsg(GetItemsResponse(Seq(updatedShoppingItem)))
}
"remove an existing item from the shopping cart and preserve the changes after restart" in {
val shoppingCartActor = system.actorOf(Props(new ShoppingCartActor("sc-000003") with RestartableActor))
shoppingCartActor ! AddItemCommand(shoppingItem)
expectMsg(AddItemResponse(shoppingItem))
shoppingCartActor ! RemoveItemCommand(shoppingItem.id)
expectMsg(RemoveItemResponse(shoppingItem.id))
shoppingCartActor ! RestartActor
shoppingCartActor ! GetItemsRequest
expectMsg(GetItemsResponse(Seq.empty))
}
}
}
Conclusion
In this post I have shown how persistent actors can be tested. You can use either of the two approaches described above. It's up to you. If you use a different way please share in the comment section!
The full code is available on GitHub.
Happy persisting!