Demystifying Akka Extensions
When it comes to adding features to Akka, there is an elegant way to do that by using a very handy extension mechanism called Akka Extensions.
Adding a custom extension requires implementing two basic components: Extension
and ExtensionId
. There are many examples of extensions that will bring your application and development to a new level (Cluster
and ClusterSharding
just to name a few of the powerful extensions that are provided by Akka).
Extensions are loaded once per ActorSystem
, either on-demand or at ActorSystem
creation time. By default extensions are loaded on-demand. For loading extensions at actor system creation time you should specify that in the configuration file. We will go through this a bit further.
Sample extension
We are going to implement an extension which loads and stores the configuration for a sample application which uses Apache Kafka. This is a typical example where Akka Extensions is used.
Let's define a simple configuration file:
application.name = "sample-application"
kafka {
producer {
bootstrap.servers = "localhost:9092"
acks = "all"
retries = 0
batch.size = 16834
linger.ms = 1
buffer.memory = 33554432
}
consumer {
bootstrap.servers = "localhost:9092"
group.id = "sample-application"
topics = ["sample-topic"]
}
}
Implementation
Now we can implement our settings extension. As previously mentioned, we have to implement two components: Extension
and ExtensionId
.
Here is what the extension looks like:
import akka.actor.{Actor, ExtendedActorSystem, Extension, ExtensionId}
import com.typesafe.config.Config
import scala.collection.JavaConverters._
class SettingsExtensionImpl(config: Config) extends Extension {
object application {
private val applicationConfig = config.getConfig("application")
val name = applicationConfig.getString("name")
}
object kafka {
private val kafkaConfig = config.getConfig("kafka")
object producer {
private val producerConfig = kafkaConfig.getConfig("producer")
val bootstrapServers = producerConfig.getString("bootstrap.servers")
val acks = producerConfig.getString("acks")
val retries = producerConfig.getInt("retries")
val batchSize = producerConfig.getInt("batch.size")
val lingerMs = producerConfig.getInt("linger.ms")
val bufferMemory = producerConfig.getInt("buffer.memory")
}
object consumer {
private val consumerConfig = kafkaConfig.getConfig("consumer")
val bootstrapServers = consumerConfig.getString("bootstrap.servers")
val groupId = consumerConfig.getString("group.id")
val topics = consumerConfig.getStringList("topics").asScala.toList
}
}
}
object SettingsExtension extends ExtensionId[SettingsExtensionImpl] {
override def createExtension(system: ExtendedActorSystem) = new SettingsExtensionImpl(system.settings.config)
}
Usage
Using the extension is straightforward. Just call the apply
method on the earlier defined extension: SettingsExtension(context.system)
.
For convenience I have added a SettingsActor
which we are going to use inside the Actors:
trait SettingsActor {
_: Actor =>
val settings = SettingsExtension(context.system)
}
This is what ConsumerActor
looks like after mixing in the SettingsActor
:
import akka.actor.Actor
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor
import cakesolutions.kafka.akka.KafkaConsumerActor.{Subscribe, Unsubscribe}
import com.ted.akka.extensions.SettingsActor
import org.apache.kafka.common.serialization.StringDeserializer
class ConsumerActor extends Actor with SettingsActor {
import settings.kafka.consumer._
val kafkaConsumerActor = context.actorOf(
KafkaConsumerActor.props(
consumerConf = KafkaConsumer.Conf(
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
bootstrapServers = bootstrapServers,
groupId = groupId
),
actorConf = KafkaConsumerActor.Conf(),
downstreamActor = self))
override def preStart() = {
super.preStart()
kafkaConsumerActor ! Subscribe.AutoPartition(topics)
}
override def postStop = {
kafkaConsumerActor ! Unsubscribe
super.postStop()
}
override def receive: Receive = // some message processing
}
And the ProducerActor
:
import akka.actor.Actor
import cakesolutions.kafka.KafkaProducer.Conf
import cakesolutions.kafka.akka.KafkaProducerActor
import com.ted.akka.extensions.SettingsActor
import org.apache.kafka.common.serialization.StringSerializer
class ProducerActor extends Actor with SettingsActor {
import settings.kafka.producer._
val kafkaProducerActor = context.actorOf(
KafkaProducerActor.props(
Conf(
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer,
bootstrapServers = bootstrapServers,
acks = acks,
retries = retries,
batchSize = batchSize,
lingerMs = lingerMs,
bufferMemory = bufferMemory
)))
override def receive: Receive = // some message processing
}
Testing
Testing SettingsExtension
is very easy:
import akka.actor.ActorSystem
import akka.testkit.TestKit
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}
class SettingsExtensionTest
extends TestKit(ActorSystem("SettingsExtensionTest"))
with FunSuiteLike
with BeforeAndAfterAll
with Matchers {
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
val settings = SettingsExtension(system)
test("SettingsExtension to return the correct config values for application") {
val application = settings.application
application.name shouldBe "sample-application"
}
test("SettingsExtension to return the correct config values for kafka.producer") {
val kakfaProducer = settings.kafka.producer
kakfaProducer.bootstrapServers shouldBe "localhost:9092"
kakfaProducer.acks shouldBe "all"
kakfaProducer.batchSize shouldBe 16834
kakfaProducer.bufferMemory shouldBe 33554432
kakfaProducer.retries shouldBe 0
kakfaProducer.lingerMs shouldBe 1
}
test("SettingsExtension to return the correct config values for kafka.consumer") {
val kafkaConsumer = settings.kafka.consumer
kafkaConsumer.bootstrapServers shouldBe "localhost:9092"
kafkaConsumer.groupId shouldBe "sample-application"
kafkaConsumer.topics shouldBe List("sample-topic")
}
}
Loading extensions on ActorSystem creation time
For extensions that we want to load at the ActorSystem creation time we should specify FQCNs (fully qualified class name) of implementations of ExtensionId
in the configuration file:
// loads the extension at ActorSystem creation time
akka.extensions = ["com.ted.akka.extensions.SettingsExtension"]
Alternatively, we can define a class
/object
that implements ExtensionIdProvider
and specify its FQCN instead.
Note: A third party library may register its extension to be loaded at ActorSystem creation time. This is achieved by appending the extension's FQCN to akka.library-extensions
in the reference.conf
file:
akka.library-extensions += "com.example.extension.ExampleExtension"
Conclusion
In this blog post we went through a brief introduction to Akka Extensions. It is an elegant, yet powerful way to define reusable features for applications written in Akka, which should be definitely considered.
The complete code of the settings extension defined in this blog post is available on github.
Happy hakking!