Note
This document describes the design concepts of Akka Cluster.
Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck. It does this using gossip protocols and an automatic failure detector.
A cluster is made up of a set of member nodes. The identifier for each node is a hostname:port:uid tuple. An Akka application can be distributed over a cluster with each node hosting some part of the application. Cluster membership and the actors running on that node of the application are decoupled. A node could be a member of a cluster without hosting any actors. Joining a cluster is initiated by issuing a Join command to one of the nodes in the cluster to join.
The node identifier internally also contains a UID that uniquely identifies this actor system instance at that hostname:port. Akka uses the UID to be able to reliably trigger remote death watch. This means that the same actor system can never join a cluster again once it's been removed from that cluster. To re-join an actor system with the same hostname:port to a cluster you have to stop the actor system and start a new one with the same hostname:port which will then receive a different UID.
The cluster membership state is a specialized CRDT, which means that it has a monotonic merge function. When concurrent changes occur on different nodes the updates can always be merged and converge to the same end result.
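As a toy illustration of what a monotonic merge means (this is not Akka's actual membership CRDT), consider a simplified, totally ordered member lifecycle; merging two divergent views by taking the later status per node is commutative, associative and idempotent, so all nodes converge to the same result:
- // Toy illustration of a monotonic merge, not Akka's actual membership CRDT.
- // Statuses only ever move forward through a simplified lifecycle, so taking
- // the later status per node is order-insensitive and converges everywhere.
- val lifecycle = List("joining", "up", "leaving", "exiting", "removed")
- def laterOf(a: String, b: String): String =
-   if (lifecycle.indexOf(a) >= lifecycle.indexOf(b)) a else b
- def merge(left: Map[String, String], right: Map[String, String]): Map[String, String] =
-   (left.keySet ++ right.keySet).map { node =>
-     node -> laterOf(left.getOrElse(node, "joining"), right.getOrElse(node, "joining"))
-   }.toMap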
The cluster membership used in Akka is based on Amazon's Dynamo system and particularly the approach taken in Basho's Riak distributed database. Cluster membership is communicated using a Gossip Protocol, where the current state of the cluster is gossiped randomly through the cluster, with preference to members that have not seen the latest version.
Vector clocks are a type of data structure and algorithm for generating a partial ordering of events in a distributed system and detecting causality violations.
We use vector clocks to reconcile and merge differences in cluster state during gossiping. A vector clock is a set of (node, counter) pairs. Each update to the cluster state has an accompanying update to the vector clock.
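Akka has its own vector clock implementation internally; the following is only a rough sketch of the idea, not that implementation:
- // Sketch of a vector clock: a set of (node, counter) pairs.
- // Each node increments its own counter on an update; merge takes the
- // pairwise maximum, which is what lets concurrent changes converge.
- final case class VectorClock(versions: Map[String, Long] = Map.empty) {
-   def increment(node: String): VectorClock =
-     copy(versions = versions + (node -> (versions.getOrElse(node, 0L) + 1L)))
-   def merge(that: VectorClock): VectorClock =
-     VectorClock((versions.keySet ++ that.versions.keySet).map { n =>
-       n -> math.max(versions.getOrElse(n, 0L), that.versions.getOrElse(n, 0L))
-     }.toMap)
-   // true if this clock is causally before (or equal to) that clock
-   def isBefore(that: VectorClock): Boolean =
-     versions.forall { case (n, c) => c <= that.versions.getOrElse(n, 0L) }
- }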
Information about the cluster converges locally at a node at certain points in time. This is when a node can prove that the cluster state it is observing has been observed by all other nodes in the cluster. Convergence is implemented by passing a set of nodes that have seen the current state version during gossip. This information is referred to as the seen set in the gossip overview. When all nodes are included in the seen set there is convergence.
Gossip convergence cannot occur while any nodes are unreachable. The nodes need to become reachable again, or be moved to the down and removed states (see the Membership Lifecycle section below). This only blocks the leader from performing its cluster membership management and does not influence the application running on top of the cluster. For example this means that during a network partition it is not possible to add more nodes to the cluster. The nodes can join, but they will not be moved to the up state until the partition has healed or the unreachable nodes have been downed.
The failure detector is responsible for trying to detect if a node is unreachable from the rest of the cluster. For this we are using an implementation of The Phi Accrual Failure Detector by Hayashibara et al.
An accrual failure detector decouples monitoring and interpretation. That makes it applicable to a wider range of scenarios and more adequate for building generic failure detection services. The idea is that it keeps a history of failure statistics, calculated from heartbeats received from other nodes, and tries to make educated guesses by taking multiple factors, and how they accumulate over time, into account in order to come up with a better guess whether a specific node is up or down. Rather than just answering "yes" or "no" to the question "is the node down?" it returns a phi value representing the likelihood that the node is down.
The threshold that is the basis for the calculation is configurable by the user. A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes. The default threshold is 8 and is appropriate for most situations. However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms.
In a cluster each node is monitored by a few (default maximum 5) other nodes, and when any of these detects the node as unreachable that information will spread to the rest of the cluster through the gossip. In other words, only one node needs to mark a node unreachable to have the rest of the cluster mark that node unreachable.
The nodes to monitor are picked out of neighbors in a hashed ordered node ring. This is to increase the likelihood to monitor across racks and data centers, but the order is the same on all nodes, which ensures full coverage.
Heartbeats are sent out every second and every heartbeat is performed in a request/reply handshake with the replies used as input to the failure detector.
The failure detector will also detect if the node becomes reachable again. When all nodes that monitored the unreachable node detect it as reachable again the cluster, after gossip dissemination, will consider it as reachable.
If system messages cannot be delivered to a node it will be quarantined and then it cannot come back from unreachable. This can happen if there are too many unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, failures of actors supervised by remote parent). Then the node needs to be moved to the down or removed states (see the Membership Lifecycle section below) and the actor system must be restarted before it can join the cluster again.
After gossip convergence a leader for the cluster can be determined. There is no leader election process, the leader can always be recognised deterministically by any node whenever there is gossip convergence. The leader is just a role, any node can be the leader and it can change between convergence rounds. The leader is simply the first node in sorted order that is able to take the leadership role, where the preferred member states for a leader are up and leaving (see the Membership Lifecycle section below for more information about member states).
The role of the leader is to shift members in and out of the cluster, changing joining members to the up state or exiting members to the removed state. Currently leader actions are only triggered by receiving a new cluster state with gossip convergence.
The leader also has the power, if configured so, to "auto-down" a node that according to the Failure Detector is considered unreachable. This means setting the unreachable node status to down automatically after a configured time of unreachability.
The seed nodes are configured contact points for new nodes joining the cluster. When a new node is started it sends a message to all seed nodes and then sends a join command to the seed node that answers first.
The seed nodes configuration value does not have any influence on the running cluster itself, it is only relevant for new nodes joining the cluster as it helps them to find contact points to send the join command to; a new member can send this command to any current member of the cluster, not only to the seed nodes.
A variation of push-pull gossip is used to reduce the amount of gossip information sent around the cluster. In push-pull gossip a digest is sent representing current versions but not actual values; the recipient of the gossip can then send back any values for which it has newer versions and also request values for which it has outdated versions. Akka uses a single shared state with a vector clock for versioning, so the variant of push-pull gossip used in Akka makes use of this version to only push the actual state as needed.
Periodically, by default every 1 second, each node chooses another random node to initiate a round of gossip with. If less than ½ of the nodes reside in the seen set (have seen the new state) then the cluster gossips 3 times instead of once every second. This adjusted gossip interval is a way to speed up the convergence process in the early dissemination phase after a state change.
The choice of node to gossip with is random but biased towards nodes that might not have seen the current state version. During each round of gossip exchange, when convergence has not been reached, a node uses a probability of 0.8 (configurable) to gossip to a node not part of the seen set, i.e. one that probably has an older version of the state. Otherwise it gossips to any random live node.
This biased selection is a way to speed up the convergence process in the late dissemination phase after a state change.
For clusters larger than 400 nodes (configurable, and suggested by empirical evidence) the 0.8 probability is gradually reduced to avoid overwhelming single stragglers with too many concurrent gossip requests. The gossip receiver also has a mechanism to protect itself from too many simultaneous gossip messages by dropping messages that have been enqueued in the mailbox for too long.
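The gossip behaviour described above maps to a few settings under akka.cluster. The snippet below shows them with what should be their default values in this Akka version; treat the exact property names and defaults as something to verify against the reference configuration:
- akka.cluster {
-   # how often a new round of gossip is initiated
-   gossip-interval = 1s
-   # probability of gossiping to a node that has not seen the current version
-   gossip-different-view-probability = 0.8
-   # cluster size above which the 0.8 probability is gradually reduced
-   reduce-gossip-different-view-probability = 400
- }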
While the cluster is in a converged state the gossiper only sends a small gossip status message containing the gossip version to the chosen node. As soon as there is a change to the cluster (meaning non-convergence) then it goes back to biased gossip again.
The recipient of the gossip state or the gossip status can use the gossip version (vector clock) to determine whether it has a newer version of the gossip state, in which case it sends that back to the gossiper, or it has an outdated version of the state, in which case it requests the current state from the gossiper by sending back its own version of the gossip state. If the recipient and the gossip have the same version then the gossip state is not sent or requested.
The periodic nature of the gossip has a nice batching effect of state changes, e.g. joining several nodes quickly after each other to one node will result in only one state change being spread to the other members in the cluster.
The gossip messages are serialized with protobuf and also gzipped to reduce payload size.
A node begins in the joining state. Once all nodes have seen that the new node is joining (through gossip convergence) the leader will set the member state to up.
If a node is leaving the cluster in a safe, expected manner then it switches to the leaving state. Once the leader sees the convergence on the node in the leaving state, the leader will then move it to exiting. Once all nodes have seen the exiting state (convergence) the leader will remove the node from the cluster, marking it as removed.
If a node is unreachable then gossip convergence is not possible and therefore any leader actions are also not possible (for instance, allowing a node to become a part of the cluster). To be able to move forward the state of the unreachable nodes must be changed. It must become reachable again or be marked as down. If the node is to join the cluster again the actor system must be restarted and go through the joining process again. The cluster can, through the leader, also auto-down a node after a configured time of unreachability. If a new incarnation of an unreachable node tries to rejoin the cluster, the old incarnation will be marked as down and the new incarnation can rejoin the cluster without manual intervention.
Note
If you have auto-down enabled and the failure detector triggers, you can over time end up with a lot of single node clusters if you don't put measures in place to shut down nodes that have become unreachable. This follows from the fact that the unreachable node will likely see the rest of the cluster as unreachable, become its own leader and form its own cluster.
As mentioned before, if a node is unreachable then gossip convergence is not possible and therefore any leader actions are also not possible. By enabling akka.cluster.allow-weakly-up-members it is possible to let new joining nodes be promoted while convergence is not yet reached. These Joining nodes will be promoted to WeaklyUp. Once gossip convergence is reached, the leader will move WeaklyUp members to Up.
Note that members on the other side of a network partition have no knowledge about the existence of the new members. You should for example not count WeaklyUp members in quorum decisions.
[State diagram: Member States (akka.cluster.allow-weakly-up-members=off)]
[State diagram: Member States (akka.cluster.allow-weakly-up-members=on)]
Member states:
joining - transient state when joining a cluster
weakly up - transient state while network split (only if akka.cluster.allow-weakly-up-members=on)
up - normal operating state
leaving / exiting - states during graceful removal
down - marked as down (no longer part of cluster decisions)
removed - tombstone state (no longer a member)
User actions:
join - join a single node to a cluster; can be explicit or automatic on startup if a node to join has been specified in the configuration
leave - tell a node to leave the cluster gracefully
down - mark a node as down
The leader has the following duties:
shifting members in and out of the cluster, i.e. changing joining members to up and exiting members to removed
Failure detection and unreachability:
fd - the failure detector of one of the monitoring nodes has triggered causing the monitored node to be marked as unreachable
unreachable - unreachable is not a real member state but more of a flag in addition to the state, signaling that the cluster is unable to talk to this node; after being unreachable the failure detector may detect it as reachable again and thereby remove the flag
For an introduction to the Akka Cluster concepts please see the Cluster Specification.
The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:
- "com.typesafe.akka" %% "akka-cluster" % "2.4.10"
The following configuration enables the Cluster extension to be used. It joins the cluster and an actor subscribes to cluster membership events and logs them. The application.conf configuration looks like this:
- akka {
- actor {
- provider = "akka.cluster.ClusterActorRefProvider"
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 0
- }
- }
-
- cluster {
- seed-nodes = [
- "akka.tcp://ClusterSystem@127.0.0.1:2551",
- "akka.tcp://ClusterSystem@127.0.0.1:2552"]
-
- # auto downing is NOT safe for production deployments.
- # you may want to use it during development, read more about it in the docs.
- #
- # auto-down-unreachable-after = 10s
- }
- }
-
- # Disable legacy metrics in akka-cluster.
- akka.cluster.metrics.enabled=off
-
- # Enable metrics extension in akka-cluster-metrics.
- akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
-
- # Sigar native library extract location during tests.
- # Note: use per-jvm-instance folder when running multiple jvm on one host.
- akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
To enable cluster capabilities in your Akka project you should, at a minimum, add the Remoting settings, but with akka.cluster.ClusterActorRefProvider. The akka.cluster.seed-nodes should normally also be added to your application.conf file.
Note
If you are running Akka in a Docker container or the nodes for some other reason have separate internal and external ip addresses you must configure remoting according to Akka behind NAT or in a Docker container.
The seed nodes are configured contact points for initial, automatic, join of the cluster.
Note that if you are going to start the nodes on different machines you need to specify the ip-addresses or host names of the machines in application.conf instead of 127.0.0.1.
An actor that uses the cluster extension may look like this:
- package sample.cluster.simple
-
- import akka.cluster.Cluster
- import akka.cluster.ClusterEvent._
- import akka.actor.ActorLogging
- import akka.actor.Actor
-
- class SimpleClusterListener extends Actor with ActorLogging {
-
- val cluster = Cluster(context.system)
-
- // subscribe to cluster changes, re-subscribe when restart
- override def preStart(): Unit = {
- cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
- classOf[MemberEvent], classOf[UnreachableMember])
- }
- override def postStop(): Unit = cluster.unsubscribe(self)
-
- def receive = {
- case MemberUp(member) =>
- log.info("Member is Up: {}", member.address)
- case UnreachableMember(member) =>
- log.info("Member detected as unreachable: {}", member)
- case MemberRemoved(member, previousStatus) =>
- log.info("Member is Removed: {} after {}",
- member.address, previousStatus)
- case _: MemberEvent => // ignore
- }
- }
The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.
The easiest way to run this example yourself is to download Lightbend Activator and open the tutorial named Akka Cluster Samples with Scala. It contains instructions of how to run the SimpleClusterApp.
You may decide if joining to the cluster should be done manually or automatically to configured initial contact points, so-called seed nodes. When a new node is started it sends a message to all seed nodes and then sends a join command to the one that answers first. If none of the seed nodes replies (they might not be started yet) it retries this procedure until successful or shut down.
You define the seed nodes in the Configuration file (application.conf):
- akka.cluster.seed-nodes = [
- "akka.tcp://ClusterSystem@host1:2552",
- "akka.tcp://ClusterSystem@host2:2552"]
This can also be defined as Java system properties when starting the JVM using the following syntax:
- -Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
- -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn't matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.
Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster.
If you don't configure seed nodes you need to join the cluster programmatically or manually.
Manual joining can be performed by using JMX or Command Line Management. Joining programmatically can be performed with Cluster(system).join. Unsuccessful join attempts are automatically retried after the time period defined in the configuration property retry-unsuccessful-join-after. Retries can be disabled by setting the property to off.
You can join to any node in the cluster. It does not have to be configured as a seed node. Note that you can only join to an existing cluster member, which means that for bootstrapping some node must join itself, and then the following nodes can join it to make up a cluster.
You may also use Cluster(system).joinSeedNodes to join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API. When using joinSeedNodes you should not include the node itself except for the node that is supposed to be the first seed node, and that should be placed first in the parameter to joinSeedNodes.
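A sketch of programmatic joining with dynamically discovered addresses (the host names and ports below are placeholders):
- import akka.actor.{ ActorSystem, Address }
- import akka.cluster.Cluster
-
- val system = ActorSystem("ClusterSystem")
- // addresses obtained from some external discovery tool or API (placeholder values)
- val discoveredSeedNodes = List(
-   Address("akka.tcp", "ClusterSystem", "host1", 2552),
-   Address("akka.tcp", "ClusterSystem", "host2", 2552))
- Cluster(system).joinSeedNodes(discoveredSeedNodes)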
Unsuccessful attempts to contact seed nodes are automatically retried after the time period defined in the configuration property seed-node-timeout. An unsuccessful attempt to join a specific seed node is automatically retried after the configured retry-unsuccessful-join-after. Retrying means that it tries to contact all seed nodes and then joins the node that answers first. The first node in the list of seed nodes will join itself if it cannot contact any of the other seed nodes within the configured seed-node-timeout.
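For reference, these two properties live under akka.cluster and should look roughly like this with their default values (verify the names and defaults against the reference configuration of your Akka version):
- akka.cluster {
-   # how long to wait for one of the seed nodes to reply to the initial join request
-   seed-node-timeout = 5s
-   # retry interval for unsuccessful join attempts, set to off to disable retries
-   retry-unsuccessful-join-after = 10s
- }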
An actor system can only join a cluster once. Additional attempts will be ignored. When it has successfully joined it must be restarted to be able to join another cluster or to join the same cluster again. It can use the same host name and port after the restart; when it comes up as a new incarnation of an existing member in the cluster and tries to join, the existing member will be removed from the cluster and then the new incarnation will be allowed to join.
Note
The name of the ActorSystem must be the same for all members of a cluster. The name is given when you start the ActorSystem.
When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of new joining members to 'Up'. The node must first become reachable again, or the status of the unreachable member must be changed to 'Down'. Changing status to 'Down' can be performed automatically or manually. By default it must be done manually, using JMX or Command Line Management.
It can also be performed programmatically with Cluster(system).down(address).
A pre-packaged solution for the downing problem is provided by Split Brain Resolver, which is part of the Lightbend Reactive Platform. If you don’t use RP, you should anyway carefully read the documentation of the Split Brain Resolver and make sure that the solution you are using handles the concerns described there.
There is an automatic downing feature that you should not use in production. For testing purposes you can enable it with the configuration:
- akka.cluster.auto-down-unreachable-after = 120s
This means that the cluster leader member will change the unreachable node status to down automatically after the configured time of unreachability.
This is a naïve approach to remove unreachable nodes from the cluster membership. It works great for crashes and short transient network partitions, but not for long network partitions. Both sides of the network partition will see the other side as unreachable and after a while remove it from its cluster membership. Since this happens on both sides the result is that two separate disconnected clusters have been created. This can also happen because of long GC pauses or system overload.
Warning
We recommend against using the auto-down feature of Akka Cluster in production. This is crucial for correct behavior if you use Cluster Singleton or Cluster Sharding, especially together with Akka Persistence. For Akka Persistence with Cluster Sharding it can result in corrupt data in case of network partitions.
There are two ways to remove a member from the cluster.
You can just stop the actor system (or the JVM process). It will be detected as unreachable and removed after the automatic or manual downing as described above.
A more graceful exit can be performed if you tell the cluster that a node shall leave. This can be performed using JMX or Command Line Management. It can also be performed programmatically with:
- val cluster = Cluster(system)
- cluster.leave(cluster.selfAddress)
Note that this command can be issued to any member in the cluster, not necessarily the one that is leaving. The cluster extension, but not the actor system or JVM, of the leaving member will be shut down after the leader has changed the status of the member to Exiting. Thereafter the member will be removed from the cluster. Normally this is handled automatically, but in case of network failures during this process it might still be necessary to set the node's status to Down in order to complete the removal.
If a node is unreachable then gossip convergence is not possible and therefore any leader actions are also not possible. However, we still might want new nodes to join the cluster in this scenario.
Warning
The WeaklyUp feature is marked as “experimental” as of its introduction in Akka 2.4.0. We will continue to improve this feature based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum, the binary compatibility guarantee for maintenance releases does not apply to this feature.
This feature is disabled by default. With a configuration option you can allow this behavior:
- akka.cluster.allow-weakly-up-members = on
When allow-weakly-up-members is enabled and there is no gossip convergence, Joining members will be promoted to WeaklyUp and they will become part of the cluster. Once gossip convergence is reached, the leader will move WeaklyUp members to Up.
You can subscribe to the WeaklyUp membership event to make use of the members that are in this state, but you should be aware that members on the other side of a network partition have no knowledge about the existence of the new members. You should for example not count WeaklyUp members in quorum decisions.
Warning
This feature is only available from Akka 2.4.0 and cannot be used if some of your cluster members are running an older version of Akka.
You can subscribe to change notifications of the cluster membership by using Cluster(system).subscribe.
- cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
A snapshot of the full state, akka.cluster.ClusterEvent.CurrentClusterState, is sent to the subscriber as the first message, followed by events for incremental updates.
Note that you may receive an empty CurrentClusterState, containing no members, if you start the subscription before the initial join procedure has completed. This is expected behavior. When the node has been accepted in the cluster you will receive MemberUp for that node, and other nodes.
If you find it inconvenient to handle the CurrentClusterState you can use ClusterEvent.InitialStateAsEvents as parameter to subscribe. That means that instead of receiving CurrentClusterState as the first message you will receive the events corresponding to the current state, to mimic what you would have seen if you were listening to the events when they occurred in the past. Note that those initial events only correspond to the current state and it is not the full history of all changes that actually have occurred in the cluster.
- cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
- classOf[MemberEvent], classOf[UnreachableMember])
The events to track the life-cycle of members are:
ClusterEvent.MemberJoined - A new member has joined the cluster and its status has been changed to Joining.
ClusterEvent.MemberUp - A new member has joined the cluster and its status has been changed to Up.
ClusterEvent.MemberExited - A member is leaving the cluster and its status has been changed to Exiting. Note that the node might already have been shut down when this event is published on another node.
ClusterEvent.MemberRemoved - Member completely removed from the cluster.
ClusterEvent.UnreachableMember - A member is considered as unreachable, detected by the failure detector of at least one other node.
ClusterEvent.ReachableMember - A member is considered as reachable again, after having been unreachable. All nodes that previously detected it as unreachable have detected it as reachable again.
There are more types of change events, consult the API documentation of classes that extend akka.cluster.ClusterEvent.ClusterDomainEvent for details about the events.
Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with Cluster(system).state. Note that this state is not necessarily in sync with the events published to a cluster subscription.
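For example, to read the current set of members that are Up from the latest locally known state (a snapshot that may lag behind the subscribed event stream; system is assumed to be your ActorSystem):
- import akka.cluster.{ Cluster, MemberStatus }
-
- // snapshot of the members currently seen as Up by this node
- val upMembers = Cluster(system).state.members.filter(_.status == MemberStatus.Up)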
Let's take a look at an example that illustrates how workers, here named backend, can detect and register to new master nodes, here named frontend.
The example application provides a service to transform text. When some text is sent to one of the frontend services, it will be delegated to one of the backend workers, which performs the transformation job and sends the result back to the original client. New backend nodes, as well as new frontend nodes, can be added to or removed from the cluster dynamically.
Messages:
- final case class TransformationJob(text: String)
- final case class TransformationResult(text: String)
- final case class JobFailed(reason: String, job: TransformationJob)
- case object BackendRegistration
The backend worker that performs the transformation job:
- class TransformationBackend extends Actor {
-
- val cluster = Cluster(context.system)
-
- // subscribe to cluster changes, MemberUp
- // re-subscribe when restart
- override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
- override def postStop(): Unit = cluster.unsubscribe(self)
-
- def receive = {
- case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
- case state: CurrentClusterState =>
- state.members.filter(_.status == MemberStatus.Up) foreach register
- case MemberUp(m) => register(m)
- }
-
- def register(member: Member): Unit =
- if (member.hasRole("frontend"))
- context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
- BackendRegistration
- }
Note that the TransformationBackend actor subscribes to cluster events to detect new, potential, frontend nodes, and sends them a registration message so that they know that they can use the backend worker.
The frontend that receives user jobs and delegates to one of the registered backend workers:
- class TransformationFrontend extends Actor {
-
- var backends = IndexedSeq.empty[ActorRef]
- var jobCounter = 0
-
- def receive = {
- case job: TransformationJob if backends.isEmpty =>
- sender() ! JobFailed("Service unavailable, try again later", job)
-
- case job: TransformationJob =>
- jobCounter += 1
- backends(jobCounter % backends.size) forward job
-
- case BackendRegistration if !backends.contains(sender()) =>
- context watch sender()
- backends = backends :+ sender()
-
- case Terminated(a) =>
- backends = backends.filterNot(_ == a)
- }
- }
Note that the TransformationFrontend actor watches the registered backend to be able to remove it from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of a watched actor. Death watch generates the Terminated message to the watching actor when the unreachable cluster node has been downed and removed.
The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions of how to run the Worker Dial-in Example.
Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware routers—can take node roles into account to achieve this distribution of responsibilities.
The roles of a node are defined in the configuration property named akka.cluster.roles and they are typically defined in the start script as a system property or environment variable.
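For example, a hypothetical "backend" role could be assigned like this (the property index syntax for the start script is an assumption to verify against your config loader):
- # in application.conf
- akka.cluster.roles = ["backend"]
- # or as a JVM option in the start script: -Dakka.cluster.roles.0=backend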
The roles of the nodes are part of the membership information in MemberEvent that you can subscribe to.
A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.
With a configuration option you can define required number of members before the leader changes member status of 'Joining' members to 'Up'.
- akka.cluster.min-nr-of-members = 3
In a similar way you can define required number of members of a certain role before the leader changes member status of 'Joining' members to 'Up'.
- akka.cluster.role {
- frontend.min-nr-of-members = 1
- backend.min-nr-of-members = 2
- }
You can start the actors in a registerOnMemberUp callback, which will be invoked when the current member status is changed to 'Up', i.e. the cluster has at least the defined number of members.
- Cluster(system) registerOnMemberUp {
- system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
- name = "factorialFrontend")
- }
This callback can be used for other things than starting actors.
You can do some clean up in a registerOnMemberRemoved callback, which will be invoked when the current member status is changed to 'Removed' or the cluster has been shut down.
For example, this is how to shut down the ActorSystem and thereafter exit the JVM:
- Cluster(system).registerOnMemberRemoved {
- // exit JVM when ActorSystem has been terminated
- system.registerOnTermination(System.exit(0))
- // shut down ActorSystem
- system.terminate()
-
- // In case ActorSystem shutdown takes longer than 10 seconds,
- // exit the JVM forcefully anyway.
- // We must spawn a separate thread to not block current thread,
- // since that would have blocked the shutdown of the ActorSystem.
- new Thread {
- override def run(): Unit = {
- if (Try(Await.ready(system.whenTerminated, 10.seconds)).isFailure)
- System.exit(-1)
- }
- }.start()
- }
Note
If you register an OnMemberRemoved callback on a cluster that has already been shut down, the callback will be invoked immediately on the caller thread; otherwise it will be invoked later, when the current member status changes to 'Removed'. You may want to install some cleanup handling after the cluster has started up, but the cluster might already be shutting down when you install it, and depending on that race is not healthy.
For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.
This can be implemented by subscribing to member events, but there are several corner cases to consider. Therefore, this specific use case is made easily accessible by the Cluster Singleton.
Distributes actors across several nodes in the cluster and supports interaction with the actors using their logical identifier, but without having to care about their physical location in the cluster.
See Cluster Sharding
Publish-subscribe messaging between actors in the cluster, and point-to-point messaging using the logical path of the actors, i.e. the sender does not have to know on which node the destination actor is running.
See Distributed Publish Subscribe in Cluster.
Communication from an actor system that is not part of the cluster to actors running somewhere in the cluster. The client does not have to know on which node the destination actor is running.
See Cluster Client.
Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API.
See Distributed Data.
In a cluster each node is monitored by a few (default maximum 5) other nodes, and when any of these detects the node as unreachable that information will spread to the rest of the cluster through the gossip. In other words, only one node needs to mark a node unreachable to have the rest of the cluster mark that node unreachable.
The failure detector will also detect if the node becomes reachable again. When all nodes that monitored the unreachable node detect it as reachable again the cluster, after gossip dissemination, will consider it as reachable.
If system messages cannot be delivered to a node it will be quarantined and then it cannot come back from unreachable. This can happen if there are too many unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, failures of actors supervised by remote parent). Then the node needs to be moved to the down or removed states and the actor system of the quarantined node must be restarted before it can join the cluster again.
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. The heartbeat arrival times are interpreted by an implementation of The Phi Accrual Failure Detector.
The suspicion level of failure is given by a value called phi. The basic idea of the phi failure detector is to express the value of phi on a scale that is dynamically adjusted to reflect current network conditions.
The value of phi is calculated as:
- phi = -log10(1 - F(timeSinceLastHeartbeat))
where F is the cumulative distribution function of a normal distribution with mean and standard deviation estimated from historical heartbeat inter-arrival times.
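A minimal sketch of that calculation, using a logistic approximation of the normal CDF (the numbers therefore differ slightly from Akka's internal implementation; this is only an illustration of the formula above):
- // phi grows as the time since the last heartbeat moves past the learned mean.
- // The normal CDF is approximated with a logistic function here.
- def phi(timeSinceLastHeartbeatMs: Double, meanMs: Double, stdDeviationMs: Double): Double = {
-   val y = (timeSinceLastHeartbeatMs - meanMs) / stdDeviationMs
-   val cdf = 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
-   -math.log10(1.0 - cdf)
- }
-
- // e.g. heartbeats normally arrive every 1000 ms with 100 ms standard deviation,
- // and 1500 ms have passed since the last one was received
- val suspicion = phi(1500, 1000, 100)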
In the Configuration you can adjust the akka.cluster.failure-detector.threshold to define when a phi value is considered to be a failure.
A low threshold is prone to generate many false positives but ensures a quick detection in the event of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes. The default threshold is 8 and is appropriate for most situations. However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms.
The following chart illustrates how phi increases with increasing time since the previous heartbeat.
Phi is calculated from the mean and standard deviation of historical inter-arrival times. The previous chart is an example for a standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, i.e. it is possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms.
To be able to survive sudden abnormalities, such as garbage collection pauses and transient network failures, the failure detector is configured with a margin, akka.cluster.failure-detector.acceptable-heartbeat-pause. You may want to adjust the Configuration of this depending on your environment. This is how the curve looks for acceptable-heartbeat-pause configured to 3 seconds.
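Put together, the failure detector settings discussed above live under akka.cluster.failure-detector; they are shown here with what should be their default values (verify against the reference configuration of your Akka version):
- akka.cluster.failure-detector {
-   # how often heartbeats are sent out
-   heartbeat-interval = 1 s
-   # phi value above which a node is considered unreachable
-   threshold = 8.0
-   # pause (e.g. GC or transient network hiccup) tolerated before phi grows quickly
-   acceptable-heartbeat-pause = 3 s
- }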
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of a watched actor. Death watch generates the Terminated message to the watching actor when the unreachable cluster node has been downed and removed.
If you encounter suspicious false positives when the system is under load you should define a separate dispatcher for the cluster actors as described in Cluster Dispatcher.
All routers can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. When a node becomes unreachable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster, additional routees are added to the router, according to the configuration. Routees are also added when a node becomes reachable again, after having been unreachable.
Cluster aware routers make use of members with status WeaklyUp if that feature is enabled.
There are two distinct types of routers.
When using a Group you must start the routee actors on the cluster member nodes. That is not done by the router.
The configuration for a group looks like this:
- akka.actor.deployment {
- /statsService/workerRouter {
- router = consistent-hashing-group
- routees.paths = ["/user/statsWorker"]
- cluster {
- enabled = on
- allow-local-routees = on
- use-role = compute
- }
- }
- }
Note
The routee actors should be started as early as possible when starting the actor system, because the router will try to use them as soon as the member status is changed to 'Up'.
The actor paths without address information that are defined in routees.paths are used for selecting the actors to which the messages will be forwarded by the router. Messages will be forwarded to the routees using ActorSelection, so the same delivery semantics should be expected. It is possible to limit the lookup of routees to member nodes tagged with a certain role by specifying use-role.
max-total-nr-of-instances defines the total number of routees in the cluster. By default max-total-nr-of-instances is set to a high value (10000) that will result in new routees being added to the router when nodes join the cluster. Set it to a lower value if you want to limit the total number of routees.
The same type of router could also have been defined in code:
- import akka.cluster.routing.ClusterRouterGroup
- import akka.cluster.routing.ClusterRouterGroupSettings
- import akka.routing.ConsistentHashingGroup
-
- val workerRouter = context.actorOf(
- ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings(
- totalInstances = 100, routeesPaths = List("/user/statsWorker"),
- allowLocalRoutees = true, useRole = Some("compute"))).props(),
- name = "workerRouter2")
See Configuration section for further descriptions of the settings.
Let's take a look at how to use a cluster aware router with a group of routees, i.e. router sending to the paths of the routees.
The example application provides a service to calculate statistics for a text. When some text is sent to the service it splits it into words, and delegates the task to count number of characters in each word to a separate worker, a routee of a router. The character count for each word is sent back to an aggregator that calculates the average number of characters per word when all results have been collected.
Messages:
- final case class StatsJob(text: String)
- final case class StatsResult(meanWordLength: Double)
- final case class JobFailed(reason: String)
The worker that counts number of characters in each word:
- class StatsWorker extends Actor {
- var cache = Map.empty[String, Int]
- def receive = {
- case word: String =>
- val length = cache.get(word) match {
- case Some(x) => x
- case None =>
- val x = word.length
- cache += (word -> x)
- x
- }
-
- sender() ! length
- }
- }
The service that receives text from users and splits it up into words, delegates to workers and aggregates:
- class StatsService extends Actor {
- // This router is used both with lookup and deploy of routees. If you
- // have a router with only lookup of routees you can use Props.empty
- // instead of Props[StatsWorker.class].
- val workerRouter = context.actorOf(FromConfig.props(Props[StatsWorker]),
- name = "workerRouter")
-
- def receive = {
- case StatsJob(text) if text != "" =>
- val words = text.split(" ")
- val replyTo = sender() // important to not close over sender()
- // create actor that collects replies from workers
- val aggregator = context.actorOf(Props(
- classOf[StatsAggregator], words.size, replyTo))
- words foreach { word =>
- workerRouter.tell(
- ConsistentHashableEnvelope(word, word), aggregator)
- }
- }
- }
-
- class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor {
- var results = IndexedSeq.empty[Int]
- context.setReceiveTimeout(3.seconds)
-
- def receive = {
- case wordCount: Int =>
- results = results :+ wordCount
- if (results.size == expectedResults) {
- val meanWordLength = results.sum.toDouble / results.size
- replyTo ! StatsResult(meanWordLength)
- context.stop(self)
- }
- case ReceiveTimeout =>
- replyTo ! JobFailed("Service unavailable, try again later")
- context.stop(self)
- }
- }
Note, nothing cluster specific so far, just plain actors.
All nodes start StatsService and StatsWorker actors. Remember, routees are the workers in this case. The router is configured with routees.paths:
- akka.actor.deployment {
- /statsService/workerRouter {
- router = consistent-hashing-group
- routees.paths = ["/user/statsWorker"]
- cluster {
- enabled = on
- allow-local-routees = on
- use-role = compute
- }
- }
- }
This means that user requests can be sent to StatsService on any node and it will use StatsWorker on all nodes.
The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions of how to run the Router Example with Group of Routees.
When using a Pool with routees created and deployed on the cluster member nodes, the configuration for a router looks like this:
- akka.actor.deployment {
- /statsService/singleton/workerRouter {
- router = consistent-hashing-pool
- cluster {
- enabled = on
- max-nr-of-instances-per-node = 3
- allow-local-routees = on
- use-role = compute
- }
- }
- }
It is possible to limit the deployment of routees to member nodes tagged with a certain role by specifying use-role. max-total-nr-of-instances defines the total number of routees in the cluster, but the number of routees per node, max-nr-of-instances-per-node, will not be exceeded. By default max-total-nr-of-instances is set to a high value (10000) that will result in new routees being added to the router when nodes join the cluster. Set it to a lower value if you want to limit the total number of routees.
The same type of router could also have been defined in code:
- import akka.cluster.routing.ClusterRouterPool
- import akka.cluster.routing.ClusterRouterPoolSettings
- import akka.routing.ConsistentHashingPool
-
- val workerRouter = context.actorOf(
- ClusterRouterPool(ConsistentHashingPool(0), ClusterRouterPoolSettings(
- totalInstances = 100, maxInstancesPerNode = 3,
- allowLocalRoutees = false, useRole = None)).props(Props[StatsWorker]),
- name = "workerRouter3")
See Configuration section for further descriptions of the settings.
Let's take a look at how to use a cluster aware router on a single master node that creates and deploys workers. To keep track of a single master we use the Cluster Singleton in the contrib module. The ClusterSingletonManager is started on each node.
- system.actorOf(ClusterSingletonManager.props(
- singletonProps = Props[StatsService],
- terminationMessage = PoisonPill,
- settings = ClusterSingletonManagerSettings(system).withRole("compute")),
- name = "statsService")
We also need an actor on each node that keeps track of where the current single master exists and delegates jobs to the StatsService. That is provided by the ClusterSingletonProxy.
- system.actorOf(ClusterSingletonProxy.props(singletonManagerPath = "/user/statsService",
- settings = ClusterSingletonProxySettings(system).withRole("compute")),
- name = "statsServiceProxy")
The ClusterSingletonProxy receives text from users and delegates to the current StatsService, the single master. It listens to cluster events to look up the StatsService on the oldest node.
All nodes start ClusterSingletonProxy and the ClusterSingletonManager.
The router is now configured like this:
- akka.actor.deployment {
- /statsService/singleton/workerRouter {
- router = consistent-hashing-pool
- cluster {
- enabled = on
- max-nr-of-instances-per-node = 3
- allow-local-routees = on
- use-role = compute
- }
- }
- }
The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions of how to run the Router Example with Pool of Remote Deployed Routees.
The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus with the help of Cluster Metrics Extension.
Multi Node Testing is useful for testing cluster applications.
Set up your project according to the instructions in Multi Node Testing and Multi JVM Testing, i.e. add the sbt-multi-jvm plugin and the dependency to akka-multi-node-testkit.
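A rough sketch of the sbt pieces involved follows; the plugin and library versions are examples, and the full sbt wiring (e.g. enabling the MultiJvm configuration) is described in Multi Node Testing:
- // project/plugins.sbt
- addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8")
-
- // build.sbt
- libraryDependencies +=
-   "com.typesafe.akka" %% "akka-multi-node-testkit" % "2.4.10" % "test"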
First, as described in Multi Node Testing, we need some scaffolding to configure the MultiNodeSpec. Define the participating roles and their Configuration in an object extending MultiNodeConfig:
- import akka.remote.testkit.MultiNodeConfig
- import com.typesafe.config.ConfigFactory
-
- object StatsSampleSpecConfig extends MultiNodeConfig {
- // register the named roles (nodes) of the test
- val first = role("first")
- val second = role("second")
- val third = role("third")
-
- def nodeList = Seq(first, second, third)
-
- // Extract individual sigar library for every node.
- nodeList foreach { role =>
- nodeConfig(role) {
- ConfigFactory.parseString(s"""
- # Disable legacy metrics in akka-cluster.
- akka.cluster.metrics.enabled=off
- # Enable metrics extension in akka-cluster-metrics.
- akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
- # Sigar native library extract location during tests.
- akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
- """)
- }
- }
-
- // this configuration will be used for all nodes
- // note that no fixed host names and ports are used
- commonConfig(ConfigFactory.parseString("""
- akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
- akka.remote.log-remote-lifecycle-events = off
- akka.cluster.roles = [compute]
- // router lookup config ...
- """))
-
- }
Define one concrete test class for each role/node. These will be instantiated on the different nodes (JVMs). They can be implemented differently, but often they are the same and extend an abstract test class, as illustrated here.
- // need one concrete test class per node
- class StatsSampleSpecMultiJvmNode1 extends StatsSampleSpec
- class StatsSampleSpecMultiJvmNode2 extends StatsSampleSpec
- class StatsSampleSpecMultiJvmNode3 extends StatsSampleSpec
Note the naming convention of these classes. The name of the classes must end with MultiJvmNode1, MultiJvmNode2 and so on. It is possible to define another suffix to be used by sbt-multi-jvm, but the default should be fine in most cases.
Then the abstract MultiNodeSpec, which takes the MultiNodeConfig as constructor parameter.
- import org.scalatest.BeforeAndAfterAll
- import org.scalatest.WordSpecLike
- import org.scalatest.Matchers
- import akka.remote.testkit.MultiNodeSpec
- import akka.testkit.ImplicitSender
-
- abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
- with WordSpecLike with Matchers with BeforeAndAfterAll
- with ImplicitSender {
-
- import StatsSampleSpecConfig._
-
- override def initialParticipants = roles.size
-
- override def beforeAll() = multiNodeSpecBeforeAll()
-
- override def afterAll() = multiNodeSpecAfterAll()
Most of this can of course be extracted to a separate trait to avoid repeating this in all your tests.
Typically you begin your test by starting up the cluster and let the members join, and create some actors. That can be done like this:
- "illustrate how to startup cluster" in within(15 seconds) {
- Cluster(system).subscribe(testActor, classOf[MemberUp])
- expectMsgClass(classOf[CurrentClusterState])
-
- val firstAddress = node(first).address
- val secondAddress = node(second).address
- val thirdAddress = node(third).address
-
- Cluster(system) join firstAddress
-
- system.actorOf(Props[StatsWorker], "statsWorker")
- system.actorOf(Props[StatsService], "statsService")
-
- receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
- Set(firstAddress, secondAddress, thirdAddress))
-
- Cluster(system).unsubscribe(testActor)
-
- testConductor.enter("all-up")
- }
From the test you interact with the cluster using the Cluster extension, e.g. join.
- Cluster(system) join firstAddress
Notice how the testActor from testkit is added as subscriber to cluster changes and then waits for certain events, such as in this case all members becoming 'Up'.
The above code was running for all roles (JVMs). runOn is a convenient utility to declare that a certain block of code should only run for a specific role.
- "show usage of the statsService from one node" in within(15 seconds) {
- runOn(second) {
- assertServiceOk()
- }
-
- testConductor.enter("done-2")
- }
-
- def assertServiceOk(): Unit = {
- val service = system.actorSelection(node(third) / "user" / "statsService")
- // eventually the service should be ok,
- // first attempts might fail because worker actors not started yet
- awaitAssert {
- service ! StatsJob("this is the text that will be analyzed")
- expectMsgType[StatsResult](1.second).meanWordLength should be(
- 3.875 +- 0.001)
- }
-
- }
Once again we take advantage of the facilities in testkit to verify expected behavior. Here using testActor as sender (via ImplicitSender) and verifying the reply with expectMsgType.
In the above code you can see node(third), which is a useful facility to get the root actor reference of the actor system for a specific role. This can also be used to grab the akka.actor.Address of that node.
- val firstAddress = node(first).address
- val secondAddress = node(second).address
- val thirdAddress = node(third).address
Information and management of the cluster is available as JMX MBeans with the root name akka.Cluster. The JMX information can be displayed with an ordinary JMX console such as JConsole or JVisualVM.
From JMX you can:
see what members are part of the cluster
see the status of this node
join this node to another node in the cluster
mark any node in the cluster as down
tell any node in the cluster to leave
Member nodes are identified by their address, in the format akka.<protocol>://<actor-system-name>@<hostname>:<port>.
The cluster can be managed with the script bin/akka-cluster provided in the Akka distribution.
Run it without parameters to see instructions about how to use the script:
- Usage: bin/akka-cluster <node-hostname> <jmx-port> <command> ...
-
- Supported commands are:
- join <node-url> - Sends a request to JOIN the node with the specified URL
- leave <node-url> - Sends a request for node with URL to LEAVE the cluster
- down <node-url> - Sends a request for marking node with URL as DOWN
- member-status - Asks the member node for its current status
- members - Asks the cluster for addresses of current members
- unreachable - Asks the cluster for addresses of unreachable members
- cluster-status - Asks the cluster for its current status (member ring,
- unavailable nodes, meta data etc.)
- leader - Asks the cluster who the current leader is
- is-singleton - Checks if the cluster is a singleton cluster (single
- node cluster)
- is-available - Checks if the member node is available
- Where the <node-url> should be on the format of
- 'akka.<protocol>://<actor-system-name>@<hostname>:<port>'
-
- Examples: bin/akka-cluster localhost 9999 is-available
- bin/akka-cluster localhost 9999 join akka.tcp://MySystem@darkstar:2552
- bin/akka-cluster localhost 9999 cluster-status
To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes, as described in Monitoring and Management Using JMX Technology.
Example of system properties to enable remote monitoring and management:
- java -Dcom.sun.management.jmxremote.port=9999 \
- -Dcom.sun.management.jmxremote.authenticate=false \
- -Dcom.sun.management.jmxremote.ssl=false
There are several configuration properties for the cluster. We refer to the reference configuration for more information.
You can silence the logging of cluster events at info level with configuration property:
- akka.cluster.log-info = off
Under the hood the cluster extension is implemented with actors, and it can be necessary to create a bulkhead for those actors to avoid disturbance from other actors. Especially the heartbeating actors that are used for failure detection can generate false positives if they are not given a chance to run at regular intervals. For this purpose you can define a separate dispatcher to be used for the cluster actors:
- akka.cluster.use-dispatcher = cluster-dispatcher
-
- cluster-dispatcher {
- type = "Dispatcher"
- executor = "fork-join-executor"
- fork-join-executor {
- parallelism-min = 2
- parallelism-max = 4
- }
- }
Note
Normally it should not be necessary to configure a separate dispatcher for the Cluster. The default-dispatcher should be sufficient for performing the Cluster tasks, i.e. akka.cluster.use-dispatcher should not be changed. If you have Cluster related problems when using the default-dispatcher, that is typically an indication that you are running blocking or CPU intensive actors/tasks on the default-dispatcher. Use dedicated dispatchers for such actors/tasks instead of running them on the default-dispatcher, because that may starve system internal tasks. Related config property: akka.cluster.use-dispatcher = akka.cluster.cluster-dispatcher. The corresponding default value is empty, i.e. akka.cluster.use-dispatcher is not set and the cluster actors run on the default dispatcher.
For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.
Some examples: a single point of responsibility for certain cluster-wide consistent decisions or coordination of actions across the cluster, a single entry point to an external system, or a single master with many workers.
Using a singleton should not be the first design choice. It has several drawbacks, such as being a single point of bottleneck. A single point of failure is also a relevant concern, but for some cases this feature takes care of that by making sure that another singleton instance will eventually be started.
The cluster singleton pattern is implemented by akka.cluster.singleton.ClusterSingletonManager. It manages one singleton actor instance among all cluster nodes or a group of nodes tagged with a specific role. ClusterSingletonManager is an actor that is supposed to be started on all nodes, or all nodes with the specified role, in the cluster. The actual singleton actor is started by the ClusterSingletonManager on the oldest node by creating a child actor from the supplied Props. ClusterSingletonManager makes sure that at most one singleton instance is running at any point in time.
The singleton actor is always running on the oldest member with the specified role. The oldest member is determined by akka.cluster.Member#isOlderThan. This can change when removing that member from the cluster. Be aware that there is a short time period when there is no active singleton during the hand-over process.
The cluster failure detector will notice when the oldest node becomes unreachable due to things like JVM crash, hard shut down, or network failure. Then a new oldest node will take over and a new singleton actor is created. For these failure scenarios there will not be a graceful hand-over, but more than one active singleton is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts.
You can access the singleton actor by using the provided akka.cluster.singleton.ClusterSingletonProxy, which will route all messages to the current instance of the singleton. The proxy will keep track of the oldest node in the cluster and resolve the singleton's ActorRef by explicitly sending the singleton's actorSelection the akka.actor.Identify message and waiting for it to reply. This is performed periodically if the singleton doesn't reply within a certain (configurable) time. Given the implementation, there might be periods of time during which the ActorRef is unavailable, e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the singleton and then deliver them when the singleton is finally available. If the buffer is full the ClusterSingletonProxy will drop old messages when new messages are sent via the proxy. The size of the buffer is configurable and it can be disabled by using a buffer size of 0.
It's worth noting that messages can always be lost because of the distributed nature of these actors. As always, additional logic should be implemented in the singleton (acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
The singleton instance will not run on members with status WeaklyUp if that feature is enabled.
This pattern may seem very tempting to use at first, but it has several drawbacks: the cluster singleton may quickly become a performance bottleneck; you cannot rely on it being available at all times, since hand-over to a new oldest node takes a while; and if the oldest node is removed from the cluster based on timing alone (automatic downing) during a network partition, more than one singleton may end up running. Especially the last point is something you should be aware of: in general when using the Cluster Singleton pattern you should take care of downing nodes yourself and not rely on the timing based auto-down feature.
Warning
Don't use Cluster Singleton together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in multiple Singletons being started, one in each separate cluster!
Assume that we need one single entry point to an external system: an actor that receives messages from a JMS queue, with the strict requirement that only one JMS consumer must exist to make sure that the messages are processed in order. That is perhaps not how one would like to design things, but it is a typical real-world scenario when integrating with external systems.
On each node in the cluster you need to start the ClusterSingletonManager and supply the Props of the singleton actor, in this case the JMS queue consumer.
- system.actorOf(
- ClusterSingletonManager.props(
- singletonProps = Props(classOf[Consumer], queue, testActor),
- terminationMessage = End,
- settings = ClusterSingletonManagerSettings(system).withRole("worker")),
- name = "consumer")
Here we limit the singleton to nodes tagged with the "worker" role, but all nodes, independent of role, can be used by not specifying withRole.
Here we use an application specific terminationMessage to be able to close the resources before actually stopping the singleton actor. Note that PoisonPill is a perfectly fine terminationMessage if you only need to stop the actor.
Here is how the singleton actor handles the terminationMessage in this example.
- case End ⇒
- queue ! UnregisterConsumer
- case UnregistrationOk ⇒
- stoppedBeforeUnregistration = false
- context stop self
- case Ping ⇒
- sender() ! Pong
With the names given above, access to the singleton can be obtained from any cluster node using a properly configured proxy.
- system.actorOf(
- ClusterSingletonProxy.props(
- singletonManagerPath = "/user/consumer",
- settings = ClusterSingletonProxySettings(system).withRole("worker")),
- name = "consumerProxy")
A more comprehensive sample is available in the Lightbend Activator tutorial named Distributed workers with Akka and Scala!.
To use the Cluster Singleton you must add the following dependency in your project.
sbt:
- "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.10"
maven:
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster-tools_2.11</artifactId>
- <version>2.4.10</version>
- </dependency>
The following configuration properties are read by the ClusterSingletonManagerSettings when created with an ActorSystem parameter. It is also possible to amend the ClusterSingletonManagerSettings or create it from another config section with the same layout as below. ClusterSingletonManagerSettings is a parameter to the ClusterSingletonManager.props factory method, i.e. each singleton can be configured with different settings if needed.
- akka.cluster.singleton {
- # The actor name of the child singleton actor.
- singleton-name = "singleton"
-
- # Singleton among the nodes tagged with specified role.
- # If the role is not specified it's a singleton among all nodes in the cluster.
- role = ""
-
- # When a node is becoming oldest it sends hand-over request to previous oldest,
- # that might be leaving the cluster. This is retried with this interval until
- # the previous oldest confirms that the hand over has started or the previous
- # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
- hand-over-retry-interval = 1s
-
- # The number of retries are derived from hand-over-retry-interval and
- # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
- # but it will never be less than this property.
- min-number-of-hand-over-retries = 10
- }
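For example, the settings read from that section can be amended programmatically before they are passed to ClusterSingletonManager.props. The sketch below assumes the withHandOverRetryInterval setter, which mirrors the hand-over-retry-interval property above; the role and interval are illustrative values only:
- import scala.concurrent.duration._
-
- // Illustrative values, not defaults: restrict the singleton to "worker" nodes
- // and retry the hand-over request every 2 seconds.
- val managerSettings = ClusterSingletonManagerSettings(system)
-   .withRole("worker")
-   .withHandOverRetryInterval(2.seconds)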
The following configuration properties are read by the ClusterSingletonProxySettings when created with an ActorSystem parameter. It is also possible to amend the ClusterSingletonProxySettings or create it from another config section with the same layout as below. ClusterSingletonProxySettings is a parameter to the ClusterSingletonProxy.props factory method, i.e. each singleton proxy can be configured with different settings if needed.
- akka.cluster.singleton-proxy {
- # The actor name of the singleton actor that is started by the ClusterSingletonManager
- singleton-name = ${akka.cluster.singleton.singleton-name}
-
- # The role of the cluster nodes where the singleton can be deployed.
- # If the role is not specified then any node will do.
- role = ""
-
- # Interval at which the proxy will try to resolve the singleton instance.
- singleton-identification-interval = 1s
-
- # If the location of the singleton is unknown the proxy will buffer this
- # number of messages and deliver them when the singleton is identified.
- # When the buffer is full old messages will be dropped when new messages are
- # sent via the proxy.
- # Use 0 to disable buffering, i.e. messages will be dropped immediately if
- # the location of the singleton is unknown.
- # Maximum allowed buffer size is 10000.
- buffer-size = 1000
- }
How do I send a message to an actor without knowing which node it is running on?
How do I send messages to all actors in the cluster that have registered interest in a named topic?
This pattern provides a mediator actor, akka.cluster.pubsub.DistributedPubSubMediator, that manages a registry of actor references and replicates the entries to peer actors among all cluster nodes or a group of nodes tagged with a specific role.
The DistributedPubSubMediator actor is supposed to be started on all nodes, or all nodes with the specified role, in the cluster. The mediator can be started with the DistributedPubSub extension or as an ordinary actor.
The registry is eventually consistent, i.e. changes are not immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds. Changes are only performed in the local part of the registry and those changes are versioned. Deltas are disseminated in a scalable way to other nodes with a gossip protocol.
Cluster members with status WeaklyUp, if that feature is enabled, will participate in Distributed Publish Subscribe, i.e. subscribers on nodes with WeaklyUp status will receive published messages if the publisher and subscriber are on the same side of a network partition.
You can send messages via the mediator on any node to registered actors on any other node.
There are two different modes of message delivery, explained in the sections Publish and Send below.
A more comprehensive sample is available in the Lightbend Activator tutorial named Akka Clustered PubSub with Scala!.
This is the true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging application.
Actors are registered to a named topic. This enables many subscribers on each node. The message will be delivered to all subscribers of the topic.
For efficiency the message is sent over the wire only once per node (that has a matching topic), and then delivered to all subscribers of the local topic representation.
You register actors to the local mediator with DistributedPubSubMediator.Subscribe. Successful Subscribe and Unsubscribe are acknowledged with DistributedPubSubMediator.SubscribeAck and DistributedPubSubMediator.UnsubscribeAck replies. The acknowledgment means that the subscription is registered, but it can still take some time until it is replicated to other nodes.
You publish messages by sending a DistributedPubSubMediator.Publish message to the local mediator.
Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Unsubscribe.
An example of a subscriber actor:
- class Subscriber extends Actor with ActorLogging {
- import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
- val mediator = DistributedPubSub(context.system).mediator
- // subscribe to the topic named "content"
- mediator ! Subscribe("content", self)
-
- def receive = {
- case s: String ⇒
- log.info("Got {}", s)
- case SubscribeAck(Subscribe("content", None, `self`)) ⇒
- log.info("subscribing");
- }
- }
Subscriber actors can be started on several nodes in the cluster, and all will receive messages published to the "content" topic.
- runOn(first) {
- system.actorOf(Props[Subscriber], "subscriber1")
- }
- runOn(second) {
- system.actorOf(Props[Subscriber], "subscriber2")
- system.actorOf(Props[Subscriber], "subscriber3")
- }
A simple actor that publishes to this "content" topic:
- class Publisher extends Actor {
- import DistributedPubSubMediator.Publish
- // activate the extension
- val mediator = DistributedPubSub(context.system).mediator
-
- def receive = {
- case in: String ⇒
- val out = in.toUpperCase
- mediator ! Publish("content", out)
- }
- }
It can publish messages to the topic from anywhere in the cluster:
- runOn(third) {
- val publisher = system.actorOf(Props[Publisher], "publisher")
- later()
- // after a while the subscriptions are replicated
- publisher ! "hello"
- }
Actors may also be subscribed to a named topic with a group id. If subscribing with a group id, each message published to a topic with the sendOneMessageToEachGroup flag set to true is delivered via the supplied RoutingLogic (default random) to one actor within each subscribing group.
If all the subscribed actors have the same group id, then this works just like Send and each message is only delivered to one subscriber.
If all the subscribed actors have different group names, then this works like normal Publish and each message is broadcasted to all subscribers.
Note
Note that if the group id is used it is part of the topic identifier. Messages published with sendOneMessageToEachGroup=false will not be delivered to subscribers that subscribed with a group id. Messages published with sendOneMessageToEachGroup=true will not be delivered to subscribers that subscribed without a group id.
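For illustration, a subscriber could join the topic as part of a group and a publisher could request per-group delivery like this (a minimal sketch reusing the mediator from the examples above; the topic and group names are made up):
- // One actor out of each subscribing group receives every message published
- // with sendOneMessageToEachGroup = true.
- mediator ! Subscribe("content", Some("group-a"), self)
-
- mediator ! Publish("content", "hello", sendOneMessageToEachGroup = true)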
This is a point-to-point mode where each message is delivered to one destination, but you still do not have to know where the destination is located. A typical usage of this mode is private chat to one other user in an instant messaging application. It can also be used for distributing tasks to registered workers, like a cluster aware router where the routees can register themselves dynamically.
The message will be delivered to one recipient with a matching path, if any such exists in the registry. If several entries match the path because it has been registered on several nodes the message will be sent via the supplied RoutingLogic (default random) to one destination. The sender() of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the used mediator actor, if any such exists, otherwise route to any other matching entry.
You register actors to the local mediator with DistributedPubSubMediator.Put. The ActorRef in Put must belong to the same local actor system as the mediator. The path without address information is the key to which you send messages. On each node there can only be one actor for a given path, since the path is unique within one local actor system.
You send messages by sending a DistributedPubSubMediator.Send message to the local mediator with the path (without address information) of the destination actors.
Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Remove.
An example of a destination actor:
- class Destination extends Actor with ActorLogging {
- import DistributedPubSubMediator.Put
- val mediator = DistributedPubSub(context.system).mediator
- // register to the path
- mediator ! Put(self)
-
- def receive = {
- case s: String ⇒
- log.info("Got {}", s)
- }
- }
Destination actors can be started on several nodes in the cluster, and all of them will be registered at the same path (without address information).
- runOn(first) {
- system.actorOf(Props[Destination], "destination")
- }
- runOn(second) {
- system.actorOf(Props[Destination], "destination")
- }
A simple actor that sends to the path:
- class Sender extends Actor {
- import DistributedPubSubMediator.Send
- // activate the extension
- val mediator = DistributedPubSub(context.system).mediator
-
- def receive = {
- case in: String ⇒
- val out = in.toUpperCase
- mediator ! Send(path = "/user/destination", msg = out, localAffinity = true)
- }
- }
It can send messages to the path from anywhere in the cluster:
- runOn(third) {
- val sender = system.actorOf(Props[Sender], "sender")
- later()
- // after a while the destinations are replicated
- sender ! "hello"
- }
It is also possible to broadcast messages to the actors that have been registered with Put. Send the DistributedPubSubMediator.SendToAll message to the local mediator and the wrapped message will then be delivered to all recipients with a matching path. Actors with the same path, without address information, can be registered on different nodes. On each node there can only be one such actor, since the path is unique within one local actor system.
Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, for redundancy. You can also optionally specify a property (allButSelf) deciding if the message should be sent to a matching path on the self node or not.
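A minimal sketch of such a broadcast, reusing the Destination actors registered with Put above:
- import DistributedPubSubMediator.SendToAll
-
- val mediator = DistributedPubSub(system).mediator
- // Delivered to every actor registered with Put at "/user/destination";
- // allButSelf = false also includes a matching actor on this node.
- mediator ! SendToAll(path = "/user/destination", msg = "refresh", allButSelf = false)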
In the example above the mediator is started and accessed with the akka.cluster.pubsub.DistributedPubSub extension. That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to start the mediator actor as an ordinary actor, and you can have several different mediators at the same time to be able to divide a large number of actors/topics between different mediators. For example you might want to use different cluster roles for different mediators.
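A minimal sketch of starting such a mediator yourself, assuming the DistributedPubSubMediator.props factory and DistributedPubSubSettings; the actor name and role are example values:
- // A dedicated mediator restricted to nodes with the "backend" role, independent
- // of the mediator managed by the DistributedPubSub extension.
- val backendMediator = system.actorOf(
-   DistributedPubSubMediator.props(
-     DistributedPubSubSettings(system).withRole("backend")),
-   name = "backendMediator")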
The DistributedPubSub extension can be configured with the following properties:
- # Settings for the DistributedPubSub extension
- akka.cluster.pub-sub {
- # Actor name of the mediator actor, /system/distributedPubSubMediator
- name = distributedPubSubMediator
-
- # Start the mediator on members tagged with this role.
- # All members are used if undefined or empty.
- role = ""
-
- # The routing logic to use for 'Send'
- # Possible values: random, round-robin, broadcast
- routing-logic = random
-
- # How often the DistributedPubSubMediator should send out gossip information
- gossip-interval = 1s
-
- # Removed entries are pruned after this duration
- removed-time-to-live = 120s
-
- # Maximum number of elements to transfer in one message when synchronizing the registries.
- # Next chunk will be transferred in next round of gossip.
- max-delta-elements = 3000
-
- # The id of the dispatcher to use for DistributedPubSubMediator actors.
- # If not specified default dispatcher is used.
- # If specified you need to define the settings of the actual dispatcher.
- use-dispatcher = ""
- }
It is recommended to load the extension when the actor system is started by defining it in the akka.extensions configuration property. Otherwise it will be activated when first used and then it takes a while for it to be populated.
- akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"]
To use Distributed Publish Subscribe you must add the following dependency in your project.
sbt:
- "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.10"
maven:
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster-tools_2.11</artifactId>
- <version>2.4.10</version>
- </dependency>
An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via this ClusterClient. The client can of course be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.
Note
ClusterClient should not be used when sending messages to actors that run within the same cluster. Similar functionality as the ClusterClient is provided in a more efficient way by Distributed Publish Subscribe in Cluster for actors that belong to the same cluster.
Also, note that it is necessary to change akka.actor.provider from akka.actor.LocalActorRefProvider to akka.remote.RemoteActorRefProvider or akka.cluster.ClusterActorRefProvider when using the cluster client.
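For example, in the client's application.conf (a minimal sketch; any remoting settings required by your setup are omitted):
- akka.actor.provider = "akka.cluster.ClusterActorRefProvider"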
The receptionist is supposed to be started on all nodes, or all nodes with the specified role, in the cluster. The receptionist can be started with the ClusterClientReceptionist extension or as an ordinary actor.
You can send messages via the ClusterClient to any actor in the cluster that is registered in the DistributedPubSubMediator used by the ClusterReceptionist. The ClusterClientReceptionist provides methods for registration of actors that should be reachable from the client. Messages are wrapped in ClusterClient.Send, ClusterClient.SendToAll or ClusterClient.Publish.
Both the ClusterClient and the ClusterClientReceptionist emit events that can be subscribed to. The ClusterClient sends out notifications in relation to having received a list of contact points from the ClusterClientReceptionist. One use of this list might be for the client to record its contact points. A client that is restarted could then use this information to supersede any previously configured contact points.
The ClusterClientReceptionist sends out notifications in relation to having received contact from a ClusterClient. This notification enables the server containing the receptionist to become aware of what clients are connected.
1. ClusterClient.Send
The message will be delivered to one recipient with a matching path, if any such exists. If several entries match the path the message will be delivered to one random destination. The sender() of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the used receptionist actor, if any such exists, otherwise random to any other matching entry.
2. ClusterClient.SendToAll
The message will be delivered to all recipients with a matching path.
3. ClusterClient.Publish
The message will be delivered to all recipient actors that have been registered as subscribers to the named topic.
Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. the sender(), as seen by the destination actor, is not the client itself. The sender() of the response messages, as seen by the client, is deadLetters since the client should normally send subsequent messages via the ClusterClient. It is possible to pass the original sender inside the reply messages if the client is supposed to communicate directly to the actor in the cluster.
While establishing a connection to a receptionist the ClusterClient will buffer messages and send them when the connection is established. If the buffer is full the ClusterClient will drop old messages when new messages are sent via the client. The size of the buffer is configurable and it can be disabled by using a buffer size of 0.
It's worth noting that messages can always be lost because of the distributed nature of these actors. As always, additional logic should be implemented in the destination (acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
On the cluster nodes first start the receptionist. Note, it is recommended to load the extension when the actor system is started by defining it in the akka.extensions configuration property:
- akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
Next, register the actors that should be available for the client.
- runOn(host1) {
- val serviceA = system.actorOf(Props[Service], "serviceA")
- ClusterClientReceptionist(system).registerService(serviceA)
- }
-
- runOn(host2, host3) {
- val serviceB = system.actorOf(Props[Service], "serviceB")
- ClusterClientReceptionist(system).registerService(serviceB)
- }
On the client you create the ClusterClient actor and use it as a gateway for sending messages to the actors identified by their path (without address information) somewhere in the cluster.
- runOn(client) {
- val c = system.actorOf(ClusterClient.props(
- ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
- c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
- c ! ClusterClient.SendToAll("/user/serviceB", "hi")
- }
The initialContacts parameter is a Set[ActorPath], which can be created like this:
- val initialContacts = Set(
- ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
- ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
- val settings = ClusterClientSettings(system)
- .withInitialContacts(initialContacts)
You will probably define the address information of the initial contact points in configuration or a system property. See also Configuration.
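For example, the same contact points could be supplied through the akka.cluster.client.initial-contacts property shown in the configuration section below; the host names and ports are placeholders:
- akka.cluster.client {
-   initial-contacts = [
-     "akka.tcp://OtherSys@host1:2552/system/receptionist",
-     "akka.tcp://OtherSys@host2:2552/system/receptionist"]
- }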
A more comprehensive sample is available in the Lightbend Activator tutorial named Distributed workers with Akka and Scala!.
In the example above the receptionist is started and accessed with the akka.cluster.client.ClusterClientReceptionist extension. That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to start the akka.cluster.client.ClusterReceptionist actor as an ordinary actor, and you can have several different receptionists at the same time, serving different types of clients.
Note that the ClusterClientReceptionist uses the DistributedPubSub extension, which is described in Distributed Publish Subscribe in Cluster.
It is recommended to load the extension when the actor system is started by defining it in the akka.extensions configuration property:
- akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
As mentioned earlier, both the ClusterClient and ClusterClientReceptionist emit events that can be subscribed to. The following code snippet declares an actor that will receive notifications on contact points (addresses to the available receptionists) as they become available. The code illustrates subscribing to the events and receiving the ClusterClient initial state.
- class ClientListener(targetClient: ActorRef) extends Actor {
- override def preStart(): Unit =
- targetClient ! SubscribeContactPoints
-
- def receive: Receive =
- receiveWithContactPoints(Set.empty)
-
- def receiveWithContactPoints(contactPoints: Set[ActorPath]): Receive = {
- case ContactPoints(cps) ⇒
- context.become(receiveWithContactPoints(cps))
- // Now do something with the up-to-date "cps"
- case ContactPointAdded(cp) ⇒
- context.become(receiveWithContactPoints(contactPoints + cp))
- // Now do something with an up-to-date "contactPoints + cp"
- case ContactPointRemoved(cp) ⇒
- context.become(receiveWithContactPoints(contactPoints - cp))
- // Now do something with an up-to-date "contactPoints - cp"
- }
- }
Similarly, we can have an actor that learns which cluster clients contact a ClusterClientReceptionist:
- class ReceptionistListener(targetReceptionist: ActorRef) extends Actor {
- override def preStart(): Unit =
- targetReceptionist ! SubscribeClusterClients
-
- def receive: Receive =
- receiveWithClusterClients(Set.empty)
-
- def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = {
- case ClusterClients(cs) ⇒
- context.become(receiveWithClusterClients(cs))
- // Now do something with the up-to-date "cs"
- case ClusterClientUp(c) ⇒
- context.become(receiveWithClusterClients(clusterClients + c))
- // Now do something with an up-to-date "clusterClients + c"
- case ClusterClientUnreachable(c) ⇒
- context.become(receiveWithClusterClients(clusterClients - c))
- // Now do something with an up-to-date "clusterClients - c"
- }
- }
To use the Cluster Client you must add the following dependency in your project.
sbt:
- "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.10"
maven:
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster-tools_2.11</artifactId>
- <version>2.4.10</version>
- </dependency>
The ClusterClientReceptionist extension (or ClusterReceptionistSettings) can be configured with the following properties:
- # Settings for the ClusterClientReceptionist extension
- akka.cluster.client.receptionist {
- # Actor name of the ClusterReceptionist actor, /system/receptionist
- name = receptionist
-
- # Start the receptionist on members tagged with this role.
- # All members are used if undefined or empty.
- role = ""
-
- # The receptionist will send this number of contact points to the client
- number-of-contacts = 3
-
- # The actor that tunnel response messages to the client will be stopped
- # after this time of inactivity.
- response-tunnel-receive-timeout = 30s
-
- # The id of the dispatcher to use for ClusterReceptionist actors.
- # If not specified default dispatcher is used.
- # If specified you need to define the settings of the actual dispatcher.
- use-dispatcher = ""
-
- # How often failure detection heartbeat messages should be received for
- # each ClusterClient
- heartbeat-interval = 2s
-
- # Number of potentially lost/delayed heartbeats that will be
- # accepted before considering it to be an anomaly.
- # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which
- # will trigger if there are no heartbeats within the duration
- # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
- # the default settings.
- acceptable-heartbeat-pause = 13s
-
- # Failure detection checking interval for checking all ClusterClients
- failure-detection-interval = 2s
- }
The following configuration properties are read by the ClusterClientSettings when created with an ActorSystem parameter. It is also possible to amend the ClusterClientSettings or create it from another config section with the same layout as below. ClusterClientSettings is a parameter to the ClusterClient.props factory method, i.e. each client can be configured with different settings if needed.
- # Settings for the ClusterClient
- akka.cluster.client {
- # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
- # that the client will try to contact initially. It is mandatory to specify
- # at least one initial contact.
- # Comma separated full actor paths defined by a string on the form of
- # "akka.tcp://system@hostname:port/system/receptionist"
- initial-contacts = []
-
- # Interval at which the client retries to establish contact with one of
- # ClusterReceptionist on the servers (cluster nodes)
- establishing-get-contacts-interval = 3s
-
- # Interval at which the client will ask the ClusterReceptionist for
- # new contact points to be used for next reconnect.
- refresh-contacts-interval = 60s
-
- # How often failure detection heartbeat messages should be sent
- heartbeat-interval = 2s
-
- # Number of potentially lost/delayed heartbeats that will be
- # accepted before considering it to be an anomaly.
- # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
- # will trigger if there are no heartbeats within the duration
- # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
- # the default settings.
- acceptable-heartbeat-pause = 13s
-
- # If connection to the receptionist is not established the client will buffer
- # this number of messages and deliver them when the connection is established.
- # When the buffer is full old messages will be dropped when new messages are sent
- # via the client. Use 0 to disable buffering, i.e. messages will be dropped
- # immediately if the location of the receptionist is unknown.
- # Maximum allowed buffer size is 10000.
- buffer-size = 1000
-
- # If connection to the receptionist is lost and the client has not been
- # able to acquire a new connection for this long the client will stop itself.
- # This duration makes it possible to watch the cluster client and react on a more permanent
- # loss of connection with the cluster, for example by accessing some kind of
- # service registry for an updated set of initial contacts to start a new cluster client with.
- # If this is not wanted it can be set to "off" to disable the timeout and retry
- # forever.
- reconnect-timeout = off
- }
When the cluster client is started it must be provided with a list of initial contacts, which are cluster nodes where receptionists are running. It will then repeatedly (with an interval configurable by establishing-get-contacts-interval) try to contact those until it gets in contact with one of them. While running, the list of contacts is continuously updated with data from the receptionists (again, with an interval configurable with refresh-contacts-interval), so that if there are more receptionists in the cluster than the initial contacts provided to the client the client will learn about them.
While the client is running it will detect failures in its connection to the receptionist by heartbeats; if more than a configurable amount of heartbeats are missed the client will try to reconnect to its known set of contacts to find a receptionist it can access.
It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to within a configurable interval. This is configured with the reconnect-timeout, which defaults to off. This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses are entirely dynamic, and the entire cluster might shut down or crash and be restarted on new addresses. Since the client will be stopped in that case, a monitoring actor can watch it and upon Terminated a new set of initial contacts can be fetched and a new cluster client started.
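A rough sketch of such a supervising actor, assuming a hypothetical fetchInitialContactsFromRegistry() helper that queries your service registry for fresh contact points:
- import akka.actor.{ Actor, ActorPath, ActorRef, Terminated }
- import akka.cluster.client.{ ClusterClient, ClusterClientSettings }
-
- class ClusterClientSupervisor extends Actor {
-   // Hypothetical helper; in practice this would query a service registry.
-   def fetchInitialContactsFromRegistry(): Set[ActorPath] = ???
-
-   var client: ActorRef = startClient()
-
-   def startClient(): ActorRef = {
-     val c = context.actorOf(ClusterClient.props(
-       ClusterClientSettings(context.system)
-         .withInitialContacts(fetchInitialContactsFromRegistry())))
-     context.watch(c)
-     c
-   }
-
-   def receive = {
-     case Terminated(ref) if ref == client ⇒
-       // The client gave up (reconnect-timeout passed); start a new one with fresh contacts.
-       client = startClient()
-     case msg ⇒
-       client forward msg
-   }
- }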
Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to be able to interact with them using their logical identifier, but without having to care about their physical location in the cluster, which might also change over time.
It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. Here we call these actors "entities". These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state.
Cluster sharding is typically used when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors it might be easier to run them on a Cluster Singleton node.
In this context sharding means that actors with an identifier, so called entities, can be automatically distributed across multiple nodes in the cluster. Each entity actor runs only at one place, and messages can be sent to the entity without requiring the sender to know the location of the destination actor. This is achieved by sending the messages via a ShardRegion actor provided by this extension, which knows how to route the message with the entity id to the final destination.
Cluster sharding will not be active on members with status WeaklyUp if that feature is enabled.
Warning
Don't use Cluster Sharding together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in multiple shards and entities being started, one in each separate cluster! See Downing.
This is what an entity actor may look like:
- case object Increment
- case object Decrement
- final case class Get(counterId: Long)
- final case class EntityEnvelope(id: Long, payload: Any)
-
- case object Stop
- final case class CounterChanged(delta: Int)
-
- class Counter extends PersistentActor {
- import ShardRegion.Passivate
-
- context.setReceiveTimeout(120.seconds)
-
- // self.path.name is the entity identifier (utf-8 URL-encoded)
- override def persistenceId: String = "Counter-" + self.path.name
-
- var count = 0
-
- def updateState(event: CounterChanged): Unit =
- count += event.delta
-
- override def receiveRecover: Receive = {
- case evt: CounterChanged ⇒ updateState(evt)
- }
-
- override def receiveCommand: Receive = {
- case Increment ⇒ persist(CounterChanged(+1))(updateState)
- case Decrement ⇒ persist(CounterChanged(-1))(updateState)
- case Get(_) ⇒ sender() ! count
- case ReceiveTimeout ⇒ context.parent ! Passivate(stopMessage = Stop)
- case Stop ⇒ context.stop(self)
- }
- }
The above actor uses event sourcing and the support provided in PersistentActor to store its state. It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover its state if it is valuable.
Note how the persistenceId is defined. The name of the actor is the entity identifier (utf-8 URL-encoded). You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node in the cluster, supposed to register the supported entity types with the ClusterSharding.start method. ClusterSharding.start gives you the reference which you can pass along.
- val counterRegion: ActorRef = ClusterSharding(system).start(
- typeName = "Counter",
- entityProps = Props[Counter],
- settings = ClusterShardingSettings(system),
- extractEntityId = extractEntityId,
- extractShardId = extractShardId)
The extractEntityId and extractShardId are two application specific functions to extract the entity identifier and the shard identifier from incoming messages.
- val extractEntityId: ShardRegion.ExtractEntityId = {
- case EntityEnvelope(id, payload) ⇒ (id.toString, payload)
- case msg @ Get(id) ⇒ (id.toString, msg)
- }
-
- val numberOfShards = 100
-
- val extractShardId: ShardRegion.ExtractShardId = {
- case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
- case Get(id) ⇒ (id % numberOfShards).toString
- }
This example illustrates two different ways to define the entity identifier in the messages:
- The Get message includes the identifier itself.
- The EntityEnvelope holds the identifier, and the actual message that is sent to the entity actor is wrapped in the envelope.
Note how these two message types are handled in the extractEntityId function shown above. The message sent to the entity actor is the second part of the tuple returned by extractEntityId, and that makes it possible to unwrap envelopes if needed.
A shard is a group of entities that will be managed together. The grouping is defined by the extractShardId function shown above. For a specific entity identifier the shard identifier must always be the same.
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution, i.e. the same amount of entities in each shard. As a rule of thumb, the number of shards should be a factor of ten greater than the planned maximum number of cluster nodes. Fewer shards than the number of nodes will result in some nodes not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency because the coordinator is involved in the routing of the first message for each shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping all nodes in the cluster.
A simple sharding algorithm that works fine in most cases is to take the absolute value of the hashCode of the entity identifier modulo the number of shards. As a convenience this is provided by the ShardRegion.HashCodeMessageExtractor.
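A minimal sketch of what that could look like for the Counter example, assuming the ClusterSharding.start overload that accepts a ShardRegion.MessageExtractor:
- // 100 is the maximum number of shards; the shard id is derived from the
- // absolute value of entityId.hashCode modulo this number.
- val messageExtractor = new ShardRegion.HashCodeMessageExtractor(100) {
-   override def entityId(message: Any): String = message match {
-     case EntityEnvelope(id, _) ⇒ id.toString
-     case Get(id)               ⇒ id.toString
-   }
-   // Unwrap envelopes so the entity receives only the payload.
-   override def entityMessage(message: Any): Any = message match {
-     case EntityEnvelope(_, payload) ⇒ payload
-     case msg                        ⇒ msg
-   }
- }
-
- val counterRegion: ActorRef = ClusterSharding(system).start(
-   "Counter", Props[Counter], ClusterShardingSettings(system), messageExtractor)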
Messages to the entities are always sent via the local ShardRegion. The ShardRegion actor reference for a named entity type is returned by ClusterSharding.start and it can also be retrieved with ClusterSharding.shardRegion. The ShardRegion will look up the location of the shard for the entity if it does not already know its location. It will delegate the message to the right node and it will create the entity actor on demand, i.e. when the first message for a specific entity is delivered.
- val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
- counterRegion ! Get(123)
- expectMsg(0)
-
- counterRegion ! EntityEnvelope(123, Increment)
- counterRegion ! Get(123)
- expectMsg(1)
A more comprehensive sample is available in the Lightbend Activator tutorial named Akka Cluster Sharding with Scala!.
The ShardRegion actor is started on each node in the cluster, or group of nodes tagged with a specific role. The ShardRegion is created with two application specific functions to extract the entity identifier and the shard identifier from incoming messages. A shard is a group of entities that will be managed together. For the first message in a specific shard the ShardRegion requests the location of the shard from a central coordinator, the ShardCoordinator.
The ShardCoordinator decides which ShardRegion shall own the Shard and informs that ShardRegion. The region will confirm this request and create the Shard supervisor as a child actor. The individual Entities will then be created when needed by the Shard actor. Incoming messages thus travel via the ShardRegion and the Shard to the target Entity.
If the shard home is another ShardRegion instance messages will be forwarded to that ShardRegion instance instead. While resolving the location of a shard incoming messages for that shard are buffered and later delivered when the shard home is known. Subsequent messages to the resolved shard can be delivered to the target destination immediately without involving the ShardCoordinator.
Scenario 1: the coordinator allocates the shard to the requesting ShardRegion instance R1, which then creates the entity locally and handles all subsequent messages for that shard without involving the coordinator. Scenario 2: the coordinator allocates the shard to another ShardRegion instance, so R1 forwards messages for that shard to that region.
To make sure that at most one instance of a specific entity actor is running somewhere in the cluster it is important that all nodes have the same view of where the shards are located. Therefore the shard allocation decisions are taken by the central ShardCoordinator, which is running as a cluster singleton, i.e. one instance on the oldest member among all cluster nodes or a group of nodes tagged with a specific role.
The logic that decides where a shard is to be located is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy allocates new shards to the ShardRegion with the least number of previously allocated shards. This strategy can be replaced by an application specific implementation.
To be able to use newly added members in the cluster the coordinator facilitates rebalancing of shards, i.e. migrating entities from one node to another. In the rebalance process the coordinator first notifies all ShardRegion actors that a handoff for a shard has started. That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown. During the rebalance process the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The ShardRegion responsible for the rebalanced shard will stop all entities in that shard by sending the specified handOffStopMessage (default PoisonPill) to them. When all entities have been terminated the ShardRegion owning the entities will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of the shard and thereby allocate a new home for the shard, and the buffered messages in the ShardRegion actors are then delivered to the new location. This means that the state of the entities is not transferred or migrated. If the state of the entities is of importance it should be persistent (durable), e.g. with Persistence, so that it can be recovered at the new location.
The logic that decides which shards to rebalance is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy picks shards for handoff from the ShardRegion with the most number of previously allocated shards. They will then be allocated to the ShardRegion with the least number of previously allocated shards, i.e. new members in the cluster. There is a configurable threshold of how large the difference must be to begin the rebalancing. This strategy can be replaced by an application specific implementation.
The state of shard locations in the ShardCoordinator is persistent (durable) with Persistence to survive failures. Since it is running in a cluster, Persistence must be configured with a distributed journal. When a crashed or unreachable coordinator node has been removed (via down) from the cluster a new ShardCoordinator singleton actor will take over and the state is recovered. During such a failure period shards with a known location are still available, while messages for new (unknown) shards are buffered until the new ShardCoordinator becomes available.
As long as a sender uses the same ShardRegion actor to deliver messages to an entity actor the order of the messages is preserved. As long as the buffer limit is not reached messages are delivered on a best effort basis, with at-most-once delivery semantics, in the same way as ordinary message sending. Reliable end-to-end messaging, with at-least-once semantics, can be added by using AtLeastOnceDelivery in Persistence.
Some additional latency is introduced for messages targeted to new or previously unused shards due to the round-trip to the coordinator. Rebalancing of shards may also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards.
Instead of using Persistence it is possible to use the Distributed Data module as storage for the state of the sharding coordinator. In that case the state of the ShardCoordinator will be replicated inside a cluster by the Distributed Data module with WriteMajority/ReadMajority consistency.
This mode can be enabled by setting configuration property:
- akka.cluster.sharding.state-store-mode = ddata
It is using the Distributed Data extension that must be running on all nodes in the cluster. Therefore you should add that extension to the configuration to make sure that it is started on all nodes:
- akka.extensions += "akka.cluster.ddata.DistributedData"
You must explicitly add the akka-distributed-data-experimental dependency to your build if you use this mode. It is possible to remove the akka-persistence dependency from a project if it is not used in user code and remember-entities is off.
When used together with Remember Entities, shards will be recreated after rebalancing, however they will not be recreated after a clean cluster start, as the Sharding Coordinator state is empty after a clean cluster start when using ddata mode. When Remember Entities is on, the Sharding Region always keeps data using persistence, no matter how State Store Mode is set.
Warning
The ddata mode is considered “experimental” as of its introduction in Akka 2.4.0, since it depends on the experimental Distributed Data module.
The ShardRegion actor can also be started in proxy only mode, i.e. it will not host any entities itself, but knows how to delegate messages to the right location. A ShardRegion is started in proxy only mode with the ClusterSharding.startProxy method.
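A minimal sketch, assuming the ClusterSharding.startProxy method takes the entity type name, an optional role, and the same extractor functions used when starting the region elsewhere:
- // Proxy only mode for the "Counter" entity type: this node never hosts Counter
- // entities itself, it only forwards messages to regions that do. The None argument
- // means the proxy is not restricted to a specific node role.
- val counterProxy: ActorRef = ClusterSharding(system).startProxy(
-   "Counter", None, extractEntityId, extractShardId)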
If the state of the entities is persistent you may stop entities that are not used to reduce memory consumption. This is done by the application specific implementation of the entity actors, for example by defining a receive timeout (context.setReceiveTimeout). If a message is already enqueued to the entity when it stops itself the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages the entity actor can send ShardRegion.Passivate to its parent Shard. The specified wrapped message in Passivate will be sent back to the entity, which is then supposed to stop itself. Incoming messages will be buffered by the Shard between reception of Passivate and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity.
The list of entities in each Shard can be made persistent (durable) by setting the rememberEntities flag to true in ClusterShardingSettings when calling ClusterSharding.start. When configured to remember entities, whenever a Shard is rebalanced onto another node or recovers after a crash it will recreate all the entities which were previously running in that Shard. To permanently stop entities, a Passivate message must be sent to the parent of the entity actor, otherwise the entity will be automatically restarted after the entity restart backoff specified in the configuration.
When rememberEntities is set to false, a Shard will not automatically restart any entities after a rebalance or recovering from a crash. Entities will only be started once the first message for that entity has been received in the Shard. Entities will not be restarted if they stop without using a Passivate.
Note that the state of the entities themselves will not be restored unless they have been made persistent, e.g. with Persistence.
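For example, remembering entities could be enabled per entity type when starting the region, assuming the withRememberEntities setter on ClusterShardingSettings (it can equally be enabled with the akka.cluster.sharding.remember-entities configuration property shown further below); the Props and extractor functions are the ones from the Counter example above:
- // Entities in a shard of this type are recreated automatically after the shard
- // is rebalanced onto another node or recovers from a crash.
- ClusterSharding(system).start(
-   typeName = "Counter",
-   entityProps = Props[Counter],
-   settings = ClusterShardingSettings(system).withRememberEntities(true),
-   extractEntityId = extractEntityId,
-   extractShardId = extractShardId)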
If you need to use another supervisorStrategy for the entity actors than the default (restarting) strategy you need to create an intermediate parent actor that defines the supervisorStrategy for the child entity actor.
- class CounterSupervisor extends Actor {
- val counter = context.actorOf(Props[Counter], "theCounter")
-
- override val supervisorStrategy = OneForOneStrategy() {
- case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume
- case _: ActorInitializationException ⇒ SupervisorStrategy.Stop
- case _: DeathPactException ⇒ SupervisorStrategy.Stop
- case _: Exception ⇒ SupervisorStrategy.Restart
- }
-
- def receive = {
- case msg ⇒ counter forward msg
- }
- }
You start such a supervisor in the same way as if it was the entity actor.
- ClusterSharding(system).start(
- typeName = "SupervisedCounter",
- entityProps = Props[CounterSupervisor],
- settings = ClusterShardingSettings(system),
- extractEntityId = extractEntityId,
- extractShardId = extractShardId)
Note that stopped entities will be started again when a new message is targeted to the entity.
You can send the ShardRegion.GracefulShutdown message to the ShardRegion actor to hand off all shards that are hosted by that ShardRegion, and then the ShardRegion actor will be stopped. You can watch the ShardRegion actor to know when it is completed. During this period other regions will buffer messages for those shards in the same way as when a rebalance is triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.
When the ShardRegion has terminated you probably want to leave the cluster and shut down the ActorSystem. This is how to do that:
- class IllustrateGracefulShutdown extends Actor {
- val system = context.system
- val cluster = Cluster(system)
- val region = ClusterSharding(system).shardRegion("Entity")
-
- def receive = {
- case "leave" ⇒
- context.watch(region)
- region ! ShardRegion.GracefulShutdown
-
- case Terminated(`region`) ⇒
- cluster.registerOnMemberRemoved(self ! "member-removed")
- cluster.leave(cluster.selfAddress)
-
- case "member-removed" ⇒
- // Let singletons hand over gracefully before stopping the system
- import context.dispatcher
- system.scheduler.scheduleOnce(10.seconds, self, "stop-system")
-
- case "stop-system" ⇒
- system.terminate()
- }
- }
The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence. This data can safely be removed when restarting the whole Akka Cluster. Note that this is not application data.
There is a utility program, akka.cluster.sharding.RemoveInternalClusterShardingData, that removes this data.
Warning
Never use this program while there are running Akka Cluster nodes that are using Cluster Sharding. Stop all Cluster nodes before using this program.
It may be necessary to remove the data if the Cluster Sharding coordinator cannot start up because of corrupt data, which may happen if two clusters were accidentally running at the same time, e.g. caused by using auto-down during a network partition.
Warning
Don't use Cluster Sharding together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in multiple shards and entities being started, one in each separate cluster! See Downing.
Use this program as a standalone Java main program:
- java -classpath <jar files, including akka-cluster-sharding>
- akka.cluster.sharding.RemoveInternalClusterShardingData
- -2.3 entityType1 entityType2 entityType3
The program is included in the akka-cluster-sharding jar file. It is easiest to run it with the same classpath and configuration as your ordinary application. It can be run from sbt or maven in a similar way.
Specify the entity type names (same as you use in the start method of ClusterSharding) as program arguments.
If you specify -2.3 as the first program argument it will also try to remove data that was stored by Cluster Sharding in Akka 2.3.x using a different persistenceId.
To use the Cluster Sharding you must add the following dependency in your project.
sbt:
- "com.typesafe.akka" %% "akka-cluster-sharding" % "2.4.10"
maven:
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster-sharding_2.11</artifactId>
- <version>2.4.10</version>
- </dependency>
The ClusterSharding extension can be configured with the following properties. These configuration properties are read by the ClusterShardingSettings when created with an ActorSystem parameter. It is also possible to amend the ClusterShardingSettings or create it from another config section with the same layout as below. ClusterShardingSettings is a parameter to the start method of the ClusterSharding extension, i.e. each entity type can be configured with different settings if needed.
- # Settings for the ClusterShardingExtension
- akka.cluster.sharding {
-
- # The extension creates a top level actor with this name in top level system scope,
- # e.g. '/system/sharding'
- guardian-name = sharding
-
- # Specifies that entities run on cluster nodes with a specific role.
- # If the role is not specified (or empty) all nodes in the cluster are used.
- role = ""
-
- # When this is set to 'on' the active entity actors will automatically be restarted
- # upon Shard restart. i.e. if the Shard is started on a different ShardRegion
- # due to rebalance or crash.
- remember-entities = off
-
- # If the coordinator can't store state changes it will be stopped
- # and started again after this duration, with an exponential back-off
- # of up to 5 times this duration.
- coordinator-failure-backoff = 5 s
-
- # The ShardRegion retries registration and shard location requests to the
- # ShardCoordinator with this interval if it does not reply.
- retry-interval = 2 s
-
- # Maximum number of messages that are buffered by a ShardRegion actor.
- buffer-size = 100000
-
- # Timeout of the shard rebalancing process.
- handoff-timeout = 60 s
-
- # Time given to a region to acknowledge it's hosting a shard.
- shard-start-timeout = 10 s
-
- # If the shard is remembering entities and can't store state changes
- # will be stopped and then started again after this duration. Any messages
- # sent to an affected entity may be lost in this process.
- shard-failure-backoff = 10 s
-
- # If the shard is remembering entities and an entity stops itself without
- # using passivate. The entity will be restarted after this duration or when
- # the next message for it is received, which ever occurs first.
- entity-restart-backoff = 10 s
-
- # Rebalance check is performed periodically with this interval.
- rebalance-interval = 10 s
-
- # Absolute path to the journal plugin configuration entity that is to be
- # used for the internal persistence of ClusterSharding. If not defined
- # the default journal plugin is used. Note that this is not related to
- # persistence used by the entity actors.
- journal-plugin-id = ""
-
- # Absolute path to the snapshot plugin configuration entity that is to be
- # used for the internal persistence of ClusterSharding. If not defined
- # the default snapshot plugin is used. Note that this is not related to
- # persistence used by the entity actors.
- snapshot-plugin-id = ""
-
- # Parameter which determines how the coordinator stores its state.
- # Valid values are either "persistence" or "ddata".
- # The "ddata" mode is experimental, since it depends on the experimental
- # module akka-distributed-data-experimental.
- state-store-mode = "persistence"
-
- # The shard saves persistent snapshots after this number of persistent
- # events. Snapshots are used to reduce recovery times.
- snapshot-after = 1000
-
- # Setting for the default shard allocation strategy
- least-shard-allocation-strategy {
- # Threshold of how large the difference between most and least number of
- # allocated shards must be to begin the rebalancing.
- rebalance-threshold = 10
-
- # The number of ongoing rebalancing processes is limited to this number.
- max-simultaneous-rebalance = 3
- }
-
- # Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened)
- # works only for state-store-mode = "ddata"
- waiting-for-state-timeout = 5 s
-
- # Timeout of waiting for update the distributed state (update will be retried if the timeout happened)
- # works only for state-store-mode = "ddata"
- updating-state-timeout = 5 s
-
- # The shard uses this strategy to determine how to recover the underlying entity actors. The strategy is only used
- # by the persistent shard when rebalancing or restarting. The value can either be "all" or "constant". The "all"
- # strategy starts all the underlying entity actors at the same time. The "constant" strategy will start the underlying
- # entity actors at a fixed rate. The default strategy is "all".
- entity-recovery-strategy = "all"
-
- # Default settings for the constant rate entity recovery strategy
- entity-recovery-constant-rate-strategy {
- # Sets the frequency at which a batch of entity actors is started.
- frequency = 100 ms
- # Sets the number of entity actors to be restarted at a particular interval
- number-of-entities = 5
- }
-
- # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
- # The "role" of the singleton configuration is not used. The singleton role will
- # be the same as "akka.cluster.sharding.role".
- coordinator-singleton = ${akka.cluster.singleton}
-
- # The id of the dispatcher to use for ClusterSharding actors.
- # If not specified default dispatcher is used.
- # If specified you need to define the settings of the actual dispatcher.
- # This dispatcher for the entity actors is defined by the user provided
- # Props, i.e. this dispatcher is not used for the entity actors.
- use-dispatcher = ""
- }
A custom shard allocation strategy can be passed as an optional parameter to ClusterSharding.start. See the API documentation of ShardAllocationStrategy for details of how to implement a custom shard allocation strategy.
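As a rough, hypothetical sketch (not taken from the Akka sources), a strategy that always allocates a new shard to the region hosting the fewest shards and never rebalances could look roughly as follows; the method signatures are those of ShardCoordinator.ShardAllocationStrategy and should be verified against the API documentation:
- import scala.collection.immutable
- import scala.concurrent.Future
- import akka.actor.ActorRef
- import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
- import akka.cluster.sharding.ShardRegion.ShardId
-
- // Hypothetical example: allocate to the least loaded region, never rebalance.
- class FewestShardsAllocationStrategy extends ShardAllocationStrategy {
-
-   override def allocateShard(
-     requester: ActorRef,
-     shardId: ShardId,
-     currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
-     // pick the region that currently hosts the fewest shards
-     val (regionWithFewestShards, _) =
-       currentShardAllocations.minBy { case (_, shards) => shards.size }
-     Future.successful(regionWithFewestShards)
-   }
-
-   override def rebalance(
-     currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
-     rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
-     // never hand off shards once they have been allocated
-     Future.successful(Set.empty[ShardId])
-   }
- }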
Two requests to inspect the cluster state are available:
ShardRegion.GetShardRegionState, which will return a ShardRegion.CurrentShardRegionState that contains the identifiers of the shards running in a Region and what entities are alive for each of them.
ShardRegion.GetClusterShardingStats, which will query all the regions in the cluster and return a ShardRegion.ClusterShardingStats containing the identifiers of the shards running in each region and a count of entities that are alive in each shard.
The purpose of these messages is testing and monitoring; they are not intended as a way to send messages directly to the individual entities.
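For illustration only, a monitoring snippet could ask a region for its current state as sketched below; counterRegion is assumed to be the ActorRef of a started ShardRegion (e.g. obtained from ClusterSharding(system).shardRegion) and is not part of the messages described above:
- import scala.concurrent.duration._
- import akka.cluster.sharding.ShardRegion
- import akka.pattern.ask
- import akka.util.Timeout
-
- // counterRegion is assumed to be the ActorRef of a started ShardRegion
- implicit val timeout: Timeout = 5.seconds
- import system.dispatcher
-
- (counterRegion ? ShardRegion.GetShardRegionState)
-   .mapTo[ShardRegion.CurrentShardRegionState]
-   .foreach { state =>
-     state.shards.foreach { shard =>
-       println(s"shard ${shard.shardId} hosts ${shard.entityIds.size} entities")
-     }
-   }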
The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus with the help of Cluster Metrics Extension.
Cluster metrics information is primarily used for load-balancing routers, and can also be used to implement advanced metrics-based node life cycles, such as "Node Let-it-crash" when CPU steal time becomes excessive.
Cluster Metrics Extension is a separate Akka module delivered in the akka-cluster-metrics jar.
To enable usage of the extension you need to add the following dependency to your project:
- "com.typesafe.akka" % "akka-cluster-metrics_2.11" % "2.4.10"
and
add the following configuration stanza to your application.conf
- akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
Make
sure to disable legacy metrics in akka-cluster: akka.cluster.metrics.enabled=off
,
since it is still enabled in akka-cluster by default (for compatibility
with past releases).
Cluster members with status WeaklyUp, if that feature is enabled, will participate in Cluster Metrics collection and dissemination.
Metrics
collection is delegated to an implementation of akka.cluster.metrics.MetricsCollector
.
Different collector implementations provide different subsets of metrics published to the cluster. Certain message routing and let-it-crash functions may not work when Sigar is not provisioned.
Cluster metrics extension comes with two built-in collector implementations:
akka.cluster.metrics.SigarMetricsCollector, which requires Sigar provisioning, and is more rich/precise
akka.cluster.metrics.JmxMetricsCollector, which is used as fall back, and is less rich/precise
You can also plug in your own metrics collector implementation.
By default, metrics extension will use collector provider fall back and will try to load them in this order:
akka.cluster.metrics.SigarMetricsCollector
akka.cluster.metrics.JmxMetricsCollector
Metrics extension periodically publishes a current snapshot of the cluster metrics to the node system event bus.
The publication period is controlled by the akka.cluster.metrics.collector.sample-interval setting.
The payload of the akka.cluster.metrics.ClusterMetricsChanged event will contain the latest metrics of the node as well as other cluster member nodes' metrics gossip received during the collector sample interval.
You can subscribe your metrics listener actors to these events in order to implement custom node lifecycle handling:
- ClusterMetricsExtension(system).subscribe(metricsListenerActor)
Both user-provided and built-in metrics collectors can optionally use Hyperic Sigar for a wider and more accurate range of metrics compared to what can be retrieved from ordinary JMX MBeans.
Sigar is using a native o/s library, and requires library provisioning, i.e. deployment, extraction and loading of the o/s native library into JVM at runtime.
User can provision Sigar classes and native library in one of the following ways:
Use Kamon sigar-loader as a project dependency for the user project. The metrics extension will extract and load the sigar library on demand with the help of the Kamon sigar provisioner.
Use Kamon sigar-loader as a java agent: java -javaagent:/path/to/sigar-loader.jar. The Kamon sigar loader agent will extract and load the sigar library during JVM start.
Place sigar.jar on the classpath and the Sigar native library for the o/s on the java.library.path. The user is required to manage both the project dependency and the library deployment manually.
Warning
When using Kamon sigar-loader and running multiple instances of the same application on the same host, you have to make sure that the sigar library is extracted to a unique per-instance directory. You can control the extract directory with the akka.cluster.metrics.native-library-extract-folder configuration setting.
To enable usage of Sigar you can add the following dependency to the user project:
- "io.kamon" % "sigar-loader" % "1.6.6-rev002"
You can download Kamon sigar-loader from Maven Central.
The AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroup performs load balancing of messages to cluster nodes based on the cluster metrics data. It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node. It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights:
heap / HeapMetricsSelector - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
load / SystemLoadAverageMetricsSelector - System load average for the past 1 minute, corresponding value can be found in top of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing the number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
cpu / CpuMetricsSelector - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
mix / MixMetricsSelector - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
Any custom implementation of akka.cluster.metrics.MetricsSelector can also be used.
The collected metrics values are smoothed with exponential weighted moving average. In the Configuration you can adjust how quickly past data is decayed compared to new data.
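As a plain arithmetic illustration (not the extension's internal API), the smoothing factor can be derived from the sample interval and the configured half-life described in the reference configuration further below:
- // Plain arithmetic sketch of an exponentially weighted moving average.
- // The values mirror akka.cluster.metrics.collector.sample-interval (3s)
- // and akka.cluster.metrics.collector.moving-average-half-life (12s).
- val sampleIntervalSeconds = 3.0
- val halfLifeSeconds = 12.0
- val alpha = 1.0 - math.pow(0.5, sampleIntervalSeconds / halfLifeSeconds)
-
- def updatedEwma(previous: Double, newSample: Double): Double =
-   alpha * newSample + (1.0 - alpha) * previous
-
- // after one sample the old average still dominates: alpha is about 0.159
- println(updatedEwma(previous = 0.50, newSample = 0.90)) // roughly 0.56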
Let's take a look at this router in action. What can be more demanding than calculating factorials?
The backend worker that performs the factorial calculation:
- import scala.annotation.tailrec
- import scala.concurrent.Future
- import akka.actor.{ Actor, ActorLogging }
- import akka.pattern.pipe
-
- class FactorialBackend extends Actor with ActorLogging {
-
- import context.dispatcher
-
- def receive = {
- case (n: Int) =>
- Future(factorial(n)) map { result => (n, result) } pipeTo sender()
- }
-
- def factorial(n: Int): BigInt = {
- @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
- if (n <= 1) acc
- else factorialAcc(acc * n, n - 1)
- }
- factorialAcc(BigInt(1), n)
- }
-
- }
The frontend that receives user jobs and delegates to the backends via the router:
- import scala.concurrent.duration._
- import akka.actor.{ Actor, ActorLogging, ReceiveTimeout }
- import akka.routing.FromConfig
-
- class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
-
- val backend = context.actorOf(FromConfig.props(),
- name = "factorialBackendRouter")
-
- override def preStart(): Unit = {
- sendJobs()
- if (repeat) {
- context.setReceiveTimeout(10.seconds)
- }
- }
-
- def receive = {
- case (n: Int, factorial: BigInt) =>
- if (n == upToN) {
- log.debug("{}! = {}", n, factorial)
- if (repeat) sendJobs()
- else context.stop(self)
- }
- case ReceiveTimeout =>
- log.info("Timeout")
- sendJobs()
- }
-
- def sendJobs(): Unit = {
- log.info("Starting batch of factorials up to [{}]", upToN)
- 1 to upToN foreach { backend ! _ }
- }
- }
As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows:
- akka.actor.deployment {
- /factorialFrontend/factorialBackendRouter = {
- # Router type provided by metrics extension.
- router = cluster-metrics-adaptive-group
- # Router parameter specific for metrics extension.
- # metrics-selector = heap
- # metrics-selector = load
- # metrics-selector = cpu
- metrics-selector = mix
- #
- routees.paths = ["/user/factorialBackend"]
- cluster {
- enabled = on
- use-role = backend
- allow-local-routees = off
- }
- }
- }
Only the router type and the metrics-selector parameter are specific to this router; other things work in the same way as for other routers.
The same type of router could also have been defined in code:
- import akka.cluster.routing.ClusterRouterGroup
- import akka.cluster.routing.ClusterRouterGroupSettings
- import akka.cluster.metrics.AdaptiveLoadBalancingGroup
- import akka.cluster.metrics.HeapMetricsSelector
-
- val backend = context.actorOf(
- ClusterRouterGroup(AdaptiveLoadBalancingGroup(HeapMetricsSelector),
- ClusterRouterGroupSettings(
- totalInstances = 100, routeesPaths = List("/user/factorialBackend"),
- allowLocalRoutees = true, useRole = Some("backend"))).props(),
- name = "factorialBackendRouter2")
- import akka.cluster.routing.ClusterRouterPool
- import akka.cluster.routing.ClusterRouterPoolSettings
- import akka.cluster.metrics.AdaptiveLoadBalancingPool
- import akka.cluster.metrics.SystemLoadAverageMetricsSelector
-
- val backend = context.actorOf(
- ClusterRouterPool(AdaptiveLoadBalancingPool(
- SystemLoadAverageMetricsSelector), ClusterRouterPoolSettings(
- totalInstances = 100, maxInstancesPerNode = 3,
- allowLocalRoutees = false, useRole = Some("backend"))).props(Props[FactorialBackend]),
- name = "factorialBackendRouter3")
The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions on how to run the Adaptive Load Balancing sample.
It is possible to subscribe to the metrics events directly to implement other functionality.
- import akka.actor.ActorLogging
- import akka.actor.Actor
- import akka.cluster.Cluster
- import akka.cluster.metrics.ClusterMetricsEvent
- import akka.cluster.metrics.ClusterMetricsChanged
- import akka.cluster.ClusterEvent.CurrentClusterState
- import akka.cluster.metrics.NodeMetrics
- import akka.cluster.metrics.StandardMetrics.HeapMemory
- import akka.cluster.metrics.StandardMetrics.Cpu
- import akka.cluster.metrics.ClusterMetricsExtension
-
- class MetricsListener extends Actor with ActorLogging {
- val selfAddress = Cluster(context.system).selfAddress
- val extension = ClusterMetricsExtension(context.system)
-
- // Subscribe to ClusterMetricsEvent events.
- override def preStart(): Unit = extension.subscribe(self)
-
- // Unsubscribe from ClusterMetricsEvent events.
- override def postStop(): Unit = extension.unsubscribe(self)
-
- def receive = {
- case ClusterMetricsChanged(clusterMetrics) =>
- clusterMetrics.filter(_.address == selfAddress) foreach { nodeMetrics =>
- logHeap(nodeMetrics)
- logCpu(nodeMetrics)
- }
- case state: CurrentClusterState => // Ignore.
- }
-
- def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
- case HeapMemory(address, timestamp, used, committed, max) =>
- log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
- case _ => // No heap info.
- }
-
- def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
- case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, cpuStolen, processors) =>
- log.info("Load: {} ({} processors)", systemLoadAverage, processors)
- case _ => // No cpu info.
- }
- }
Metrics collection is delegated to the implementation of akka.cluster.metrics.MetricsCollector.
You can plug in your own metrics collector instead of the built-in akka.cluster.metrics.SigarMetricsCollector or akka.cluster.metrics.JmxMetricsCollector. Look at those two implementations for inspiration.
A custom metrics collector implementation class must be specified in the akka.cluster.metrics.collector.provider configuration property.
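For example, a custom collector could be wired in with a configuration fragment like the following; the class name is hypothetical, while the provider and fallback settings correspond to the reference configuration shown below:
- akka.cluster.metrics.collector {
-   # FQCN of your own MetricsCollector implementation (hypothetical class name)
-   provider = "com.example.metrics.CustomMetricsCollector"
-   # fail instead of falling back to the built-in collectors
-   fallback = false
- }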
The Cluster metrics extension can be configured with the following properties:
- ##############################################
- # Akka Cluster Metrics Reference Config File #
- ##############################################
-
- # This is the reference config file that contains all the default settings.
- # Make your edits in your application.conf in order to override these settings.
-
- # Sigar provisioning:
- #
- # User can provision sigar classes and native library in one of the following ways:
- #
- # 1) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as a project dependency for the user project.
- # Metrics extension will extract and load sigar library on demand with help of Kamon sigar provisioner.
- #
- # 2) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as java agent: `java -javaagent:/path/to/sigar-loader.jar`
- # Kamon sigar loader agent will extract and load sigar library during JVM start.
- #
- # 3) Place `sigar.jar` on the `classpath` and sigar native library for the o/s on the `java.library.path`
- # User is required to manage both project dependency and library deployment manually.
-
- # Cluster metrics extension.
- # Provides periodic statistics collection and publication throughout the cluster.
- akka.cluster.metrics {
- # Full path of dispatcher configuration key.
- # Use "" for default key `akka.actor.default-dispatcher`.
- dispatcher = ""
- # How long should any actor wait before starting the periodic tasks.
- periodic-tasks-initial-delay = 1s
- # Sigar native library extract location.
- # Use per-application-instance scoped location, such as program working directory.
- native-library-extract-folder = ${user.dir}"/native"
- # Metrics supervisor actor.
- supervisor {
- # Actor name. Example name space: /system/cluster-metrics
- name = "cluster-metrics"
- # Supervision strategy.
- strategy {
- #
- # FQCN of class providing `akka.actor.SupervisorStrategy`.
- # Must have a constructor with signature `<init>(com.typesafe.config.Config)`.
- # Default metrics strategy provider is a configurable extension of `OneForOneStrategy`.
- provider = "akka.cluster.metrics.ClusterMetricsStrategy"
- #
- # Configuration of the default strategy provider.
- # Replace with custom settings when overriding the provider.
- configuration = {
- # Log restart attempts.
- loggingEnabled = true
- # Child actor restart-on-failure window.
- withinTimeRange = 3s
- # Maximum number of restart attempts before child actor is stopped.
- maxNrOfRetries = 3
- }
- }
- }
- # Metrics collector actor.
- collector {
- # Enable or disable metrics collector for load-balancing nodes.
- # Metrics collection can also be controlled at runtime by sending control messages
- # to /system/cluster-metrics actor: `akka.cluster.metrics.{CollectionStartMessage,CollectionStopMessage}`
- enabled = on
- # FQCN of the metrics collector implementation.
- # It must implement `akka.cluster.metrics.MetricsCollector` and
- # have public constructor with akka.actor.ActorSystem parameter.
- # Will try to load in the following order of priority:
- # 1) configured custom collector 2) internal `SigarMetricsCollector` 3) internal `JmxMetricsCollector`
- provider = ""
- # Try all 3 available collector providers, or else fail on the configured custom collector provider.
- fallback = true
- # How often metrics are sampled on a node.
- # Shorter interval will collect the metrics more often.
- # Also controls frequency of the metrics publication to the node system event bus.
- sample-interval = 3s
- # How often a node publishes metrics information to the other nodes in the cluster.
- # Shorter interval will publish the metrics gossip more often.
- gossip-interval = 3s
- # How quickly the exponential weighting of past data is decayed compared to
- # new data. Set lower to increase the bias toward newer values.
- # The relevance of each data sample is halved for every passing half-life
- # duration, i.e. after 4 times the half-life, a data sample’s relevance is
- # reduced to 6% of its original relevance. The initial relevance of a data
- # sample is given by 1 – 0.5 ^ (collect-interval / half-life).
- # See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- moving-average-half-life = 12s
- }
- }
-
- # Cluster metrics extension serializers and routers.
- akka.actor {
- # Protobuf serializer for remote cluster metrics messages.
- serializers {
- akka-cluster-metrics = "akka.cluster.metrics.protobuf.MessageSerializer"
- }
- # Interface binding for remote cluster metrics messages.
- serialization-bindings {
- "akka.cluster.metrics.ClusterMetricsMessage" = akka-cluster-metrics
- }
- # Globally unique metrics extension serializer identifier.
- serialization-identifiers {
- "akka.cluster.metrics.protobuf.MessageSerializer" = 10
- }
- # Provide routing of messages based on cluster metrics.
- router.type-mapping {
- cluster-metrics-adaptive-pool = "akka.cluster.metrics.AdaptiveLoadBalancingPool"
- cluster-metrics-adaptive-group = "akka.cluster.metrics.AdaptiveLoadBalancingGroup"
- }
- }
Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).
All data entries are spread to all nodes, or nodes with a certain role, in the cluster via direct replication and gossip based dissemination. You have fine grained control of the consistency level for reads and writes.
The nature of CRDTs makes it possible to perform updates from any node without coordination. Concurrent updates from different nodes will automatically be resolved by the monotonic merge function, which all data types must provide. The state changes always converge. Several useful data types for counters, sets, maps and registers are provided and you can also implement your own custom data types.
It is eventually consistent and geared toward providing high read and write availability (partition tolerance), with low latency. Note that in an eventually consistent system a read may return an out-of-date value.
Warning
This module is marked as “experimental” as of its introduction in Akka 2.4.0. We will continue to improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.cluster.ddata package.
The akka.cluster.ddata.Replicator
actor provides the API for
interacting with the data. The Replicator
actor must be started on
each node in the cluster, or group of nodes tagged with a specific role.
It communicates with other Replicator instances with the same path (without address) that are running on other nodes. For convenience
it can be used with the akka.cluster.ddata.DistributedData
extension.
Cluster members with status WeaklyUp, if that feature is enabled, will currently not participate in Distributed Data, but that is something that should be possible to add in a future release.
Below is an example of an actor that schedules tick messages to itself and for each tick adds or removes elements from an ORSet (observed-remove set). It also subscribes to changes of it.
- import scala.concurrent.duration._
- import java.util.concurrent.ThreadLocalRandom
- import akka.actor.Actor
- import akka.actor.ActorLogging
- import akka.cluster.Cluster
- import akka.cluster.ddata.DistributedData
- import akka.cluster.ddata.ORSet
- import akka.cluster.ddata.ORSetKey
- import akka.cluster.ddata.Replicator
- import akka.cluster.ddata.Replicator._
-
- object DataBot {
- private case object Tick
- }
-
- class DataBot extends Actor with ActorLogging {
- import DataBot._
-
- val replicator = DistributedData(context.system).replicator
- implicit val node = Cluster(context.system)
-
- import context.dispatcher
- val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick)
-
- val DataKey = ORSetKey[String]("key")
-
- replicator ! Subscribe(DataKey, self)
-
- def receive = {
- case Tick =>
- val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
- if (ThreadLocalRandom.current().nextBoolean()) {
- // add
- log.info("Adding: {}", s)
- replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ + s)
- } else {
- // remove
- log.info("Removing: {}", s)
- replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ - s)
- }
-
- case _: UpdateResponse[_] => // ignore
-
- case c @ Changed(DataKey) =>
- val data = c.get(DataKey)
- log.info("Current elements: {}", data.elements)
- }
-
- override def postStop(): Unit = tickTask.cancel()
-
- }
To
modify and replicate a data value you send a Replicator.Update
message to the local Replicator
.
The
current data value for the key
of the Update
is passed as a parameter to
the modify
function of the Update
.
The function is supposed to return the new value of the data, which
will then be replicated according to the given consistency level.
The modify
function is called by the Replicator
actor and must therefore
be a pure function that only uses the data parameter and stable fields
from the enclosing scope. It must, for example, not access the sender() reference of an enclosing actor.
Update is intended to only be sent from an actor running in the same local ActorSystem as the Replicator.
You supply a write consistency level which has the following meaning:
WriteLocal - the value will immediately only be written to the local replica, and later disseminated with gossip
WriteTo(n) - the value will immediately be written to at least n replicas, including the local replica
WriteMajority - the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
WriteAll - the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
- implicit val node = Cluster(system)
- val replicator = DistributedData(system).replicator
-
- val Counter1Key = PNCounterKey("counter1")
- val Set1Key = GSetKey[String]("set1")
- val Set2Key = ORSetKey[String]("set2")
- val ActiveFlagKey = FlagKey("active")
-
- replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ + 1)
-
- val writeTo3 = WriteTo(n = 3, timeout = 1.second)
- replicator ! Update(Set1Key, GSet.empty[String], writeTo3)(_ + "hello")
-
- val writeMajority = WriteMajority(timeout = 5.seconds)
- replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ + "hello")
-
- val writeAll = WriteAll(timeout = 5.seconds)
- replicator ! Update(ActiveFlagKey, Flag.empty, writeAll)(_.switchOn)
As a reply to the Update, a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back.
Note that a Replicator.UpdateTimeout
reply does not mean that
the update completely failed or was rolled back. It may still have
been replicated to some nodes, and will eventually be replicated to
all nodes with the gossip protocol.
- case UpdateSuccess(Counter1Key, req) => // ok
- case UpdateSuccess(Set1Key, req) => // ok
- case UpdateTimeout(Set1Key, req) =>
- // write to 3 nodes failed within 1.second
You
will always see your own writes. For example if you send two Update
messages changing the
value of the same key,
the modify
function of the second
message will see the change that was performed by the first Update
message.
In
the Update
message you can pass an
optional request context, which the Replicator
does not care about, but
is included in the reply messages. This is a convenient way to pass
contextual information (e.g. original sender) without having to use ask
or maintain local
correlation data structures.
- implicit val node = Cluster(system)
- val replicator = DistributedData(system).replicator
- val writeTwo = WriteTo(n = 2, timeout = 3.second)
- val Counter1Key = PNCounterKey("counter1")
-
- def receive: Receive = {
- case "increment" =>
- // incoming command to increase the counter
- val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ + 1)
- replicator ! upd
-
- case UpdateSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
- replyTo ! "ack"
- case UpdateTimeout(Counter1Key, Some(replyTo: ActorRef)) =>
- replyTo ! "nack"
- }
To retrieve the current value of a data entry you send a Replicator.Get message to the Replicator.
You supply a consistency level which has the following meaning:
ReadLocal - the value will only be read from the local replica
ReadFrom(n) - the value will be read and merged from n replicas, including the local replica
ReadMajority - the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
ReadAll - the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)
- val replicator = DistributedData(system).replicator
- val Counter1Key = PNCounterKey("counter1")
- val Set1Key = GSetKey[String]("set1")
- val Set2Key = ORSetKey[String]("set2")
- val ActiveFlagKey = FlagKey("active")
-
- replicator ! Get(Counter1Key, ReadLocal)
-
- val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
- replicator ! Get(Set1Key, readFrom3)
-
- val readMajority = ReadMajority(timeout = 5.seconds)
- replicator ! Get(Set2Key, readMajority)
-
- val readAll = ReadAll(timeout = 5.seconds)
- replicator ! Get(ActiveFlagKey, readAll)
As a reply to the Get, a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.
- case g @ GetSuccess(Counter1Key, req) =>
- val value = g.get(Counter1Key).value
- case NotFound(Counter1Key, req) => // key counter1 does not exist
- case g @ GetSuccess(Set1Key, req) =>
- val elements = g.get(Set1Key).elements
- case GetFailure(Set1Key, req) =>
- // read from 3 nodes failed within 1.second
- case NotFound(Set1Key, req) => // key set1 does not exist
You will always read your own writes. For example if you send an Update message followed by a Get of the same key, the Get will retrieve the change that was performed by the preceding Update message. However, the order of the reply messages is not defined, i.e. in the previous example you may receive the GetSuccess before the UpdateSuccess.
In
the Get
message you can pass an
optional request context in the same way as for the Update
message, described above.
For example the original sender can be passed and replied to after
receiving and transforming GetSuccess
.
- implicit val node = Cluster(system)
- val replicator = DistributedData(system).replicator
- val readTwo = ReadFrom(n = 2, timeout = 3.second)
- val Counter1Key = PNCounterKey("counter1")
-
- def receive: Receive = {
- case "get-count" =>
- // incoming request to retrieve current value of the counter
- replicator ! Get(Counter1Key, readTwo, request = Some(sender()))
-
- case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
- val value = g.get(Counter1Key).value.longValue
- replyTo ! value
- case GetFailure(Counter1Key, Some(replyTo: ActorRef)) =>
- replyTo ! -1L
- case NotFound(Counter1Key, Some(replyTo: ActorRef)) =>
- replyTo ! 0L
- }
The consistency level that is supplied in the Update and Get specifies per request how many replicas must respond successfully to a write or read request.
For
low latency reads you use ReadLocal
with the risk of
retrieving stale data, i.e. updates from other nodes might not be
visible yet.
When
using WriteLocal
the update is only
written to the local replica and then disseminated in the background
with the gossip protocol, which can take a few seconds to spread to all
nodes.
WriteAll and ReadAll are the strongest consistency levels, but also the slowest and with the lowest availability. For example, it is enough that one node is unavailable for a Get request to fail, and you will not receive the value.
If consistency is important, you can ensure that a read always reflects the most recent write by using the following formula:
- (nodes_written + nodes_read) > N
where N is the total number of nodes in the cluster, or the number of nodes with the role that is used for the Replicator.
For example, in a 7 node cluster these consistency properties are achieved by writing to 4 nodes and reading from 4 nodes, or writing to 5 nodes and reading from 3 nodes.
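The inequality can be checked with a few lines of plain Scala; this is only an illustration of the formula, not an Akka API:
- // read-your-writes holds when the write set and the read set must overlap
- def readOverlapsWrite(nodesWritten: Int, nodesRead: Int, clusterSize: Int): Boolean =
-   nodesWritten + nodesRead > clusterSize
-
- readOverlapsWrite(nodesWritten = 4, nodesRead = 4, clusterSize = 7) // true
- readOverlapsWrite(nodesWritten = 5, nodesRead = 3, clusterSize = 7) // true
- readOverlapsWrite(nodesWritten = 3, nodesRead = 3, clusterSize = 7) // false, a read may miss the latest write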
By
combining WriteMajority
and ReadMajority
levels a read always
reflects the most recent write. The Replicator
writes and reads to a
majority of replicas, i.e. N / 2 + 1.
For example, in a 5 node cluster it writes to 3 nodes and reads from 3
nodes. In a 6 node cluster it writes to 4 nodes and reads from 4
nodes.
Here
is an example of using WriteMajority
and ReadMajority
:
- private val timeout = 3.seconds
- private val readMajority = ReadMajority(timeout)
- private val writeMajority = WriteMajority(timeout)
- def receiveGetCart: Receive = {
- case GetCart =>
- replicator ! Get(DataKey, readMajority, Some(sender()))
-
- case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
- val data = g.get(DataKey)
- val cart = Cart(data.entries.values.toSet)
- replyTo ! cart
-
- case NotFound(DataKey, Some(replyTo: ActorRef)) =>
- replyTo ! Cart(Set.empty)
-
- case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
- // ReadMajority failure, try again with local read
- replicator ! Get(DataKey, ReadLocal, Some(replyTo))
- }
- def receiveAddItem: Receive = {
- case cmd @ AddItem(item) =>
- val update = Update(DataKey, LWWMap.empty[LineItem], writeMajority, Some(cmd)) {
- cart => updateCart(cart, item)
- }
- replicator ! update
- }
In some rare cases, when performing an Update it is necessary to first try to fetch the latest data from other nodes. That can be done by first sending a Get with ReadMajority and then continuing with the Update when the GetSuccess, GetFailure or NotFound reply is received. This might be needed when you need to base a decision on the latest information or when removing entries from an ORSet or ORMap.
If an entry is added to an ORSet
or ORMap
from one node and removed
from another node the entry will only be removed if the added entry is
visible on the node where the removal is performed (hence the name
observed-removed set).
The following example illustrates how to do that:
- def receiveRemoveItem: Receive = {
- case cmd @ RemoveItem(productId) =>
- // Try to fetch latest from a majority of nodes first, since ORMap
- // remove must have seen the item to be able to remove it.
- replicator ! Get(DataKey, readMajority, Some(cmd))
-
- case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
- replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
- _ - productId
- }
-
- case GetFailure(DataKey, Some(RemoveItem(productId))) =>
- // ReadMajority failed, fall back to best effort local value
- replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
- _ - productId
- }
-
- case NotFound(DataKey, Some(RemoveItem(productId))) =>
- // nothing to remove
- }
Warning
Caveat: Even if you use WriteMajority and ReadMajority there is a small risk that you may read stale data if the cluster membership has changed between the Update and the Get. For example, in a cluster of 5 nodes an Update may be written to 3 nodes: n1, n2, n3. Then 2 more nodes are added and a Get request reads from 4 nodes, which happen to be n4, n5, n6, n7, i.e. the value on n1, n2, n3 is not seen in the response of the Get request.
You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator.
It will send Replicator.Changed
messages to the
registered subscriber when the data for the subscribed key is updated.
Subscribers will be notified periodically with the configured notify-subscribers-interval
,
and it is also possible to send an explicit Replicator.FlushChanges
message to the Replicator
to notify the subscribers
immediately.
The
subscriber is automatically removed if the subscriber is terminated. A
subscriber can also be deregistered with theReplicator.Unsubscribe
message.
- val replicator = DistributedData(system).replicator
- val Counter1Key = PNCounterKey("counter1")
- // subscribe to changes of the Counter1Key value
- replicator ! Subscribe(Counter1Key, self)
- var currentValue = BigInt(0)
-
- def receive: Receive = {
- case c @ Changed(Counter1Key) =>
- currentValue = c.get(Counter1Key).value
- case "get-count" =>
- // incoming request to retrieve current value of the counter
- sender() ! currentValue
- }
A data entry can be deleted by sending a Replicator.Delete message to the local Replicator.
As a reply to the Delete, a Replicator.DeleteSuccess is sent to the sender of the Delete if the value was successfully deleted according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.ReplicationDeleteFailure
is sent. Note that ReplicationDeleteFailure
does not mean that the
delete completely failed or was rolled back. It may still have been
replicated to some nodes, and may eventually be replicated to all
nodes.
A
deleted key cannot be reused again, but it is still recommended to
delete unused data entries because that reduces the replication
overhead when new nodes join the cluster. Subsequent Delete
, Update
and Get
requests will be replied to
with Replicator.DataDeleted
.
Subscribers will receive Replicator.DataDeleted
.
- val replicator = DistributedData(system).replicator
- val Counter1Key = PNCounterKey("counter1")
- val Set2Key = ORSetKey[String]("set2")
-
- replicator ! Delete(Counter1Key, WriteLocal)
-
- val writeMajority = WriteMajority(timeout = 5.seconds)
- replicator ! Delete(Set2Key, writeMajority)
Warning
As
deleted keys continue to be included in the stored data on each node
as well as in gossip messages, a continuous series of updates and
deletes of top-level entities will result in growing memory usage
until an ActorSystem runs out of memory. To use Akka Distributed
Data where frequent adds and removes are required, you should use a
fixed number of top-level data types that support both updates and
removals, for example ORMap
or ORSet
.
The
data types must be convergent (stateful) CRDTs and implement the ReplicatedData
trait, i.e. they provide a
monotonic merge function and the state changes always converge.
You
can use your own custom ReplicatedData
types, and several types
are provided by this package, such as:
Counters: GCounter, PNCounter
Sets: GSet, ORSet
Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
Registers: LWWRegister, Flag
GCounter
is a "grow only counter".
It only supports increments, no decrements.
It
works in a similar way as a vector clock. It keeps track of one
counter per node and the total value is the sum of these counters. The merge
is implemented by taking
the maximum count for each node.
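A minimal usage sketch, mirroring the PNCounter example shown below; as in the other snippets, system is assumed to be an ActorSystem in scope:
- import akka.cluster.Cluster
- import akka.cluster.ddata.GCounter
-
- implicit val node = Cluster(system)
- val c0 = GCounter.empty
- val c1 = c0 + 1
- val c2 = c1 + 7
- println(c2.value) // 8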
If
you need both increments and decrements you can use the PNCounter
(positive/negative
counter).
It is tracking the increments (P) separately from the decrements (N). Both P and N are represented as two internal GCounters.
Merge is handled by merging the internal P and N counters. The value
of the counter is the value of the P counter minus the value of the N
counter.
- implicit val node = Cluster(system)
- val c0 = PNCounter.empty
- val c1 = c0 + 1
- val c2 = c1 + 7
- val c3: PNCounter = c2 - 2
- println(c3.value) // 6
Several
related counters can be managed in a map with the PNCounterMap
data type. When the
counters are placed in a PNCounterMap
as opposed to placing
them as separate top level values they are guaranteed to be replicated
together as one unit, which is sometimes necessary for related data.
- implicit val node = Cluster(system)
- val m0 = PNCounterMap.empty
- val m1 = m0.increment("a", 7)
- val m2 = m1.decrement("a", 2)
- val m3 = m2.increment("b", 1)
- println(m3.get("a")) // 5
- m3.entries.foreach { case (key, value) => println(s"$key -> $value") }
If
you only need to add elements to a set and not remove elements the GSet
(grow-only set) is the
data type to use. The elements can be any type of values that can be
serialized. Merge is simply the union of the two sets.
- val s0 = GSet.empty[String]
- val s1 = s0 + "a"
- val s2 = s1 + "b" + "c"
- if (s2.contains("a"))
- println(s2.elements) // a, b, c
If
you need add and remove operations you should use the ORSet
(observed-remove set).
Elements can be added and removed any number of times. If an element
is concurrently added and removed, the add will win. You cannot remove
an element that you have not seen.
The ORSet
has a version vector that
is incremented when an element is added to the set. The version for
the node that added the element is also tracked for each element in a
so called "birth dot". The version vector and the dots are used by the merge
function to track
causality of the operations and resolve concurrent updates.
- implicit val node = Cluster(system)
- val s0 = ORSet.empty[String]
- val s1 = s0 + "a"
- val s2 = s1 + "b"
- val s3 = s2 - "a"
- println(s3.elements) // b
ORMap
(observed-remove map) is
a map with String
keys and the values are ReplicatedData
types themselves. It
supports add, remove and delete any number of times for a map entry.
If
an entry is concurrently added and removed, the add will win. You
cannot remove an entry that you have not seen. This is the same
semantics as for the ORSet
.
If
an entry is concurrently updated to different values the values will
be merged, hence the requirement that the values must be ReplicatedData
types.
It
is rather inconvenient to use the ORMap
directly since it does
not expose specific types of the values. The ORMap
is intended as a low
level tool for building more specific maps, such as the following
specialized maps.
ORMultiMap
(observed-remove
multi-map) is a multi-map implementation that wraps an ORMap
with an ORSet
for the map's value.
PNCounterMap
(positive negative
counter map) is a map of named counters. It is a specialized ORMap
withPNCounter
values.
LWWMap
(last writer wins map) is
a specialized ORMap
with LWWRegister
(last writer wins
register) values.
- implicit val node = Cluster(system)
- val m0 = ORMultiMap.empty[Int]
- val m1 = m0 + ("a" -> Set(1, 2, 3))
- val m2 = m1.addBinding("a", 4)
- val m3 = m2.removeBinding("a", 2)
- val m4 = m3.addBinding("b", 1)
- println(m4.entries)
Note
that LWWRegister
and therefore LWWMap
rely on synchronized
clocks and should only be used when the choice of value is not
important for concurrent updates occurring within the clock skew.
Instead
of using timestamps based on System.currentTimeMillis()
time it is possible to
use a timestamp value based on something else, for example an
increasing version number from a database record that is used for
optimistic concurrency control.
When
a data entry is changed the full state of that entry is replicated to
other nodes, i.e. when you update a map the whole map is replicated.
Therefore, instead of using one ORMap
with 1000 elements it is
more efficient to split it up into 10 top level ORMap
entries with 100 elements
each. Top level entries are replicated individually, which has the
trade-off that different entries may not be replicated at the same
time and you may see inconsistencies between related entries. Separate
top level entries cannot be updated atomically together.
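As an illustrative sketch (the key names and the hashing scheme are made up for this example), related elements can be spread over several top-level entries by deriving the key from the element:
- import akka.cluster.ddata.ORSetKey
-
- // ten independent top-level ORSet entries instead of one large set;
- // each entry is replicated separately when it changes
- val bucketKeys = Vector.tabulate(10)(i => ORSetKey[String](s"bucket-$i"))
-
- def keyFor(element: String): ORSetKey[String] =
-   bucketKeys((element.hashCode & Int.MaxValue) % bucketKeys.size)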
Flag
is a data type for a
boolean value that is initialized to false
and can be switched to true
.
Thereafter it cannot be changed. true
wins over false
in merge.
- val f0 = Flag.empty
- val f1 = f0.switchOn
- println(f1.enabled)
LWWRegister
(last writer wins
register) can hold any (serializable) value.
Merge of a LWWRegister takes the register with the highest timestamp. Note that this relies on synchronized clocks. LWWRegister should
only be used when the choice of value is not important for concurrent
updates occurring within the clock skew.
Merge
takes the register updated by the node with lowest address (UniqueAddress
is ordered) if the
timestamps are exactly the same.
- implicit val node = Cluster(system)
- val r1 = LWWRegister("Hello")
- val r2 = r1.withValue("Hi")
- println(s"${r1.value} by ${r1.updatedBy} at ${r1.timestamp}")
Instead
of using timestamps based on System.currentTimeMillis()
time it is possible to
use a timestamp value based on something else, for example an
increasing version number from a database record that is used for
optimistic concurrency control.
- case class Record(version: Int, name: String, address: String)
-
- implicit val node = Cluster(system)
- implicit val recordClock = new LWWRegister.Clock[Record] {
- override def apply(currentTimestamp: Long, value: Record): Long =
- value.version
- }
-
- val record1 = Record(version = 1, "Alice", "Union Square")
- val r1 = LWWRegister(record1)
-
- val record2 = Record(version = 2, "Alice", "Madison Square")
- val r2 = LWWRegister(record2)
-
- val r3 = r1.merge(r2)
- println(r3.value)
For
first-write-wins semantics you can use the LWWRegister#reverseClock
instead of the LWWRegister#defaultClock
.
You
can rather easily implement your own data types. The only requirement
is that it implements the merge
function of the ReplicatedData
trait.
A
nice property of stateful CRDTs is that they typically compose nicely,
i.e. you can combine several smaller data types to build richer data
structures. For example, the PNCounter
is composed of two
internal GCounter
instances to keep track
of increments and decrements separately.
Here is a simple implementation of a custom TwoPhaseSet that is using two internal GSet types to keep track of additions and removals. A TwoPhaseSet
is a set where an element
may be added and removed, but never added again thereafter.
- case class TwoPhaseSet(
- adds: GSet[String] = GSet.empty,
- removals: GSet[String] = GSet.empty)
- extends ReplicatedData {
- type T = TwoPhaseSet
-
- def add(element: String): TwoPhaseSet =
- copy(adds = adds.add(element))
-
- def remove(element: String): TwoPhaseSet =
- copy(removals = removals.add(element))
-
- def elements: Set[String] = adds.elements diff removals.elements
-
- override def merge(that: TwoPhaseSet): TwoPhaseSet =
- copy(
- adds = this.adds.merge(that.adds),
- removals = this.removals.merge(that.removals))
- }
Data types should be immutable, i.e. "modifying" methods should return a new instance.
The data types must be serializable with an Akka Serializer. It is highly recommended that you implement efficient serialization with Protobuf or similar for your custom data types. The built-in data types are marked with ReplicatedDataSerialization and serialized with akka.cluster.ddata.protobuf.ReplicatedDataSerializer.
Serialization of the data types is used in remote messages and also for creating message digests (SHA-1) to detect changes. Therefore it is important that the serialization is efficient and produces the same bytes for the same content. For example, sets and maps should be sorted deterministically in the serialization.
This
is a protobuf representation of the above TwoPhaseSet
:
- option java_package = "docs.ddata.protobuf.msg";
- option optimize_for = SPEED;
-
- message TwoPhaseSet {
- repeated string adds = 1;
- repeated string removals = 2;
- }
The
serializer for the TwoPhaseSet
:
- import java.util.ArrayList
- import java.util.Collections
- import scala.collection.JavaConverters._
- import akka.actor.ExtendedActorSystem
- import akka.cluster.ddata.GSet
- import akka.cluster.ddata.protobuf.SerializationSupport
- import akka.serialization.Serializer
- import docs.ddata.TwoPhaseSet
- import docs.ddata.protobuf.msg.TwoPhaseSetMessages
-
- class TwoPhaseSetSerializer(val system: ExtendedActorSystem)
- extends Serializer with SerializationSupport {
-
- override def includeManifest: Boolean = false
-
- override def identifier = 99999
-
- override def toBinary(obj: AnyRef): Array[Byte] = obj match {
- case m: TwoPhaseSet => twoPhaseSetToProto(m).toByteArray
- case _ => throw new IllegalArgumentException(
- s"Can't serialize object of type ${obj.getClass}")
- }
-
- override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
- twoPhaseSetFromBinary(bytes)
- }
-
- def twoPhaseSetToProto(twoPhaseSet: TwoPhaseSet): TwoPhaseSetMessages.TwoPhaseSet = {
- val b = TwoPhaseSetMessages.TwoPhaseSet.newBuilder()
- // using java collections and sorting for performance (avoid conversions)
- val adds = new ArrayList[String]
- twoPhaseSet.adds.elements.foreach(adds.add)
- if (!adds.isEmpty) {
- Collections.sort(adds)
- b.addAllAdds(adds)
- }
- val removals = new ArrayList[String]
- twoPhaseSet.removals.elements.foreach(removals.add)
- if (!removals.isEmpty) {
- Collections.sort(removals)
- b.addAllRemovals(removals)
- }
- b.build()
- }
-
- def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
- val msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes)
- TwoPhaseSet(
- adds = GSet(msg.getAddsList.iterator.asScala.toSet),
- removals = GSet(msg.getRemovalsList.iterator.asScala.toSet))
- }
- }
Note that the elements of the sets are sorted so the SHA-1 digests are the same for the same elements.
You register the serializer in configuration:
- akka.actor {
- serializers {
- two-phase-set = "docs.ddata.protobuf.TwoPhaseSetSerializer"
- }
- serialization-bindings {
- "docs.ddata.TwoPhaseSet" = two-phase-set
- }
- }
Using
compression can sometimes be a good idea to reduce the data size.
Gzip compression is provided by the akka.cluster.ddata.protobuf.SerializationSupport trait:
- override def toBinary(obj: AnyRef): Array[Byte] = obj match {
- case m: TwoPhaseSet => compress(twoPhaseSetToProto(m))
- case _ => throw new IllegalArgumentException(
- s"Can't serialize object of type ${obj.getClass}")
- }
-
- override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
- twoPhaseSetFromBinary(decompress(bytes))
- }
The two embedded GSets can be serialized as illustrated above, but in general when composing new data types from the existing built-in types it is better to make use of the existing serializer for those types. This can be done by declaring those as bytes fields in protobuf:
- message TwoPhaseSet2 {
- optional bytes adds = 1;
- optional bytes removals = 2;
- }
and use the methods otherMessageToProto and otherMessageFromBinary that are provided by the SerializationSupport trait to serialize and deserialize the GSet instances. This works with any type that has a registered Akka serializer. This is how such a serializer would look for the TwoPhaseSet:
- import akka.actor.ExtendedActorSystem
- import akka.cluster.ddata.GSet
- import akka.cluster.ddata.protobuf.ReplicatedDataSerializer
- import akka.cluster.ddata.protobuf.SerializationSupport
- import akka.serialization.Serializer
- import docs.ddata.TwoPhaseSet
- import docs.ddata.protobuf.msg.TwoPhaseSetMessages
-
- class TwoPhaseSetSerializer2(val system: ExtendedActorSystem)
- extends Serializer with SerializationSupport {
-
- override def includeManifest: Boolean = false
-
- override def identifier = 99999
-
- val replicatedDataSerializer = new ReplicatedDataSerializer(system)
-
- override def toBinary(obj: AnyRef): Array[Byte] = obj match {
- case m: TwoPhaseSet => twoPhaseSetToProto(m).toByteArray
- case _ => throw new IllegalArgumentException(
- s"Can't serialize object of type ${obj.getClass}")
- }
-
- override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
- twoPhaseSetFromBinary(bytes)
- }
-
- def twoPhaseSetToProto(twoPhaseSet: TwoPhaseSet): TwoPhaseSetMessages.TwoPhaseSet2 = {
- val b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder()
- if (!twoPhaseSet.adds.isEmpty)
- b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString())
- if (!twoPhaseSet.removals.isEmpty)
- b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString())
- b.build()
- }
-
- def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
- val msg = TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes)
- val adds =
- if (msg.hasAdds)
- otherMessageFromBinary(msg.getAdds.toByteArray).asInstanceOf[GSet[String]]
- else
- GSet.empty[String]
- val removals =
- if (msg.hasRemovals)
- otherMessageFromBinary(msg.getRemovals.toByteArray).asInstanceOf[GSet[String]]
- else
- GSet.empty[String]
- TwoPhaseSet(adds, removals)
- }
- }
One
thing that can be problematic with CRDTs is that some data types
accumulate history (garbage). For example a GCounter
keeps track of one
counter per node. If a GCounter
has been updated from one
node it will associate the identifier of that node forever. That can
become a problem for long running systems with many cluster nodes
being added and removed. To solve this problem the Replicator
performs pruning of data
associated with nodes that have been removed from the cluster. Data
types that need pruning have to implement the RemovedNodePruning
trait.
Several interesting samples are included and described in the Lightbend Activator tutorial named Akka Distributed Data Samples with Scala.
There are some limitations that you should be aware of.
CRDTs cannot be used for all types of problems, and eventual consistency does not fit all domains. Sometimes you need strong consistency.
It is not intended for Big Data. The number of top level entries should not exceed 100000. When a new node is added to the cluster all these entries are transferred (gossiped) to the new node. The entries are split up in chunks and all existing nodes collaborate in the gossip, but it will take a while (tens of seconds) to transfer all entries and this means that you cannot have too many top level entries. The current recommended limit is 100000. We will be able to improve this if needed, but the design is still not intended for billions of entries.
All data is held in memory, which is another reason why it is not intended for Big Data.
When a data entry is changed the full state of that entry is replicated to other nodes. For example, if you add one element to a Set with 100 existing elements, all 101 elements are transferred to other nodes. This means that you cannot have too large data entries, because then the remote message size will be too large. We might be able to make this more efficient by implementing Efficient State-based CRDTs by Delta-Mutation.
The data is only kept in memory. It is redundant since it is replicated to other nodes in the cluster, but if you stop all nodes the data is lost, unless you have saved it elsewhere. Making the data durable is a possible future feature, but even if we implement that it is not intended to be a full featured database.
To use Distributed Data you must add the following dependency in your project.
sbt:
- "com.typesafe.akka" %% "akka-distributed-data-experimental" % "2.4.10"
maven:
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-distributed-data-experimental_2.11</artifactId>
- <version>2.4.10</version>
- </dependency>
The DistributedData
extension can be configured
with the following properties:
- # Settings for the DistributedData extension
- akka.cluster.distributed-data {
- # Actor name of the Replicator actor, /system/ddataReplicator
- name = ddataReplicator
-
- # Replicas are running on members tagged with this role.
- # All members are used if undefined or empty.
- role = ""
-
- # How often the Replicator should send out gossip information
- gossip-interval = 2 s
-
- # How often the subscribers will be notified of changes, if any
- notify-subscribers-interval = 500 ms
-
- # Maximum number of entries to transfer in one gossip message when synchronizing
- # the replicas. Next chunk will be transferred in next round of gossip.
- max-delta-elements = 1000
-
- # The id of the dispatcher to use for Replicator actors. If not specified
- # default dispatcher is used.
- # If specified you need to define the settings of the actual dispatcher.
- use-dispatcher = ""
-
- # How often the Replicator checks for pruning of data associated with
- # removed cluster nodes.
- pruning-interval = 30 s
-
- # How long time it takes (worst case) to spread the data to all other replica nodes.
- # This is used when initiating and completing the pruning process of data associated
- # with removed cluster nodes. The time measurement is stopped when any replica is
- # unreachable, so it should be configured to worst case in a healthy cluster.
- max-pruning-dissemination = 60 s
-
- # Serialized Write and Read messages are cached when they are sent to
- # several nodes. If no further activity they are removed from the cache
- # after this duration.
- serializer-cache-time-to-live = 10s
-
- }
For an introduction of remoting capabilities of Akka please see Location Transparency.
Note
As explained in that chapter Akka remoting is designed for communication in a peer-to-peer fashion and it has limitations for client-server setups. In particular Akka Remoting does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. For symmetric communication in these situations network and/or Akka configuration will have to be changed as described in Akka behind NAT or in a Docker container.
The Akka remoting is a separate jar file. Make sure that you have the following dependency in your project:
- "com.typesafe.akka" %% "akka-remote" % "2.4.10"
To
enable remote capabilities in your Akka project you should, at a
minimum, add the following changes to yourapplication.conf
file:
- akka {
- actor {
- provider = "akka.remote.RemoteActorRefProvider"
- }
- remote {
- enabled-transports = ["akka.remote.netty.tcp"]
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2552
- }
- }
- }
As you can see in the example above there are four things you need to add to get started:
Change the provider from akka.actor.LocalActorRefProvider to akka.remote.RemoteActorRefProvider
Add the enabled-transports setting with the akka.remote.netty.tcp transport
Add the hostname the actor system should listen on
Add the port number the actor system should listen on
Note
The port number needs to be unique for each actor system on the same machine even if the actor systems have different names. This is because each actor system has its own networking subsystem listening for connections and handling messages so as not to interfere with other actor systems.
The example above only illustrates the bare minimum of properties you have to add to enable remoting. All settings are described in Remote Configuration.
Akka has two ways of using remoting:
Lookup: look up an actor on a remote node with actorSelection(path)
Creation: create an actor on a remote node with actorOf(Props(...), actorName)
In the next sections the two alternatives are described in detail.
actorSelection(path)
will obtain an ActorSelection
to an Actor on a remote
node, e.g.:
- val selection =
- context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")
As you can see from the example above the following pattern is used to find an actor on a remote node:
- akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
Once you obtained a selection to the actor you can interact with it in the same way you would with a local actor, e.g.:
- selection ! "Pretty awesome feature"
To acquire an ActorRef for an ActorSelection you need to send a message to the selection and use the sender reference of the reply from the actor. There is a built-in Identify message that all Actors will understand and automatically reply to with an ActorIdentity message containing the ActorRef. This can also be done with the resolveOne method of the ActorSelection, which returns a Future of the matching ActorRef.
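Both approaches are sketched below; the correlation id and value names are made up for this example, and the snippet is assumed to run inside an actor so that context and a Receive definition are available:
- import akka.actor.{ ActorIdentity, Identify }
- import akka.util.Timeout
- import scala.concurrent.duration._
-
- // 1) ask the selection to identify itself; the reply carries the ActorRef
- selection ! Identify("correlation-id")
-
- def waitingForIdentity: Receive = {
-   case ActorIdentity("correlation-id", Some(ref)) =>
-     ref ! "Pretty awesome feature"
-   case ActorIdentity("correlation-id", None) =>
-     // no actor exists at that path
- }
-
- // 2) or resolve the selection into a Future[ActorRef]
- implicit val resolveTimeout: Timeout = 3.seconds
- import context.dispatcher
- selection.resolveOne().foreach { ref =>
-   ref ! "Pretty awesome feature"
- }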
Note
For more details on how actor addresses and paths are formed and used, please refer to Actor References, Paths and Addresses.
Note
Message sends to actors that are actually in the sending actor system do not get delivered via the remote actor ref provider. They're delivered directly, by the local actor ref provider.
Aside from providing better performance, this also means that if the hostname you configure remoting to listen as cannot actually be resolved from within the very same actor system, such messages will (perhaps counterintuitively) be delivered just fine.
If
you want to use the creation functionality in Akka remoting you have to
further amend the application.conf
file in the following way
(only showing deployment section):
- akka {
- actor {
- deployment {
- /sampleActor {
- remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
- }
- }
- }
- }
The configuration above instructs Akka to react when an actor with path /sampleActor is created, i.e. using system.actorOf(Props(...), "sampleActor"). This specific actor will not be directly instantiated, but instead the remote daemon of the remote system will be asked to create the actor, which in this sample corresponds to sampleActorSystem@127.0.0.1:2553.
Once you have configured the properties above you would do the following in code:
- val actor = system.actorOf(Props[SampleActor], "sampleActor")
- actor ! "Pretty slick"
The
actor class SampleActor
has to be available to the
runtimes using it, i.e. the classloader of the actor systems has to have
a JAR containing the class.
Note
In
order to ensure serializability of Props
when passing constructor
arguments to the actor being created, do not make the factory an inner
class: this will inherently capture a reference to its enclosing
object, which in most cases is not serializable. It is best to create
a factory method in the companion object of the actor’s class.
Serializability
of all Props can be tested by setting the configuration item akka.actor.serialize-creators=on
.
Only Props whose deploy
has LocalScope
are exempt from this
check.
Note
You
can use asterisks as wildcard matches for the actor paths, so you
could specify: /*/sampleActor
and that would match all sampleActor
on that level in the
hierarchy. You can also use wildcard in the last position to match all
actors at a certain level: /someParent/*
.
Non-wildcard matches always have higher priority to match than
wildcards, so: /foo/bar
is considered more specific than /foo/*
and only the highest
priority match is used. Please note that wildcards cannot be used to partially match a path section, such as /foo*/bar or /f*o/bar.
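For example, a deployment section using such a wildcard might look like this (a sketch reusing the sampleActor path from above; in HOCON the key has to be quoted because it contains an asterisk):
- akka.actor.deployment {
-   "/*/sampleActor" {
-     remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
-   }
- }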
To
allow dynamically deployed systems, it is also possible to include
deployment configuration in the Props
which are used to create
an actor: this information is the equivalent of a deployment section
from the configuration file, and if both are given, the external
configuration takes precedence.
With these imports:
- import akka.actor.{ Props, Deploy, Address, AddressFromURIString }
- import akka.remote.RemoteScope
and a remote address like this:
- val one = AddressFromURIString("akka.tcp://sys@host:1234")
- val two = Address("akka.tcp", "sys", "host", 1234) // this gives the same
you can advise the system to create a child on that remote node like so:
- val ref = system.actorOf(Props[SampleActor].
-   withDeploy(Deploy(scope = RemoteScope(one)))) // `one` (or `two`) from above
Each
link with a remote system can be in one of the four states as
illustrated above. Before any communication happens with a remote system
at a given Address
the state of the
association is Idle
. The
first time a message is attempted to be sent to the remote system or an
inbound connection is accepted the state of the link transitions to Active
denoting
that the two systems have messages to send or receive and no failures
were encountered so far. When a communication failure happens and the
connection is lost between the two systems the link becomes Gated
.
In
this state the system will not attempt to connect to the remote host and
all outbound messages will be dropped. The time while the link is in the Gated
state is controlled by the
setting akka.remote.retry-gate-closed-for
:
after this time elapses the link state transitions to Idle
again. The Gated state is one-sided in the sense that whenever a successful inbound connection is accepted from a remote system while the link is Gated, it automatically transitions to Active and communication resumes immediately.
In
the face of communication failures that are unrecoverable because the state of the participating systems is inconsistent, the remote system becomes Quarantined. Unlike Gated,
quarantining is permanent and lasts until one of the systems is
restarted. After a restart communication can be resumed again and the
link can become Active
again.
Watching a remote actor is not different than watching a local actor, as described in Lifecycle Monitoring aka DeathWatch.
Under
the hood remote death watch uses heartbeat messages and a failure
detector to generate a Terminated message from network failures and JVM crashes, in addition to graceful termination of the watched actor.
The heartbeat arrival times are interpreted by an implementation of The Phi Accrual Failure Detector.
The suspicion level of failure is given by a value called phi. The basic idea of the phi failure detector is to express the value of phi on a scale that is dynamically adjusted to reflect current network conditions.
The value of phi is calculated as:
- phi = -log10(1 - F(timeSinceLastHeartbeat))
where F is the cumulative distribution function of a normal distribution with mean and standard deviation estimated from historical heartbeat inter-arrival times.
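As an illustration only (not Akka's internal code), the calculation can be sketched like this, using a logistic approximation of the normal CDF; mean and stdDeviation stand in for the values estimated from the heartbeat history:
- import scala.math.{ exp, log10 }
-
- def phi(timeSinceLastHeartbeat: Double, mean: Double, stdDeviation: Double): Double = {
-   val y = (timeSinceLastHeartbeat - mean) / stdDeviation
-   // logistic approximation of the cumulative normal distribution F
-   val cdf = 1.0 / (1.0 + exp(-y * (1.5976 + 0.070566 * y * y)))
-   -log10(1.0 - cdf)
- }
-
- // e.g. with mean = 1000 ms and stdDeviation = 200 ms, phi grows rapidly
- // once the silence exceeds the mean by a few standard deviations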
In
the Remote
Configuration you
can adjust the akka.remote.watch-failure-detector.threshold
to define when a phi value
is considered to be a failure.
A
low threshold
is prone to generate many
false positives but ensures a quick detection in the event of a real
crash. Conversely, a high threshold
generates fewer mistakes
but needs more time to detect actual crashes. The default threshold
is 10 and is appropriate
for most situations. However in cloud environments, such as Amazon
EC2, the value could be increased to 12 in order to account for
network issues that sometimes occur on such platforms.
The following chart illustrates how phi increases with increasing time since the previous heartbeat.
Phi is calculated from the mean and standard deviation of historical inter arrival times. The previous chart is an example for standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, i.e. it is possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms.
To
be able to survive sudden abnormalities, such as garbage collection
pauses and transient network failures the failure detector is
configured with a margin, akka.remote.watch-failure-detector.acceptable-heartbeat-pause
.
You may want to adjust this in the Remote Configuration depending on your environment. This is how the curve looks for acceptable-heartbeat-pause configured to 3 seconds.
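For example, to apply the two values discussed above (a higher threshold for a cloud environment and a 3 second acceptable pause):
- akka.remote.watch-failure-detector {
-   threshold = 12.0
-   acceptable-heartbeat-pause = 3 s
- }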
When
using remoting for actors you must ensure that the props
and messages
used for those actors are
serializable. Failing to do so will cause the system to behave in an
unintended way.
For more information please see Serialization.
It is absolutely feasible to combine remoting with Routing.
A pool of remote deployed routees can be configured as:
- akka.actor.deployment {
- /parent/remotePool {
- router = round-robin-pool
- nr-of-instances = 10
- target.nodes = ["akka.tcp://app@10.0.0.2:2552", "akka://app@10.0.0.3:2552"]
- }
- }
This
configuration setting will clone the actor defined in the Props
of the remotePool
10 times and deploy it
evenly distributed across the two given target nodes.
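The pool itself is then created from this configuration inside the parent actor, for example like this (a sketch; SampleActor stands in for the routee class and the parent is assumed to be created as system.actorOf(Props[Parent], "parent")):
- import akka.actor.{ Actor, Props }
- import akka.routing.FromConfig
-
- class Parent extends Actor {
-   // picks up the /parent/remotePool deployment section from the configuration
-   val remotePool = context.actorOf(FromConfig.props(Props[SampleActor]), "remotePool")
-
-   def receive = {
-     case msg => remotePool forward msg
-   }
- }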
A group of remote actors can be configured as:
- akka.actor.deployment {
- /parent/remoteGroup {
- router = round-robin-group
- routees.paths = [
- "akka.tcp://app@10.0.0.1:2552/user/workers/w1",
- "akka.tcp://app@10.0.0.2:2552/user/workers/w1",
- "akka.tcp://app@10.0.0.3:2552/user/workers/w1"]
- }
- }
This configuration setting will send messages to the defined remote actor paths. It requires that you create the destination actors on the remote nodes with matching paths. That is not done by the router.
There is a more extensive remote example that comes with Lightbend Activator. The tutorial named Akka Remote Samples with Scala demonstrates both remote deployment and look-up of remote actors.
Akka
can be configured to use various transports to communicate with remote
systems. The core component of this feature is the akka.remote.transport.Transport
SPI. Transport
implementations must extend this trait. Transports can be loaded by
setting the akka.remote.enabled-transports
configuration key to
point to one or more configuration sections containing driver
descriptions.
An example of setting up the default Netty based SSL driver as default:
- akka {
- remote {
- enabled-transports = [akka.remote.netty.ssl]
-
- netty.ssl.security {
- key-store = "mykeystore"
- trust-store = "mytruststore"
- key-store-password = "changeme"
- key-password = "changeme"
- trust-store-password = "changeme"
- protocol = "TLSv1.2"
- random-number-generator = "AES128CounterSecureRNG"
- enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA]
- }
- }
- }
An example of setting up a custom transport implementation:
- akka {
- remote {
- applied-transports = ["akka.remote.mytransport"]
-
- mytransport {
- # The transport-class configuration entry is required, and
- # it must contain the fully qualified name of the transport
- # implementation
- transport-class = "my.package.MyTransport"
-
- # It is possible to decorate Transports with additional services.
- # Adapters should be registered in the "adapters" sections to
- # be able to apply them to transports
- applied-adapters = []
-
- # Driver specific configuration options has to be in the same
- # section:
- some-config = foo
- another-config = bar
- }
- }
- }
It is possible to listen to events that occur in Akka Remote. To subscribe/unsubscribe to these events you simply register as a listener to the types described below on the ActorSystem.eventStream.
Note
To
subscribe to any remote event, subscribe to RemotingLifecycleEvent
.
To subscribe to events related only to the lifecycle of
associations, subscribe to akka.remote.AssociationEvent
.
Note
The use of term "Association" instead of "Connection" reflects that the remoting subsystem may use connectionless transports, but an association similar to transport layer connections is maintained between endpoints by the Akka protocol.
By default an event listener is registered which logs all of the events described below. This default was chosen to help setting up a system, but it is quite common to switch this logging off once that phase of the project is finished.
Note
In
order to switch off the logging, set akka.remote.log-remote-lifecycle-events = off
in yourapplication.conf
.
To
be notified when an association is over ("disconnected") listen to DisassociatedEvent
which holds the direction
of the association (inbound or outbound) and the addresses of the
involved parties.
To
be notified when an association is successfully established
("connected") listen to AssociatedEvent
which holds the direction
of the association (inbound or outbound) and the addresses of the
involved parties.
To
intercept errors directly related to associations, listen to AssociationErrorEvent
which holds the direction
of the association (inbound or outbound), the addresses of the
involved parties and the Throwable
cause.
To
be notified when the remoting subsystem is ready to accept
associations, listen to RemotingListenEvent
which contains the
addresses the remoting listens on.
To
be notified when the current system is quarantined by the remote
system, listen to ThisActorSystemQuarantinedEvent
,
which includes the addresses of local and remote ActorSystems.
To
be notified when the remoting subsystem has been shut down, listen to RemotingShutdownEvent
.
To
intercept generic remoting related errors, listen to RemotingErrorEvent
which holds the Throwable
cause.
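A minimal sketch of such a listener (the actor and the chosen events are illustrative; any of the event types above can be subscribed to in the same way):
- import akka.actor.{ Actor, ActorLogging }
- import akka.remote.{ AssociatedEvent, DisassociatedEvent, RemotingLifecycleEvent }
-
- class RemotingListener extends Actor with ActorLogging {
-   override def preStart(): Unit =
-     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-
-   def receive = {
-     case AssociatedEvent(local, remote, inbound) =>
-       log.info("associated {} <-> {} (inbound={})", local, remote, inbound)
-     case DisassociatedEvent(local, remote, _) =>
-       log.info("disassociated {} <-> {}", local, remote)
-     case event => log.debug("remoting event: {}", event)
-   }
- }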
Akka provides a couple of ways to enhance security between remote nodes (client/server):
As
soon as an actor system can connect to another remotely, it may in
principle send any possible message to any actor contained within that
remote system. One example may be sending a PoisonPill
to the system guardian,
shutting that system down. This is not always desired, and it can be
disabled with the following setting:
- akka.remote.untrusted-mode = on
This
disallows sending of system messages (actor life-cycle commands,
DeathWatch, etc.) and any message extending PossiblyHarmful
to the system on which
this flag is set. Should a client send them nonetheless they are
dropped and logged (at DEBUG level in order to reduce the
possibilities for a denial of service attack). PossiblyHarmful
covers the predefined
messages like PoisonPill
and Kill
,
but it can also be added as a marker trait to user-defined messages.
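For example, a user-defined message can be marked so that it is dropped when it arrives from an untrusted client (the message class itself is hypothetical):
- import akka.actor.PossiblyHarmful
-
- // dropped by a system running in untrusted mode when it arrives via remoting
- final case class PurgeAllData(reason: String) extends PossiblyHarmful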
Messages sent with actor selection are by default discarded in untrusted mode, but permission to receive actor selection messages can be granted to specific actors defined in configuration:
- akka.remote.trusted-selection-paths = ["/user/receptionist", "/user/namingService"]
The
actual message must still not be of type PossiblyHarmful
.
In summary, the following operations are ignored by a system configured in untrusted mode when incoming via the remoting layer:
- system.stop(), PoisonPill, Kill
- sending any message extending the PossiblyHarmful marker interface, which includes Terminated
- messages sent with actor selection, unless the destination is defined in trusted-selection-paths
Note
Enabling
the untrusted mode does not remove the capability of the client to
freely choose the target of its message sends, which means that
messages not prohibited by the above rules can be sent to any actor
in the remote system. It is good practice for a client-facing system
to only contain a well-defined set of entry point actors, which then
forward requests (possibly after performing validation) to another
actor system containing the actual worker actors. If messaging
between these two server-side systems is done using local ActorRef
(they can be exchanged
safely between actor systems within the same JVM), you can restrict
the messages on this interface by marking them PossiblyHarmful
so that a client cannot
forge them.
SSL
can be used as the remote transport by adding akka.remote.netty.ssl
to the enabled-transports
configuration
section. See a description of the settings in the Remote
Configuration section.
The SSL support is implemented with Java Secure Socket Extension, please consult the official Java Secure Socket Extension documentation and related resources for troubleshooting.
Note
When
using SHA1PRNG on Linux it's recommended to specify -Djava.security.egd=file:/dev/./urandom as an argument to the JVM to prevent blocking. It is NOT as secure because it reuses the seed. Use '/dev/./urandom', not '/dev/urandom', as the latter doesn't work according to Bug ID: 6202721.
There are lots of configuration properties that are related to remoting in Akka. We refer to the reference configuration for more information.
Note
Setting properties like the listening IP and port number programmatically is best done by using something like the following:
- ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"1.2.3.4\"")
- .withFallback(ConfigFactory.load());
In setups involving Network Address Translation (NAT), Load Balancers or Docker containers the hostname and port pair that Akka binds to will be different than the "logical" host name and port pair that is used to connect to the system from the outside. This requires special configuration that sets both the logical and the bind pairs for remoting.
- akka {
- remote {
- netty.tcp {
- hostname = my.domain.com # external (logical) hostname
- port = 8000 # external (logical) port
-
- bind-hostname = local.address # internal (bind) hostname
- bind-port = 2552 # internal (bind) port
- }
- }
- }
Akka has a built-in Extension for serialization, and it is both possible to use the built-in serializers and to write your own.
The serialization mechanism is both used by Akka internally to serialize messages, and available for ad-hoc serialization of whatever you might need it for.
For
Akka to know which Serializer
to use for what, you need to edit your Configuration; in the "akka.actor.serializers" section you bind names to
implementations of the akka.serialization.Serializer
you wish to use, like
this:
- akka {
- actor {
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- myown = "docs.serialization.MyOwnSerializer"
- }
- }
- }
After
you've bound names to different implementations of Serializer
you need to wire which
classes should be serialized using which Serializer
; this is done in the "akka.actor.serialization-bindings" section:
- akka {
- actor {
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- myown = "docs.serialization.MyOwnSerializer"
- }
-
- serialization-bindings {
- "java.lang.String" = java
- "docs.serialization.Customer" = java
- "com.google.protobuf.Message" = proto
- "docs.serialization.MyOwnSerializable" = myown
- "java.lang.Boolean" = myown
- }
- }
- }
You
only need to specify the name of an interface or abstract base class
of the messages. In case of ambiguity, i.e. the message implements
several of the configured classes, the most specific configured class
will be used, i.e. the one of which all other candidates are
superclasses. If this condition cannot be met, e.g. because java.io.Serializable and MyOwnSerializable both apply and neither is a subtype of the other, a warning will be issued.
Note
If
your messages are contained inside of a Scala object, then in order
to reference those messages, you will need to use the fully qualified Java class name. For a message named Message contained inside the object named Wrapper
you would need to
reference it as Wrapper$Message
instead of Wrapper.Message
.
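For example, the binding for such a nested message would look like this (the Wrapper object and its package are hypothetical):
- akka.actor.serialization-bindings {
-   # Message defined as: object Wrapper { final case class Message(...) }
-   "docs.serialization.Wrapper$Message" = java
- }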
Akka
provides serializers for java.io.Serializable
and protobuf com.google.protobuf.GeneratedMessage
by default (the latter
only if depending on the akka-remote module), so normally you don't
need to add configuration for that; since com.google.protobuf.GeneratedMessage
implements java.io.Serializable
,
protobuf messages will always be serialized using the protobuf
protocol unless specifically overridden. In order to disable a default
serializer, map its marker type to “none”:
- akka.actor.serialization-bindings {
- "java.io.Serializable" = none
- }
If you want to verify that your messages are serializable you can enable the following config option:
- akka {
- actor {
- serialize-messages = on
- }
- }
Warning
We recommend turning this config option on only when you're running tests. It is completely pointless to have it turned on in other scenarios.
If
you want to verify that your Props
are serializable you can
enable the following config option:
- akka {
- actor {
- serialize-creators = on
- }
- }
Warning
We recommend turning this config option on only when you're running tests. It is completely pointless to have it turned on in other scenarios.
If you want to programmatically serialize/deserialize using Akka Serialization, here's some examples:
- import akka.actor.{ ActorRef, ActorSystem }
- import akka.serialization._
- import com.typesafe.config.ConfigFactory
-
- val system = ActorSystem("example")
-
- // Get the Serialization Extension
- val serialization = SerializationExtension(system)
-
- // Have something to serialize
- val original = "woohoo"
-
- // Find the Serializer for it
- val serializer = serialization.findSerializerFor(original)
-
- // Turn it into bytes
- val bytes = serializer.toBinary(original)
-
- // Turn it back into an object
- val back = serializer.fromBinary(bytes, manifest = None)
-
- // Voilá!
- back should be(original)
For
more information, have a look at the ScalaDoc
for akka.serialization._
So, let's say that you want to create your own Serializer; you saw the docs.serialization.MyOwnSerializer in the config example above.
First
you need to create a class definition of your Serializer
like so:
- import akka.actor.{ ActorRef, ActorSystem }
- import akka.serialization._
- import com.typesafe.config.ConfigFactory
-
- class MyOwnSerializer extends Serializer {
-
- // This is whether "fromBinary" requires a "clazz" or not
- def includeManifest: Boolean = true
-
- // Pick a unique identifier for your Serializer,
- // you've got a couple of billions to choose from,
- // 0 - 16 is reserved by Akka itself
- def identifier = 1234567
-
- // "toBinary" serializes the given object to an Array of Bytes
- def toBinary(obj: AnyRef): Array[Byte] = {
- // Put the code that serializes the object here
- // ... ...
- }
-
- // "fromBinary" deserializes the given array,
- // using the type hint (if any, see "includeManifest" above)
- def fromBinary(
- bytes: Array[Byte],
- clazz: Option[Class[_]]): AnyRef = {
- // Put your code that deserializes here
- // ... ...
- }
- }
The
manifest is a type hint so that the same serializer can be used for
different classes. The manifest parameter in fromBinary
is the class of the
object that was serialized. In fromBinary
you can match on the
class and deserialize the bytes to different objects.
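A small sketch of such a dispatch, assuming two hypothetical message classes Customer and User that are both bound to this serializer:
- final case class Customer(name: String)
- final case class User(name: String)
-
- def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
-   clazz match {
-     case Some(c) if c == classOf[Customer] => Customer(new String(bytes, "UTF-8"))
-     case Some(c) if c == classOf[User]     => User(new String(bytes, "UTF-8"))
-     case other => throw new IllegalArgumentException(s"unexpected type hint: $other")
-   }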
Then you only need to fill in the blanks, bind it to a name in your Configuration and then list which classes should be serialized using it.
The Serializer
illustrated above
supports a class-based manifest (type hint). For serialization of data that needs to evolve over time the SerializerWithStringManifest
is recommended instead of Serializer
because the manifest
(type hint) is a String
instead of a Class
.
That means that the class can be moved/removed and the serializer can
still deserialize old data by matching on the String
.
This is especially useful for Persistence.
The
manifest string can also encode a version number that can be used in fromBinary
to deserialize in
different ways to migrate old data to new domain objects.
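As a sketch of that idea (the manifest values, the Customer class and the old encoding are all made up for illustration):
- import akka.serialization.SerializerWithStringManifest
-
- final case class Customer(name: String)
-
- class CustomerSerializer extends SerializerWithStringManifest {
-   def identifier = 1000001 // hypothetical unique id
-
-   def manifest(obj: AnyRef): String = "customer-v2" // current version
-
-   def toBinary(obj: AnyRef): Array[Byte] = obj match {
-     case Customer(name) => name.getBytes("UTF-8")
-   }
-
-   def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
-     case "customer-v1" => Customer(new String(bytes, "ISO-8859-1")) // migrate old wire format
-     case "customer-v2" => Customer(new String(bytes, "UTF-8"))
-   }
- }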
If
the data was originally serialized with Serializer
and in a later version of
the system you change to SerializerWithStringManifest the manifest string will be the full class name if you used includeManifest=true,
otherwise it will be the empty string.
This is what a SerializerWithStringManifest looks like:
- class MyOwnSerializer2 extends SerializerWithStringManifest {
-
- val CustomerManifest = "customer"
- val UserManifest = "user"
- val UTF_8 = StandardCharsets.UTF_8.name()
-
- // Pick a unique identifier for your Serializer,
- // you've got a couple of billions to choose from,
- // 0 - 16 is reserved by Akka itself
- def identifier = 1234567
-
- // The manifest (type hint) that will be provided in the fromBinary method
- // Use `""` if manifest is not needed.
- def manifest(obj: AnyRef): String =
- obj match {
- case _: Customer => CustomerManifest
- case _: User => UserManifest
- }
-
- // "toBinary" serializes the given object to an Array of Bytes
- def toBinary(obj: AnyRef): Array[Byte] = {
- // Put the real code that serializes the object here
- obj match {
- case Customer(name) => name.getBytes(UTF_8)
- case User(name) => name.getBytes(UTF_8)
- }
- }
-
- // "fromBinary" deserializes the given array,
- // using the type hint
- def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
- // Put the real code that deserializes here
- manifest match {
- case CustomerManifest =>
- Customer(new String(bytes, UTF_8))
- case UserManifest =>
- User(new String(bytes, UTF_8))
- }
- }
- }
You must also bind it to a name in your Configuration and then list which classes should be serialized using it.
All
ActorRefs are serializable using JavaSerializer, but in case you are
writing your own serializer, you might want to know how to serialize
and deserialize them properly. In the general case, the local address
to be used depends on the type of remote address which shall be the
recipient of the serialized information. Use Serialization.serializedActorPath(actorRef)
like this:
- import akka.actor.{ ActorRef, ActorSystem }
- import akka.serialization._
- import com.typesafe.config.ConfigFactory
-
- // Serialize
- // (beneath toBinary)
- val identifier: String = Serialization.serializedActorPath(theActorRef)
-
- // Then just serialize the identifier however you like
-
- // Deserialize
- // (beneath fromBinary)
- val deserializedActorRef = extendedSystem.provider.resolveActorRef(identifier)
- // Then just use the ActorRef
This
assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization,
though, e.g. storing actor references outside of an actor application
(database, etc.). In this case, it is important to keep in mind that
the address part of an actor’s path determines how that actor is
communicated with. Storing a local actor path might be the right
choice if the retrieval happens in the same logical context, but it is
not enough when deserializing it on a different network host: for that
it would need to include the system’s remote transport address. An
actor system is not limited to having just one remote transport per
se, which makes this question a bit more interesting. To find out the
appropriate address to use when sending to remoteAddr
you can use ActorRefProvider.getExternalAddressFor(remoteAddr)
like this:
- object ExternalAddress extends ExtensionKey[ExternalAddressExt]
-
- class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
- def addressFor(remoteAddr: Address): Address =
- system.provider.getExternalAddressFor(remoteAddr) getOrElse
- (throw new UnsupportedOperationException("cannot send to " + remoteAddr))
- }
-
- def serializeTo(ref: ActorRef, remote: Address): String =
- ref.path.toSerializationFormatWithAddress(ExternalAddress(extendedSystem).
- addressFor(remote))
Note
ActorPath.toSerializationFormatWithAddress
differs from toString
if the address does not
already have host
and port
components, i.e. it
only inserts address information for local addresses.
toSerializationFormatWithAddress
also adds the unique id
of the actor, which will change when the actor is stopped and then
created again with the same name. Sending messages to a reference
pointing the old actor will not be delivered to the new actor. If
you don't want this behavior, e.g. in case of long term storage of
the reference, you can use toStringWithAddress
,
which doesn't include the unique id.
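A small sketch of the two formats side by side (the ref and the address are assumed to come from the code above):
- import akka.actor.{ ActorRef, Address }
-
- def bothFormats(ref: ActorRef, address: Address): (String, String) =
-   (ref.path.toSerializationFormatWithAddress(address), // includes the actor's unique id (#uid)
-    ref.path.toStringWithAddress(address))              // no uid, suitable for long-term storage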
This
requires that you know at least which type of address will be
supported by the system which will deserialize the resulting actor
reference; if you have no concrete address handy you can create a
dummy one for the right protocol using Address(protocol, "", "", 0)
(assuming that the actual
transport used is as lenient as Akka’s RemoteActorRefProvider).
There is also a default remote address which is the one used by cluster support (and typical systems have just this one); you can get it like this:
- object ExternalAddress extends ExtensionKey[ExternalAddressExt]
-
- class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
- def addressForAkka: Address = system.provider.getDefaultAddress
- }
-
- def serializeAkkaDefault(ref: ActorRef): String =
- ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).
- addressForAkka)
The recommended approach to do deep serialization of internal actor state is to use Akka Persistence.
When
using Java serialization without employing the JavaSerializer
for the task, you must make
sure to supply a valid ExtendedActorSystem
in the dynamic variable JavaSerializer.currentSystem
.
This is used when reading in the representation of an ActorRef
for turning the string
representation into a real reference. DynamicVariable
is a thread-local variable,
so be sure to have it set while deserializing anything which might
contain actor references.
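A sketch of how that can be done (the bytes parameter and the plain ObjectInputStream usage are placeholders for your actual deserialization code):
- import java.io.{ ByteArrayInputStream, ObjectInputStream }
- import akka.actor.ExtendedActorSystem
- import akka.serialization.JavaSerializer
-
- def deserializeWithSystem(system: ExtendedActorSystem, bytes: Array[Byte]): AnyRef =
-   JavaSerializer.currentSystem.withValue(system) {
-     // any ActorRef read back here is resolved against `system`
-     new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject()
-   }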
The akka.io
package has been developed
in collaboration between the Akka and spray.io teams. Its design combines
experiences from the spray-io
module with improvements
that were jointly developed for more general consumption as an
actor-based service.
The guiding design goal for this I/O implementation was to reach extreme scalability, make no compromises in providing an API correctly matching the underlying transport mechanism and to be fully event-driven, non-blocking and asynchronous. The API is meant to be a solid foundation for the implementation of network protocols and building higher abstractions; it is not meant to be a full-service high-level NIO wrapper for end users.
The
I/O API is completely actor based, meaning that all operations are
implemented with message passing instead of direct method calls. Every
I/O driver (TCP, UDP) has a special actor, called a manager that
serves as an entry point for the API. I/O is broken into several
drivers. The manager for a particular driver is accessible through the IO
entry
point. For example the following code looks up the TCP manager and
returns its ActorRef
:
- import akka.io.{ IO, Tcp }
- import context.system // implicitly used by IO(Tcp)
-
- val manager = IO(Tcp)
The
manager receives I/O command messages and instantiates worker actors in
response. The worker actors present themselves to the API user in the
reply to the command that was sent. For example after a Connect
command sent to the TCP
manager the manager creates an actor representing the TCP connection.
All operations related to the given TCP connections can be invoked by
sending messages to the connection actor which announces itself by
sending a Connected
message.
I/O worker actors receive commands and also send out events. They usually need a user-side counterpart actor listening for these events (such events could be inbound connections, incoming bytes or acknowledgements for writes). These worker actors watch their listener counterparts. If the listener stops then the worker will automatically release any resources that it holds. This design makes the API more robust against resource leaks.
Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor responsible for handling a connection can watch the connection actor to be notified if it unexpectedly terminates.
I/O devices have a maximum throughput which limits the frequency and size of writes. When an application tries to push more data than a device can handle, the driver has to buffer bytes until the device is able to write them. With buffering it is possible to handle short bursts of intensive writes --- but no buffer is infinite. "Flow control" is needed to avoid overwhelming device buffers.
Akka supports two types of flow control:
- Ack-based, where the driver notifies the writer when writes have succeeded
- Nack-based, where the driver notifies the writer when writes have failed
Each of these models is available in both the TCP and the UDP implementations of Akka I/O.
Individual
writes can be acknowledged by providing an ack object in the write
message (Write
in the case of TCP andSend
for UDP). When the write
is complete the worker will send the ack object to the writing actor.
This can be used to implement ack-based flow
control; sending new data only when old data has been acknowledged.
If
a write (or any other command) fails, the driver notifies the actor
that sent the command with a special message (CommandFailed
in the case of UDP and
TCP). This message will also notify the writer of a failed write,
serving as a nack for that write. Please note that in a nack-based
flow-control setting the writer has to be prepared for the fact that
the failed write might not be the most recent write it sent. For
example, the failure notification for a write W1
might arrive after
additional write commands W2
and W3
have been sent. If the
writer wants to resend any nacked messages it may need to keep a
buffer of pending messages.
Warning
An acknowledged write does not mean acknowledged delivery or storage; receiving an ack for a write simply signals that the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control not error handling. In other words, data may still be lost, even if every write is acknowledged.
To
maintain isolation, actors should communicate with immutable objects
only. ByteString
is an immutable container
for bytes. It is used by Akka's I/O system as an efficient, immutable
alternative to the traditional byte containers used for I/O on the JVM,
such as Array[Byte]
and ByteBuffer
.
ByteString
is a rope-like data structure that is
immutable and provides fast concatenation and slicing operations
(perfect for I/O). When two ByteString
s
are concatenated together they are both stored within the resulting ByteString
instead of copying both
to a new Array
.
Operations such as drop
and take
return ByteString
s
that still reference the original Array
,
but just change the offset and length that is visible. Great care has
also been taken to make sure that the internal Array
cannot be modified.
Whenever a potentially unsafe Array
is used to create a new ByteString
a defensive copy is
created. If you require a ByteString
that only blocks as much
memory as necessary for its content, use the compact
method to get a CompactByteString
instance. If the ByteString
represented
only a slice of the original array, this will result in copying all
bytes in that slice.
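A few of these properties in a short sketch:
- import akka.util.{ ByteString, CompactByteString }
-
- val a = ByteString("Hello, ")
- val b = ByteString("world!")
-
- val joined = a ++ b                  // no copying of the underlying arrays
- val slice  = joined.drop(7).take(5)  // "world", still referencing the original bytes
-
- val compacted: CompactByteString = slice.compact // copies only the visible slice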
ByteString
inherits all methods from IndexedSeq
,
and it also has some new ones. For more information, look up the akka.util.ByteString
class and its companion
object in the ScalaDoc.
ByteString
also comes with its own
optimized builder and iterator classes ByteStringBuilder
and ByteIterator
which provide extra
features in addition to those of normal builders and iterators.
A ByteStringBuilder
can be wrapped in a java.io.OutputStream
via the asOutputStream
method. Likewise, ByteIterator
can be wrapped in a java.io.InputStream
via asInputStream
.
Using these, akka.io
applications
can integrate legacy code based on java.io
streams.
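A small sketch of bridging java.io code with a ByteString via these adapters:
- import java.io.{ ObjectInputStream, ObjectOutputStream }
- import akka.util.ByteString
-
- // produce a ByteString with a legacy java.io writer
- val builder = ByteString.newBuilder
- val out = new ObjectOutputStream(builder.asOutputStream)
- out.writeUTF("legacy payload")
- out.close()
- val bytes: ByteString = builder.result()
-
- // consume it again with a legacy java.io reader
- val in = new ObjectInputStream(bytes.iterator.asInputStream)
- val text = in.readUTF()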
For further details on the design and internal architecture see I/O Layer Design.
The code snippets throughout this section assume the following imports:
- import akka.actor.{ Actor, ActorRef, Props }
- import akka.io.{ IO, Tcp }
- import akka.util.ByteString
- import java.net.InetSocketAddress
All
of the Akka I/O APIs are accessed through manager objects. When using an
I/O API, the first step is to acquire a reference to the appropriate
manager. The code below shows how to acquire a reference to the Tcp
manager.
- import akka.io.{ IO, Tcp }
- import context.system // implicitly used by IO(Tcp)
-
- val manager = IO(Tcp)
The manager is an actor that handles the underlying low level I/O resources (selectors, channels) and instantiates workers for specific tasks, such as listening to incoming connections.
- object Client {
- def props(remote: InetSocketAddress, replies: ActorRef) =
- Props(classOf[Client], remote, replies)
- }
-
- class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
-
- import Tcp._
- import context.system
-
- IO(Tcp) ! Connect(remote)
-
- def receive = {
- case CommandFailed(_: Connect) =>
- listener ! "connect failed"
- context stop self
-
- case c @ Connected(remote, local) =>
- listener ! c
- val connection = sender()
- connection ! Register(self)
- context become {
- case data: ByteString =>
- connection ! Write(data)
- case CommandFailed(w: Write) =>
- // O/S buffer was full
- listener ! "write failed"
- case Received(data) =>
- listener ! data
- case "close" =>
- connection ! Close
- case _: ConnectionClosed =>
- listener ! "connection closed"
- context stop self
- }
- }
- }
The
first step of connecting to a remote address is sending a Connect
message to the TCP manager;
in addition to the simplest form shown above there is also the
possibility to specify a local InetSocketAddress
to bind to and a list of
socket options to apply.
Note
The
SO_NODELAY (TCP_NODELAY on Windows) socket option defaults to true in
Akka, independently of the OS default settings. This setting disables
Nagle's algorithm, considerably improving latency for most
applications. This setting could be overridden by passing SO.TcpNoDelay(false)
in the list of socket
options of the Connect
message.
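For example, inside an actor (as in the Client example above) the option could be passed like this; remote stands for the target InetSocketAddress:
- import akka.io.{ IO, Tcp }
- import akka.io.Tcp.{ Connect, SO }
- import context.system // implicitly used by IO(Tcp)
-
- IO(Tcp) ! Connect(remote, options = List(SO.TcpNoDelay(false)))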
The
TCP manager will then reply either with a CommandFailed
or it will spawn an
internal actor representing the new connection. This new actor will then
send a Connected
message to the original
sender of the Connect
message.
In
order to activate the new connection a Register
message must be sent to the
connection actor, informing it which actor shall receive data from
the socket. Before this step is done the connection cannot be used, and
there is an internal timeout after which the connection actor will shut
itself down if no Register
message is received.
The connection actor watches the registered handler and closes the connection when that one terminates, thereby cleaning up all internal resources associated with that connection.
The
actor in the example above uses become
to switch from unconnected
to connected operation, demonstrating the commands and events which are
observed in that state. For a discussion on CommandFailed
see Throttling
Reads and Writes below. ConnectionClosed
is a trait, which marks the
different connection close events. The last line handles all connection
close events in the same way. It is possible to listen for more
fine-grained connection close events, see Closing
Connections below.
- class Server extends Actor {
-
- import Tcp._
- import context.system
-
- IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
-
- def receive = {
- case b @ Bound(localAddress) =>
- // do some logging or setup ...
-
- case CommandFailed(_: Bind) => context stop self
-
- case c @ Connected(remote, local) =>
- val handler = context.actorOf(Props[SimplisticHandler])
- val connection = sender()
- connection ! Register(handler)
- }
-
- }
To
create a TCP server and listen for inbound connections, a Bind
command has to be sent to
the TCP manager. This will instruct the TCP manager to listen for TCP
connections on a particular InetSocketAddress
;
the port may be specified as 0
in
order to bind to a random port.
The
actor sending the Bind
message will receive a Bound
message signaling that the
server is ready to accept incoming connections; this message also
contains the InetSocketAddress
to which the socket was
actually bound (i.e. resolved IP address and correct port number).
From
this point forward the process of handling connections is the same as
for outgoing connections. The example demonstrates that handling the
reads from a certain connection can be delegated to another actor by
naming it as the handler when sending the Register
message. Writes can be sent
from any actor in the system to the connection actor (i.e. the actor
which sent the Connected
message). The simplistic
handler is defined as:
- class SimplisticHandler extends Actor {
- import Tcp._
- def receive = {
- case Received(data) => sender() ! Write(data)
- case PeerClosed => context stop self
- }
- }
For a more complete sample which also takes into account the possibility of failures when sending please see Throttling Reads and Writes below.
The
only difference to outgoing connections is that the internal actor
managing the listen port—the sender of the Bound
message—watches the actor
which was named as the recipient for Connected
messages in the Bind
message.
When that actor terminates the listen port will be closed and all
resources associated with it will be released; existing connections will
not be terminated at this point.
A
connection can be closed by sending one of the commands Close
, ConfirmedClose
or Abort
to the connection actor.
Close
will close the connection
by sending a FIN
message, but without
waiting for confirmation from the remote endpoint. Pending writes will
be flushed. If the close is successful, the listener will be notified
with Closed
.
ConfirmedClose
will close the sending
direction of the connection by sending a FIN
message, but data will
continue to be received until the remote endpoint closes the connection,
too. Pending writes will be flushed. If the close is successful, the
listener will be notified with ConfirmedClosed
.
Abort
will immediately terminate
the connection by sending a RST
message to the remote
endpoint. Pending writes will not be flushed. If the close is
successful, the listener will be notified with Aborted
.
PeerClosed
will be sent to the
listener if the connection has been closed by the remote endpoint. By default, the connection will then automatically be closed from this endpoint as well. To support half-closed connections set the keepOpenOnPeerClosed
member of the Register
message to true
in which case the
connection stays open until it receives one of the above close commands.
ErrorClosed
will be sent to the
listener whenever an error happened that forced the connection to be
closed.
All
close notifications are sub-types of ConnectionClosed
so listeners who do not
need fine-grained close events may handle all close events in the same
way.
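A sketch of a handler that does distinguish them (the log messages are just for illustration):
- import akka.actor.{ Actor, ActorLogging }
- import akka.io.Tcp._
-
- class CloseAwareHandler extends Actor with ActorLogging {
-   def receive = {
-     case Closed             => log.info("closed (our Close succeeded)")
-     case ConfirmedClosed    => log.info("confirmed closed (our ConfirmedClose succeeded)")
-     case Aborted            => log.info("aborted (our Abort succeeded)")
-     case PeerClosed         => log.info("peer closed the connection")
-     case ErrorClosed(cause) => log.info("connection closed due to error: {}", cause)
-   }
- }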
Once a connection has been established data can be sent to it from any actor in the form of a Tcp.WriteCommand. Tcp.WriteCommand is an abstract class with three concrete implementations:
- Tcp.Write: a WriteCommand implementation which wraps a ByteString instance and an "ack" event. A ByteString (as explained in this section) models one or more chunks of immutable in-memory data with a maximum (total) size of 2 GB (2^31 bytes).
- Tcp.WriteFile: a command that allows you to designate a (contiguous) chunk of on-disk bytes for sending across the connection without the need to first load them into the JVM memory. As such Tcp.WriteFile can "hold" more than 2 GB of data and an "ack" event if required.
- Tcp.CompoundWrite: sometimes you might want to group (or interleave) several Tcp.Write and/or Tcp.WriteFile commands into one atomic write command which gets written to the connection in one go. The Tcp.CompoundWrite allows you to do just that and offers three benefits:
  - By combining several writes into one CompoundWrite you can have them be sent across the connection with minimum overhead and without the need to spoon-feed them to the connection actor via an ACK-based message protocol.
  - Because a WriteCommand is atomic you can be sure that no other actor can "inject" other writes into your series of writes if you combine them into one single CompoundWrite. In scenarios where several actors write to the same connection this can be an important feature which can be somewhat hard to achieve otherwise.
  - The "sub writes" of a CompoundWrite are regular Write or WriteFile commands that themselves can request "ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to attach more than one ACK to a Write or WriteFile (by combining it with an empty write that itself requests an ACK) or to have the connection actor acknowledge the progress of transmitting the CompoundWrite by sending out intermediate ACKs at arbitrary points.
The basic model of the TCP connection actor is that it has no internal buffering (i.e. it can only process one write at a time, meaning it can buffer one write until it has been passed on to the O/S kernel in full). Congestion needs to be handled at the user level, for both writes and reads.
For back-pressuring writes there are three modes of operation:
- ACK-based: every Write command carries an arbitrary object, and if this object is not Tcp.NoAck then it will be returned to the sender of the Write upon successfully writing all contained data to the socket. If no other write is initiated before having received this acknowledgement then no failures can happen due to buffer overrun.
- NACK-based: every write which arrives while a previous write is not yet completed will be replied to with a CommandFailed message containing the failed write. Just relying on this mechanism requires the implemented protocol to tolerate skipping writes (e.g. if each write is a valid message on its own and it is not required that all are delivered). This mode is enabled by setting the useResumeWriting flag to false within the Register message during connection activation.
- NACK-based with write suspending: once a single write has failed no further writes will succeed until a ResumeWriting message is received. This message will be answered with a WritingResumed message once the last accepted write has completed. If the actor driving the connection implements buffering and resends the NACK'ed messages after having awaited the WritingResumed signal then every message is delivered exactly once to the network socket.
These write back-pressure models (with the exception of the second which is rather specialised) are demonstrated in complete examples below. The full and contiguous source is available on GitHub.
For back-pressuring reads there are two modes of operation:
- Push-reading: the connection actor sends the registered reader actor incoming data as soon as it is available, as Received events. Whenever the reader actor wants to signal back-pressure to the remote TCP endpoint it can send a SuspendReading message to the connection actor to indicate that it wants to suspend the reception of new data. No Received events will arrive until a corresponding ResumeReading is sent indicating that the receiver actor is ready again.
- Pull-reading: after sending a Received event the connection actor automatically suspends accepting data from the socket until the reader actor signals with a ResumeReading message that it is ready to process more input data. Hence new data is "pulled" from the connection by sending ResumeReading messages.
Note
It should be obvious that all these flow control schemes only work between one writer/reader and one connection actor; as soon as multiple actors send write commands to a single connection no consistent result can be achieved.
For
proper function of the following example it is important to configure
the connection to remain half-open when the remote side closed its
writing end: this allows the example EchoHandler
to write all outstanding
data back to the client before fully closing the connection. This is
enabled using a flag upon connection activation (observe the Register
message):
- case Connected(remote, local) =>
- log.info("received connection from {}", remote)
- val handler = context.actorOf(Props(handlerClass, sender(), remote))
- sender() ! Register(handler, keepOpenOnPeerClosed = true)
With this preparation let us dive into the handler itself:
- // storage omitted ...
- class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress)
- extends Actor with ActorLogging {
-
- import Tcp._
-
- // sign death pact: this actor terminates when connection breaks
- context watch connection
-
- case object Ack extends Event
-
- def receive = {
- case Received(data) =>
- buffer(data)
- connection ! Write(data, Ack)
-
- context.become({
- case Received(data) => buffer(data)
- case Ack => acknowledge()
- case PeerClosed => closing = true
- }, discardOld = false)
-
- case PeerClosed => context stop self
- }
-
- // storage omitted ...
- }
The
principle is simple: when having written a chunk always wait for the Ack
to come back before sending
the next chunk. While waiting we switch behavior such that new incoming
data are buffered. The helper functions used are a bit lengthy but not
complicated:
- private def buffer(data: ByteString): Unit = {
- storage :+= data
- stored += data.size
-
- if (stored > maxStored) {
- log.warning(s"drop connection to [$remote] (buffer overrun)")
- context stop self
-
- } else if (stored > highWatermark) {
- log.debug(s"suspending reading")
- connection ! SuspendReading
- suspended = true
- }
- }
-
- private def acknowledge(): Unit = {
- require(storage.nonEmpty, "storage was empty")
-
- val size = storage(0).size
- stored -= size
- transferred += size
-
- storage = storage drop 1
-
- if (suspended && stored < lowWatermark) {
- log.debug("resuming reading")
- connection ! ResumeReading
- suspended = false
- }
-
- if (storage.isEmpty) {
- if (closing) context stop self
- else context.unbecome()
- } else connection ! Write(storage(0), Ack)
- }
The
most interesting part is probably the last: an Ack
removes the oldest data
chunk from the buffer, and if that was the last chunk then we either
close the connection (if the peer closed its half already) or return to
the idle behavior; otherwise we just send the next buffered chunk and
stay waiting for the next Ack
.
Back-pressure
can be propagated also across the reading side back to the writer on the
other end of the connection by sending the SuspendReading
command to the connection
actor. This will lead to no data being read from the socket anymore
(although this does happen after a delay because it takes some time
until the connection actor processes this command, hence appropriate
head-room in the buffer should be present), which in turn will lead to
the O/S kernel buffer filling up on our end; the TCP window mechanism will then stop the remote side from writing, filling up its write
buffer, until finally the writer on the other side cannot push any data
into the socket anymore. This is how end-to-end back-pressure is
realized across a TCP connection.
- object EchoHandler {
- final case class Ack(offset: Int) extends Tcp.Event
-
- def props(connection: ActorRef, remote: InetSocketAddress): Props =
- Props(classOf[EchoHandler], connection, remote)
- }
-
- class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
- extends Actor with ActorLogging {
-
- import Tcp._
- import EchoHandler._
-
- // sign death pact: this actor terminates when connection breaks
- context watch connection
-
- // start out in optimistic write-through mode
- def receive = writing
-
- def writing: Receive = {
- case Received(data) =>
- connection ! Write(data, Ack(currentOffset))
- buffer(data)
-
- case Ack(ack) =>
- acknowledge(ack)
-
- case CommandFailed(Write(_, Ack(ack))) =>
- connection ! ResumeWriting
- context become buffering(ack)
-
- case PeerClosed =>
- if (storage.isEmpty) context stop self
- else context become closing
- }
-
- // buffering ...
-
- // closing ...
-
- override def postStop(): Unit = {
- log.info(s"transferred $transferred bytes from/to [$remote]")
- }
-
- // storage omitted ...
- }
- // storage omitted ...
The
principle here is to keep writing until a CommandFailed
is received, using
acknowledgements only to prune the resend buffer. When such a failure is received, the handler transitions into a different state and handles resending of all queued data:
- def buffering(nack: Int): Receive = {
- var toAck = 10
- var peerClosed = false
-
- {
- case Received(data) => buffer(data)
- case WritingResumed => writeFirst()
- case PeerClosed => peerClosed = true
- case Ack(ack) if ack < nack => acknowledge(ack)
- case Ack(ack) =>
- acknowledge(ack)
- if (storage.nonEmpty) {
- if (toAck > 0) {
- // stay in ACK-based mode for a while
- writeFirst()
- toAck -= 1
- } else {
- // then return to NACK-based again
- writeAll()
- context become (if (peerClosed) closing else writing)
- }
- } else if (peerClosed) context stop self
- else context become writing
- }
- }
It
should be noted that all writes which are currently buffered have also
been sent to the connection actor upon entering this state, which means
that the ResumeWriting
message is enqueued after
those writes, leading to the reception of all outstanding CommandFailed
messages (which are ignored
in this state) before receiving the WritingResumed
signal.
That latter message is sent by the connection actor only once the
internally queued write has been fully completed, meaning that a
subsequent write will not fail. This is exploited by the EchoHandler
to switch to an ACK-based
approach for the first ten writes after a failure before resuming the
optimistic write-through behavior.
- def closing: Receive = {
- case CommandFailed(_: Write) =>
- connection ! ResumeWriting
- context.become({
-
- case WritingResumed =>
- writeAll()
- context.unbecome()
-
- case Ack(ack) => acknowledge(ack)
-
- }, discardOld = false)
-
- case Ack(ack) =>
- acknowledge(ack)
- if (storage.isEmpty) context stop self
- }
Closing
the connection while still sending all data is a bit more involved than
in the ACK-based approach: the idea is to always send all outstanding
messages and acknowledge all successful writes, and if a failure happens
then switch behavior to await the WritingResumed
event and start over.
The helper functions are very similar to the ACK-based case:
- private def buffer(data: ByteString): Unit = {
- storage :+= data
- stored += data.size
-
- if (stored > maxStored) {
- log.warning(s"drop connection to [$remote] (buffer overrun)")
- context stop self
-
- } else if (stored > highWatermark) {
- log.debug(s"suspending reading at $currentOffset")
- connection ! SuspendReading
- suspended = true
- }
- }
-
- private def acknowledge(ack: Int): Unit = {
- require(ack == storageOffset, s"received ack $ack at $storageOffset")
- require(storage.nonEmpty, s"storage was empty at ack $ack")
-
- val size = storage(0).size
- stored -= size
- transferred += size
-
- storageOffset += 1
- storage = storage drop 1
-
- if (suspended && stored < lowWatermark) {
- log.debug("resuming reading")
- connection ! ResumeReading
- suspended = false
- }
- }
When using push based reading, data coming from the socket is sent to the actor as soon as it is available. In the case of the previous Echo server example this meant that we needed to maintain a buffer of incoming data to keep it around since the rate of writing might be slower than the rate of the arrival of new data.
With the Pull mode this buffer can be completely eliminated as the following snippet demonstrates:
- override def preStart: Unit = connection ! ResumeReading
-
- def receive = {
- case Received(data) => connection ! Write(data, Ack)
- case Ack => connection ! ResumeReading
- }
The
idea here is that reading is not resumed until the previous write has
been completely acknowledged by the connection actor. Every pull mode
connection actor starts from suspended state. To start the flow of data
we send a ResumeReading
in the preStart
method to tell the
connection actor that we are ready to receive the first chunk of data.
Since we only resume reading when the previous data chunk has been
completely written there is no need for maintaining a buffer.
To
enable pull reading on an outbound connection the pullMode
parameter of the Connect
should be set to true
:
- IO(Tcp) ! Connect(listenAddress, pullMode = true)
The
previous section demonstrated how to enable pull reading mode for
outbound connections but it is possible to create a listener actor
with this mode of reading by setting the pullMode
parameter of the Bind
command to true
:
- IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0), pullMode = true)
One of the effects of this setting is that all connections accepted by this listener actor will use pull mode reading.
Another
effect of this setting is that in addition to setting all inbound
connections to pull mode, accepting connections becomes pull based,
too. This means that after handling one (or more) Connected
events the listener actor
has to be resumed by sending it a ResumeAccepting
message.
Listener
actors with pull mode start out suspended, so to start accepting
connections a ResumeAccepting
command has to be sent to
the listener actor after binding was successful:
- case Bound(localAddress) =>
- // Accept connections one by one
- sender() ! ResumeAccepting(batchSize = 1)
- context.become(listening(sender()))
After handling an incoming connection we need to resume accepting again:
- def listening(listener: ActorRef): Receive = {
- case Connected(remote, local) =>
- val handler = context.actorOf(Props(classOf[PullEcho], sender()))
- sender() ! Register(handler, keepOpenOnPeerClosed = true)
- listener ! ResumeAccepting(batchSize = 1)
- }
The ResumeAccepting message accepts a batchSize
parameter that specifies
how many new connections are accepted before a next ResumeAccepting
message is needed to
resume handling of new connections.
UDP is a connectionless datagram protocol which offers two different ways of communication on the JDK level:
- sockets which are free to send datagrams to any destination and receive datagrams from any origin
- sockets which are restricted to communication with one specific remote socket address
In
the low-level API the distinction is made—confusingly—by whether or not connect
has been called on the socket
(even when connect has been called the protocol is still connectionless).
These two forms of UDP usage are offered using distinct IO extensions
described below.
- class SimpleSender(remote: InetSocketAddress) extends Actor {
- import context.system
- IO(Udp) ! Udp.SimpleSender
-
- def receive = {
- case Udp.SimpleSenderReady =>
- context.become(ready(sender()))
- }
-
- def ready(send: ActorRef): Receive = {
- case msg: String =>
- send ! Udp.Send(ByteString(msg), remote)
- }
- }
The
simplest form of UDP usage is to just send datagrams without the need
of getting a reply. To this end a “simple sender” facility is provided
as demonstrated above. The UDP extension is queried using the SimpleSender
message, which is
answered by a SimpleSenderReady
notification. The sender
of this message is the newly created sender actor which from this
point onward can be used to send datagrams to arbitrary destinations;
in this example it will just send any UTF-8 encoded String
it receives to a
predefined remote address.
Note
The
simple sender will not shut itself down because it cannot know when
you are done with it. You will need to send it a PoisonPill
when you want to close
the ephemeral port the sender is bound to.
- class Listener(nextActor: ActorRef) extends Actor {
- import context.system
- IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0))
-
- def receive = {
- case Udp.Bound(local) =>
- context.become(ready(sender()))
- }
-
- def ready(socket: ActorRef): Receive = {
- case Udp.Received(data, remote) =>
- val processed = // parse data etc., e.g. using PipelineStage
- socket ! Udp.Send(data, remote) // example server echoes back
- nextActor ! processed
- case Udp.Unbind => socket ! Udp.Unbind
- case Udp.Unbound => context.stop(self)
- }
- }
If
you want to implement a UDP server which listens on a socket for
incoming datagrams then you need to use the Bind
command as shown above.
The local address specified may have a zero port in which case the
operating system will automatically choose a free port and assign it
to the new socket. Which port was actually bound can be found out by
inspecting the Bound
message.
The sender of the Bound message is the actor which manages the new socket. Sending datagrams is achieved by using the Send message type, and the socket can be closed by sending an Unbind command, in which case the socket actor will reply with an Unbound notification.

Received datagrams are sent to the actor designated in the Bind message, whereas the Bound message will be sent to the sender of the Bind.
The
service provided by the connection based UDP API is similar to the
bind-and-send service we saw earlier, but the main difference is that a
connection is only able to send to the remoteAddress
it was connected to, and
will receive datagrams only from that address.
- class Connected(remote: InetSocketAddress) extends Actor {
- import context.system
- IO(UdpConnected) ! UdpConnected.Connect(self, remote)
-
- def receive = {
- case UdpConnected.Connected =>
- context.become(ready(sender()))
- }
-
- def ready(connection: ActorRef): Receive = {
- case UdpConnected.Received(data) =>
- // process data, send it on, etc.
- case msg: String =>
- connection ! UdpConnected.Send(ByteString(msg))
- case UdpConnected.Disconnect =>
- connection ! UdpConnected.Disconnect
- case UdpConnected.Disconnected => context.stop(self)
- }
- }
Consequently
the example shown here looks quite similar to the previous one, the
biggest difference is the absence of remote address information in Send
and Received
messages.
Note
There is a small performance benefit in using connection based UDP API over the connectionless one. If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security check, while in the case of connection-based UDP the security check is cached after connect, thus writes do not suffer an additional performance penalty.
If you want to use UDP multicast you will need to use Java 7. Akka provides a way to control various options of DatagramChannel through the akka.io.Inet.SocketOption interface. The example below shows how to set up a receiver of multicast messages using the IPv6 protocol.

To select a Protocol Family you must extend the akka.io.Inet.DatagramChannelCreator class, which extends akka.io.Inet.SocketOption. Provide custom logic for opening a datagram channel by overriding the create method.
- final case class Inet6ProtocolFamily() extends DatagramChannelCreator {
- override def create() =
- DatagramChannel.open(StandardProtocolFamily.INET6)
- }
Another socket option will be needed to join a multicast group.
- final case class MulticastGroup(address: String, interface: String) extends SocketOptionV2 {
- override def afterBind(s: DatagramSocket) {
- val group = InetAddress.getByName(address)
- val networkInterface = NetworkInterface.getByName(interface)
- s.getChannel.join(group, networkInterface)
- }
- }
Socket options must be provided to the Udp.Bind message.
- import context.system
- val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface))
- IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts)
The akka-camel module allows Untyped Actors to receive and send messages over a great variety of protocols and APIs. In addition to the native Scala and Java actor API, actors can now exchange messages with other systems over a large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported.
The akka-camel module is based on Apache Camel, a powerful and light-weight integration framework for the JVM. For an introduction to Apache Camel you may want to read this Apache Camel article. Camel comes with a large number of components that provide bindings to different protocols and APIs. The camel-extra project provides further components.
Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example.
- import akka.camel.{ CamelMessage, Consumer }
-
- class MyEndpoint extends Consumer {
- def endpointUri = "mina2:tcp://localhost:6200?textline=true"
-
- def receive = {
- case msg: CamelMessage => { /* ... */ }
- case _ => { /* ... */ }
- }
- }
-
- // start and expose actor via tcp
- import akka.actor.{ ActorSystem, Props }
-
- val system = ActorSystem("some-system")
- val mina = system.actorOf(Props[MyEndpoint])
The above example exposes an actor over a TCP endpoint via Apache Camel's Mina component. The actor implements the endpointUri method to define an endpoint from which it can receive messages. After starting the actor, TCP clients can immediately send messages to and receive responses from that actor. If the message exchange should go over HTTP (via Camel's Jetty component), only the actor's endpointUri method must be changed.
- import akka.camel.{ CamelMessage, Consumer }
-
- class MyEndpoint extends Consumer {
- def endpointUri = "jetty:http://localhost:8877/example"
-
- def receive = {
- case msg: CamelMessage => { /* ... */ }
- case _ => { /* ... */ }
- }
- }
Actors can also trigger message exchanges with external systems, i.e. produce messages to Camel endpoints.
- import akka.actor.Actor
- import akka.camel.{ Producer, Oneway }
- import akka.actor.{ ActorSystem, Props }
-
- class Orders extends Actor with Producer with Oneway {
- def endpointUri = "jms:queue:Orders"
- }
-
- val sys = ActorSystem("some-system")
- val orders = sys.actorOf(Props[Orders])
-
- orders ! <order amount="100" currency="PLN" itemId="12345"/>
In the above example, any message sent to this actor will be sent to the JMS queue Orders. Producer actors may choose from the same set of Camel components as Consumer actors do.
The number of Camel components is constantly increasing. The akka-camel module can support these in a plug-and-play manner. Just add them to your application's classpath, define a component-specific endpoint URI and use it to exchange messages over the component-specific protocols or APIs. This is possible because Camel components bind protocol-specific message formats to a Camel-specific normalized message format. The normalized message format hides protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. The akka-camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage. In the above example of the Orders Producer, the XML message is put in the body of a newly created Camel Message with an empty set of headers. You can also create a CamelMessage yourself with the appropriate body and headers as you see fit.
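For example, a CamelMessage with an explicit body and headers could be constructed and sent to the orders actor from the earlier example like this (a minimal sketch; the header name and value are only illustrative):
- import akka.camel.CamelMessage
-
- // body plus a map of headers; akka-camel copies both into the outgoing Camel message
- val message = CamelMessage(<order amount="100" currency="PLN" itemId="12345"/>, Map("source" -> "web-shop"))
- orders ! message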
The akka-camel module is implemented as an Akka Extension, the CamelExtension object. Extensions will only be loaded once per ActorSystem, which will be managed by Akka. The CamelExtension object provides access to the Camel trait. The Camel trait in turn provides access to two important Apache Camel objects, the CamelContext and the ProducerTemplate. Below you can see how you can get access to these Apache Camel objects.
- val system = ActorSystem("some-system")
- val camel = CamelExtension(system)
- val camelContext = camel.context
- val producerTemplate = camel.template
A CamelExtension is loaded only once per ActorSystem, which makes it safe to call CamelExtension at any point in your code to get to the Apache Camel objects associated with it. There is one CamelContext and one ProducerTemplate for every ActorSystem that uses a CamelExtension.
By default, a new CamelContext is created when the CamelExtension starts. If you want to inject your own context instead, you can extend the ContextProvider trait and add the FQCN of your implementation in the config, as the value of "akka.camel.context-provider". This interface defines a single method, getContext, used to load the CamelContext.
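A minimal sketch of such a provider, assuming the ContextProvider contract of returning a DefaultCamelContext for the given ExtendedActorSystem (the class name is only illustrative):
- import akka.actor.ExtendedActorSystem
- import akka.camel.ContextProvider
- import org.apache.camel.impl.DefaultCamelContext
-
- // returns the CamelContext that the CamelExtension should use for this actor system
- class MyContextProvider extends ContextProvider {
-   def getContext(system: ExtendedActorSystem): DefaultCamelContext = new DefaultCamelContext
- }
The FQCN of the implementation (here assumed to live in the com.example package) is then configured as:
- akka.camel.context-provider = "com.example.MyContextProvider"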
Below is an example of how to add the ActiveMQ component to the CamelContext, which is required when you want to use the ActiveMQ component.
- // import org.apache.activemq.camel.component.ActiveMQComponent
- val system = ActorSystem("some-system")
- val camel = CamelExtension(system)
- val camelContext = camel.context
- // camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(
- // "vm://localhost?broker.persistent=false"))
The CamelContext joins the lifecycle of the ActorSystem and CamelExtension it is associated with; the CamelContext is started when the CamelExtension is created, and it is shut down when the associated ActorSystem is shut down. The same is true for the ProducerTemplate.
The CamelExtension is used by both Producer and Consumer actors to interact with Apache Camel internally. You can access the CamelExtension inside a Producer or a Consumer using the camel definition, or get straight at the CamelContext using the camelContext definition. Actors are created and started asynchronously. When a Consumer actor is created, the Consumer is published at its Camel endpoint (more precisely, the route is added to the CamelContext from the Endpoint to the actor). When a Producer actor is created, a SendProcessor and Endpoint are created so that the Producer can send messages to it. Publication is done asynchronously; setting up an endpoint may still be in progress after you have requested the actor to be created. Some Camel components can take a while to start up, and in some cases you might want to know when the endpoints are activated and ready to be used. The Camel trait allows you to find out when the endpoint is activated or deactivated.
- import akka.actor.{ ActorSystem, Props }
- import akka.camel.{ CamelExtension, CamelMessage, Consumer }
- import scala.concurrent.duration._
-
- class MyEndpoint extends Consumer {
- def endpointUri = "mina2:tcp://localhost:6200?textline=true"
-
- def receive = {
- case msg: CamelMessage => { /* ... */ }
- case _ => { /* ... */ }
- }
- }
- val system = ActorSystem("some-system")
- val camel = CamelExtension(system)
- val actorRef = system.actorOf(Props[MyEndpoint])
- // get a future reference to the activation of the endpoint of the Consumer Actor
- val activationFuture = camel.activationFutureFor(actorRef)(
- timeout = 10 seconds,
- executor = system.dispatcher)
The
above code shows that you can get a Future
to the activation of the
route from the endpoint to the actor, or you can wait in a blocking
fashion on the activation of the route. An ActivationTimeoutException
is thrown if the endpoint
could not be activated within the specified timeout. Deactivation
works in a similar fashion:
- system.stop(actorRef)
- // get a future reference to the deactivation of the endpoint of the Consumer Actor
- val deactivationFuture = camel.deactivationFutureFor(actorRef)(
- timeout = 10 seconds,
- executor = system.dispatcher)
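As mentioned above, both futures can also be awaited in a blocking fashion instead of registering callbacks, for example (a minimal sketch; the timeout value is only illustrative):
- import scala.concurrent.Await
- import scala.concurrent.duration._
-
- // blocks the calling thread until the route is activated or the timeout expires
- Await.result(activationFuture, 10 seconds)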
Deactivation
of a Consumer or a Producer actor happens when the actor is
terminated. For a Consumer, the route to the actor is stopped. For a
Producer, the SendProcessor is stopped. A DeActivationTimeoutException
is thrown if the
associated camel objects could not be deactivated within the specified
timeout.
For objects to receive messages, they must mix in the Consumer trait. For example, the following actor class (Consumer1) implements the endpointUri method, which is declared in the Consumer trait, in order to receive messages from the file:data/input/actor Camel endpoint.
- import akka.camel.{ CamelMessage, Consumer }
-
- class Consumer1 extends Consumer {
- def endpointUri = "file:data/input/actor"
-
- def receive = {
- case msg: CamelMessage => println("received %s" format msg.bodyAs[String])
- }
- }
Whenever a file is put into the data/input/actor directory, its content is picked up by the Camel file component and sent as message to the actor. Messages consumed by actors from Camel endpoints are of type CamelMessage. These are immutable representations of Camel messages.
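Because a CamelMessage is immutable, transformations such as mapBody return a new CamelMessage instead of modifying the received one. A minimal sketch (the helper name is only illustrative):
- import akka.camel.CamelMessage
-
- // bodyAs converts the body to the requested type; mapBody returns a new, transformed CamelMessage
- def shout(msg: CamelMessage): CamelMessage =
-   msg.mapBody { body: String => body.toUpperCase }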
Here's
another example that sets the endpointUri to jetty:http://localhost:8877/camel/default
.
It causes Camel's Jetty
component to start
an embedded Jetty server, accepting HTTP
connections from localhost on port 8877.
- import akka.camel.{ CamelMessage, Consumer }
-
- class Consumer2 extends Consumer {
- def endpointUri = "jetty:http://localhost:8877/camel/default"
-
- def receive = {
- case msg: CamelMessage => sender() ! ("Hello %s" format msg.bodyAs[String])
- }
- }
After starting the actor, clients can send messages to that actor by POSTing to http://localhost:8877/camel/default. The actor sends a response by using sender() !. For returning a message body and headers to the HTTP client, the response type should be CamelMessage. For any other response type, a new CamelMessage object is created by akka-camel with the actor response as message body.
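For example, a consumer could reply with an explicit CamelMessage to control both the body and the headers returned to the HTTP client. A minimal sketch based on Consumer2 above (the class name, endpoint path and header name are only illustrative):
- import akka.camel.{ CamelMessage, Consumer }
-
- class HeaderConsumer extends Consumer {
-   def endpointUri = "jetty:http://localhost:8877/camel/headers"
-
-   def receive = {
-     // reply with an explicit body and headers instead of a plain String
-     case msg: CamelMessage =>
-       sender() ! CamelMessage("Hello %s" format msg.bodyAs[String], Map("MyCustomHeader" -> "some value"))
-   }
- }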
With in-out message exchanges, clients usually know that a message exchange is done when they receive a reply from a consumer actor. The reply message can be a CamelMessage (or any object which is then internally converted to a CamelMessage) on success, and a Failure message on failure.
With
in-only message exchanges, by default, an exchange is done when a
message is added to the consumer actor's mailbox. Any failure or
exception that occurs during processing of that message by the
consumer actor cannot be reported back to the endpoint in this case.
To allow consumer actors to positively or negatively acknowledge the receipt of a message from an in-only message exchange, they need to override the autoAck method to return false. In this case, consumer actors must reply either with a special akka.camel.Ack message (positive acknowledgement) or an akka.actor.Status.Failure (negative acknowledgement).
- import akka.camel.{ CamelMessage, Consumer }
- import akka.camel.Ack
- import akka.actor.Status.Failure
-
- class Consumer3 extends Consumer {
- override def autoAck = false
-
- def endpointUri = "jms:queue:test"
-
- def receive = {
- case msg: CamelMessage =>
- sender() ! Ack // on successful processing, acknowledge the message
- // ..
- // on failure, reply with a negative acknowledgement instead:
- val someException = new Exception("e1")
- sender() ! Failure(someException)
- }
- }
Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from an actor before returning it to the initiating client. For some endpoint types, timeout values can be defined in an endpoint-specific way which is described in the documentation of the individual Camel components. Another option is to configure timeouts on the level of consumer actors.
Two-way
communications between a Camel endpoint and an actor are initiated by
sending the request message to the actor with the ask pattern and the actor
replies to the endpoint when the response is ready. The ask request to
the actor can timeout, which will result in the Exchange failing with a
TimeoutException set on the failure of the Exchange.
The timeout on the consumer actor can be overridden with the replyTimeout
,
as shown below.
- import akka.camel.{ CamelMessage, Consumer }
- import scala.concurrent.duration._
-
- class Consumer4 extends Consumer {
- def endpointUri = "jetty:http://localhost:8877/camel/default"
- override def replyTimeout = 500 millis
- def receive = {
- case msg: CamelMessage => sender() ! ("Hello %s" format msg.bodyAs[String])
- }
- }
For sending messages to Camel endpoints, actors need to mix in the Producer trait and implement the endpointUri method.
- import akka.actor.Actor
- import akka.actor.{ Props, ActorSystem }
- import akka.camel.{ Producer, CamelMessage }
- import akka.util.Timeout
-
- class Producer1 extends Actor with Producer {
- def endpointUri = "http://localhost:8080/news"
- }
Producer1 inherits a default implementation of the receive method from the Producer trait. To customize a producer actor's default behavior you must override the Producer.transformResponse and Producer.transformOutgoingMessage methods. This is explained later in more detail. Producer Actors cannot override the default Producer.receive method.
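As a minimal sketch of the first of these hooks, assuming transformResponse receives the response message and returns the (possibly transformed) message that is then delivered (the class name is only illustrative):
- import akka.actor.Actor
- import akka.camel.{ CamelMessage, Producer }
-
- class UpperCaseResponses(uri: String) extends Actor with Producer {
-   def endpointUri = uri
-
-   // post-process responses from the endpoint before they are delivered
-   override def transformResponse(msg: Any) = msg match {
-     case m: CamelMessage => m.mapBody { body: String => body.toUpperCase }
-     case other           => other
-   }
- }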
Any message sent to a Producer actor will be sent to the associated Camel endpoint, in the above example to http://localhost:8080/news. The Producer always sends messages asynchronously. Response messages (if supported by the configured endpoint) will, by default, be returned to the original sender. The following example uses the ask pattern to send a message to a Producer actor and waits for a response.
- import akka.pattern.ask
- import scala.concurrent.duration._
- implicit val timeout = Timeout(10 seconds)
-
- val system = ActorSystem("some-system")
- val producer = system.actorOf(Props[Producer1])
- val future = producer.ask("some request").mapTo[CamelMessage]
The future contains the response CamelMessage or, if an error occurred, an AkkaCamelException, which also contains the headers of the response.
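For example, the response or the failure could be handled on the future directly. A minimal sketch, assuming the AkkaCamelException exposes the response headers as described above (the println handling is only illustrative):
- import akka.camel.AkkaCamelException
- import scala.util.{ Failure, Success }
- import system.dispatcher
-
- future.onComplete {
-   case Success(response) =>
-     println("body: %s" format response.bodyAs[String])
-   case Failure(e: AkkaCamelException) =>
-     // the exception carries the headers of the response
-     println("failed, headers: %s" format e.headers)
-   case Failure(e) =>
-     println("failed: %s" format e.getMessage)
- }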
Instead of replying to the initial sender, producer actors can implement custom response processing by overriding the routeResponse method. In the following example, the response message is forwarded to a target actor instead of being replied to the original sender.
- import akka.actor.{ Actor, ActorRef }
- import akka.camel.{ Producer, CamelMessage }
- import akka.actor.{ Props, ActorSystem }
-
- class ResponseReceiver extends Actor {
- def receive = {
- case msg: CamelMessage =>
- // do something with the forwarded response
- }
- }
-
- class Forwarder(uri: String, target: ActorRef) extends Actor with Producer {
- def endpointUri = uri
-
- override def routeResponse(msg: Any) { target forward msg }
- }
- val system = ActorSystem("some-system")
- val receiver = system.actorOf(Props[ResponseReceiver])
- val forwardResponse = system.actorOf(
- Props(classOf[Forwarder], "http://localhost:8080/news/akka", receiver))
- // the Forwarder sends out a request to the web page and forwards the response to
- // the ResponseReceiver
- forwardResponse ! "some request"
Before producing messages to endpoints, producer actors can pre-process them by overriding the Producer.transformOutgoingMessage method.
- import akka.actor.Actor
- import akka.camel.{ Producer, CamelMessage }
-
- class Transformer(uri: String) extends Actor with Producer {
- def endpointUri = uri
-
- def upperCase(msg: CamelMessage) = msg.mapBody {
- body: String => body.toUpperCase
- }
-
- override def transformOutgoingMessage(msg: Any) = msg match {
- case msg: CamelMessage => upperCase(msg)
- }
- }
The interaction of producer actors with Camel endpoints can be configured to be one-way or two-way (by initiating in-only or in-out message exchanges, respectively). By default, the producer initiates an in-out message exchange with the endpoint. For initiating an in-only exchange, producer actors have to override the oneway method to return true.
- import akka.actor.{ Actor, Props, ActorSystem }
- import akka.camel.Producer
-
- class OnewaySender(uri: String) extends Actor with Producer {
- def endpointUri = uri
- override def oneway: Boolean = true
- }
-
- val system = ActorSystem("some-system")
- val producer = system.actorOf(Props(classOf[OnewaySender], "activemq:FOO.BAR"))
- producer ! "Some message"
To correlate request and response messages, applications can set the CamelMessage.MessageExchangeId message header.
- import akka.camel.{ Producer, CamelMessage }
- import akka.actor.Actor
- import akka.actor.{ Props, ActorSystem }
-
- class Producer2 extends Actor with Producer {
- def endpointUri = "activemq:FOO.BAR"
- }
- val system = ActorSystem("some-system")
- val producer = system.actorOf(Props[Producer2])
-
- producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123"))
The Producer trait is a very convenient way for actors to produce messages to Camel endpoints. Actors may also use a Camel ProducerTemplate for producing messages to endpoints.
- import akka.actor.Actor
- class MyActor extends Actor {
- def receive = {
- case msg =>
- val template = CamelExtension(context.system).template
- template.sendBody("direct:news", msg)
- }
- }
For
initiating a two-way message exchange, one of the ProducerTemplate.request*
methods must be used.
- import akka.actor.Actor
- class MyActor extends Actor {
- def receive = {
- case msg =>
- val template = CamelExtension(context.system).template
- sender() ! template.requestBody("direct:news", msg)
- }
- }
In-out message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both consumer and producer actors.
- A consumer endpoint sends request messages to its consumer actor using the ! (tell) operator, and the actor returns responses with sender ! once they are ready.
- A producer actor sends request messages to its endpoint using Camel's asynchronous routing engine; responses are returned to the original sender or handled by custom response processing (see the routeResponse method in Custom Processing).
However, asynchronous two-way message exchanges, without allocating a thread for the full duration of exchange, cannot be generically supported by Camel's asynchronous routing engine alone. This must be supported by the individual Camel components (from which endpoints are created) as well. They must be able to suspend any work started for request processing (thereby freeing threads to do other work) and resume processing when the response is ready. This is currently the case for a subset of components such as the Jetty component. All other Camel components can still be used, of course, but they will cause allocation of a thread for the duration of an in-out message exchange. There is also an example in Examples that implements both an asynchronous consumer and an asynchronous producer, with the Jetty component.
If
the used Camel component is blocking it might be necessary to use a
separate dispatcher for the producer. The Camel
processor is invoked by a child actor of the producer and the dispatcher
can be defined in the deployment section of the configuration. For
example, if your producer actor has path /user/integration/output
the dispatcher of the child
actor can be defined with:
- akka.actor.deployment {
- /integration/output/* {
- dispatcher = my-dispatcher
- }
- }
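The my-dispatcher referenced above must itself be defined in the configuration; a minimal sketch (the executor type and pool sizes are only illustrative):
- my-dispatcher {
-   type = Dispatcher
-   executor = "thread-pool-executor"
-   thread-pool-executor {
-     core-pool-size-min = 2
-     core-pool-size-max = 16
-   }
- }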
In all the examples so far, routes to consumer actors have been automatically constructed by akka-camel when the actor was started. Although the default route construction templates, used by akka-camel internally, are sufficient for most use cases, some applications may require more specialized routes to actors. The akka-camel module provides two mechanisms for customizing routes to actors, which will be explained in this section: accessing actors from custom routes through the actor Camel component, and intercepting the construction of routes to consumer actors.
Akka actors can be accessed from Camel routes using the actor Camel component. This component can be used to access any Akka actor (not only consumer actors) from Camel routes, as described in the following sections.
To access actors from custom Camel routes, the actor Camel component should be used. It fully supports Camel's asynchronous routing engine.
This component accepts the following endpoint URI format:
[<actor-path>]?<options>
where <actor-path> is the ActorPath to the actor. The <options> are name-value pairs separated by & (i.e. name1=value1&name2=value2&...).
The following URI options are supported:
Name | Type | Default | Description |
---|---|---|---|
replyTimeout | Duration | false | The reply timeout, specified in the same way that you use a duration in Akka; in the URI it is handy to use a + between the amount and the unit, e.g. 100+millis as in the example below. See also Consumer timeout. |
autoAck | Boolean | true | If set to true, in-only message exchanges are auto-acknowledged when the message is added to the actor's mailbox. If set to false, actors must acknowledge the receipt of the message. See also Delivery acknowledgements. |
Here's an actor endpoint URI example containing an actor path:
- akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis
In the following example, a custom route to an actor is created using the actor's path. The akka.camel package contains an implicit toActorRouteDefinition that allows a route to reference an ActorRef directly, as shown in the example below. The route starts from a Jetty endpoint and ends at the target actor.
- import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
- import akka.camel.{ CamelMessage, CamelExtension }
- import org.apache.camel.builder.RouteBuilder
- import akka.camel._
- class Responder extends Actor {
- def receive = {
- case msg: CamelMessage =>
- sender() ! (msg.mapBody {
- body: String => "received %s" format body
- })
- }
- }
-
- class CustomRouteBuilder(system: ActorSystem, responder: ActorRef)
- extends RouteBuilder {
- def configure {
- from("jetty:http://localhost:8877/camel/custom").to(responder)
- }
- }
- val system = ActorSystem("some-system")
- val camel = CamelExtension(system)
- val responder = system.actorOf(Props[Responder], name = "TestResponder")
- camel.context.addRoutes(new CustomRouteBuilder(system, responder))
When a message is received on the Jetty endpoint, it is routed to the Responder actor, which in turn replies to the client of the HTTP request.
The previous section, Akka Camel components, explained how to set up a route to an actor manually. It was the application's responsibility to define the route and add it to the current CamelContext. This section explains a more convenient way to define custom routes: akka-camel still sets up the routes to consumer actors (and adds these routes to the current CamelContext), but applications can define extensions to these routes. Extensions can be defined with Camel's Java DSL or Scala DSL. For example, an extension could be a custom error handler that redelivers messages from an endpoint to an actor's bounded mailbox when the mailbox was full.
The following examples demonstrate how to extend a route to a consumer actor for handling exceptions thrown by that actor.
- import akka.actor.Status.Failure
- import akka.camel.{ CamelMessage, Consumer }
-
- import org.apache.camel.builder.Builder
- import org.apache.camel.model.RouteDefinition
-
- class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
- def receive = {
- case msg: CamelMessage => throw new Exception("error: %s" format msg.body)
- }
- override def onRouteDefinition = (rd) => rd.onException(classOf[Exception]).
- handled(true).transform(Builder.exceptionMessage).end
-
- final override def preRestart(reason: Throwable, message: Option[Any]) {
- sender() ! Failure(reason)
- }
- }
The above ErrorThrowingConsumer sends the Failure back to the sender in preRestart because the exception thrown in the actor would otherwise just crash the actor; by default the actor would be restarted, and the response would never reach the client of the Consumer.
The akka-camel module creates a RouteDefinition instance by calling from(endpointUri) on a Camel RouteBuilder (where endpointUri is the endpoint URI of the consumer actor) and passes that instance as argument to the route definition handler *). The route definition handler then extends the route and returns a ProcessorDefinition (in the above example, the ProcessorDefinition returned by the end method; see the org.apache.camel.model package for details). After executing the route definition handler, akka-camel finally calls to(targetActorUri) on the returned ProcessorDefinition to complete the route to the consumer actor (where targetActorUri is the actor component URI as described in Access to actors). If the actor cannot be found, an ActorNotRegisteredException is thrown.
*) Before passing the RouteDefinition instance to the route definition handler, akka-camel may make some further modifications to it.
The Lightbend Activator tutorial named Akka Camel Samples with Scala contains 3 samples:
- Asynchronous routing and transformation - This example demonstrates how to implement consumer and producer actors that support Asynchronous routing with their Camel endpoints.
- Custom Camel route - Demonstrates the combined usage of a Producer and a Consumer actor as well as the inclusion of a custom Camel route.
- Quartz Scheduler Example - Shows how simple it is to implement a cron-style scheduler by using the Camel Quartz component.
There are several configuration properties for the Camel module, please refer to the reference configuration.
For an introduction to akka-camel 2, see also Peter Gabryanczyk's talk Migrating akka-camel module to Akka 2.x.
For an introduction to akka-camel 1, see also Appendix E - Akka and Camel (pdf) of the book Camel in Action.
Other, more advanced external articles (for version 1) are: