Ukrainian translation: Arsenii Chebotarov, Kyiv 2016

Akka Networking Cluster Specification

Note

This document describes the design concepts of the clustering.

Intro

Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck. It does this using gossip protocols and an automatic failure detector.

Terms

node
A logical member of a cluster. There could be multiple nodes on a physical machine. Defined by a hostname:port:uid tuple.
cluster
A set of nodes joined together through the membership service.
leader
A single node in the cluster that acts as the leader, managing cluster convergence and membership state transitions.

Membership

A cluster is made up of a set of member nodes. The identifier for each node is a hostname:port:uid tuple. An Akka application can be distributed over a cluster with each node hosting some part of the application. Cluster membership and the actors running on that node of the application are decoupled. A node could be a member of a cluster without hosting any actors. Joining a cluster is initiated by issuing a Join command to one of the nodes in the cluster to join.

The node identifier internally also contains a UID that uniquely identifies this actor system instance at that hostname:port. Akka uses the UID to be able to reliably trigger remote death watch. This means that the same actor system can never join a cluster again once it's been removed from that cluster. To re-join an actor system with the same hostname:port to a cluster you have to stop the actor system and start a new one with the same hostname:port, which will then receive a different UID.

The cluster membership state is a specialized CRDT, which means that it has a monotonic merge function. When concurrent changes occur on different nodes the updates can always be merged and converge to the same end result.

Gossip

The cluster membership used in Akka is based on Amazon's Dynamo system and particularly the approach taken in Basho's Riak distributed database. Cluster membership is communicated using a Gossip Protocol, where the current state of the cluster is gossiped randomly through the cluster, with preference to members that have not seen the latest version.

Vector Clocks

Vector clocks are a type of data structure and algorithm for generating a partial ordering of events in a distributed system and detecting causality violations.

We use vector clocks to reconcile and merge differences in cluster state during gossiping. A vector clock is a set of (node, counter) pairs. Each update to the cluster state has an accompanying update to the vector clock.
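
As a rough, hypothetical illustration (Akka's internal VectorClock class differs in its details), a vector clock can be modelled as a map from node identifier to counter, with a pointwise-maximum merge and a partial-order comparison:

  // Hypothetical sketch of a vector clock; Akka's internal implementation differs in detail.
  final case class VectorClock(versions: Map[String, Long] = Map.empty) {
    // Increment this node's counter when the local cluster state changes.
    def tick(node: String): VectorClock =
      copy(versions = versions.updated(node, versions.getOrElse(node, 0L) + 1))

    // Monotonic merge: take the pointwise maximum of the counters.
    def merge(that: VectorClock): VectorClock =
      VectorClock((versions.keySet ++ that.versions.keySet).map { n =>
        n -> math.max(versions.getOrElse(n, 0L), that.versions.getOrElse(n, 0L))
      }.toMap)

    // Partial order: this clock happened "before or at" that one if no counter is ahead.
    def <=(that: VectorClock): Boolean =
      versions.forall { case (n, c) => c <= that.versions.getOrElse(n, 0L) }
  }

Two clocks where neither is <= the other represent concurrent updates, which is exactly the case that requires a merge during gossip.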

Gossip Convergence

Information about the cluster converges locally at a node at certain points in time. This is when a node can prove that the cluster state it is observing has been observed by all other nodes in the cluster. Convergence is implemented by passing a set of nodes that have seen the current state version during gossip. This information is referred to as the seen set in the gossip overview. When all nodes are included in the seen set there is convergence.
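
A minimal sketch of that convergence check, assuming simplified members and seenBy sets rather than Akka's real gossip data structures:

  // Hypothetical simplification of the gossip overview: "seenBy" holds the nodes
  // that have acknowledged the current state version.
  final case class GossipOverview(members: Set[String], seenBy: Set[String]) {
    // Convergence: every member is included in the seen set.
    def converged: Boolean = members.subsetOf(seenBy)
  }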

Gossip convergence cannot occur while any nodes are unreachable. The nodes need to become reachable again, or moved to the down and removed states (see the Membership Lifecycle section below). This only blocks the leader from performing its cluster membership management and does not influence the application running on top of the cluster. For example this means that during a network partition it is not possible to add more nodes to the cluster. The nodes can join, but they will not be moved to the up state until the partition has healed or the unreachable nodes have been downed.

Failure Detector

The failure detector is responsible for trying to detect if a node is unreachable from the rest of the cluster. For this we are using an implementation of The Phi Accrual Failure Detector by Hayashibara et al.

An accrual failure detector decouples monitoring and interpretation. That makes it applicable to a wider range of scenarios and more adequate for building generic failure detection services. The idea is that it keeps a history of failure statistics, calculated from heartbeats received from other nodes, and tries to make educated guesses by taking multiple factors, and how they accumulate over time, into account in order to come up with a better guess of whether a specific node is up or down. Rather than just answering "yes" or "no" to the question "is the node down?" it returns a phi value representing the likelihood that the node is down.

The threshold that is the basis for the calculation is configurable by the user. A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes. The default threshold is 8 and is appropriate for most situations. However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms.

In a cluster each node is monitored by a few (default maximum 5) other nodes, and when any of these detects the node as unreachable that information will spread to the rest of the cluster through the gossip. In other words, only one node needs to mark a node unreachable to have the rest of the cluster mark that node unreachable.

The nodes to monitor are picked out of neighbors in a hashed ordered node ring. This is to increase the likelihood to monitor across racks and data centers, but the order is the same on all nodes, which ensures full coverage.
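
The neighbour selection could be sketched roughly like this (a simplification under assumed types; Akka's real heartbeat node ring has its own hashing and ordering rules):

  // Hypothetical sketch: order the node identifiers by a hash to form a ring and
  // pick the next few neighbours clockwise as the nodes this node should monitor.
  def nodesToMonitor(self: String, allNodes: Set[String],
                     monitoredByNrOfMembers: Int = 5): Seq[String] = {
    val ring = allNodes.toSeq.sortBy(_.hashCode) // same deterministic order on every node
    val selfIndex = ring.indexOf(self)           // assumes self is part of allNodes
    // take the neighbours that follow this node on the ring, wrapping around
    (1 to math.min(monitoredByNrOfMembers, ring.size - 1))
      .map(i => ring((selfIndex + i) % ring.size))
  }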

Heartbeats are sent out every second and every heartbeat is performed in a request/reply handshake with the replies used as input to the failure detector.

The failure detector will also detect if the node becomes reachable again. When all nodes that monitored the unreachable node detect it as reachable again the cluster, after gossip dissemination, will consider it as reachable.

If system messages cannot be delivered to a node it will be quarantined and then it cannot come back from unreachable. This can happen if there are too many unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, failures of actors supervised by a remote parent). Then the node needs to be moved to the down or removed states (see the Membership Lifecycle section below) and the actor system must be restarted before it can join the cluster again.

Leader

After gossip convergence a leader for the cluster can be determined. There is no leader election process; the leader can always be recognised deterministically by any node whenever there is gossip convergence. The leader is just a role, any node can be the leader and it can change between convergence rounds. The leader is simply the first node in sorted order that is able to take the leadership role, where the preferred member states for a leader are up and leaving (see the Membership Lifecycle section below for more information about member states).

The role of the leader is to shift members in and out of the cluster, changing joining members to the up state or exiting members to the removed state. Currently leader actions are only triggered by receiving a new cluster state with gossip convergence.

The leader also has the power, if configured so, to "auto-down" a node that according to the Failure Detector is considered unreachable. This means setting the unreachable node status to down automatically after a configured time of unreachability.

Seed Nodes

The seed nodes are configured contact points for new nodes joining the cluster. When a new node is started it sends a message to all seed nodes and then sends a join command to the seed node that answers first.

The seed nodes configuration value does not have any influence on the running cluster itself, it is only relevant for new nodes joining the cluster as it helps them to find contact points to send the join command to; a new member can send this command to any current member of the cluster, not only to the seed nodes.

Gossip Protocol

A variation of push-pull gossip is used to reduce the amount of gossip information sent around the cluster. In push-pull gossip a digest is sent representing current versions but not actual values; the recipient of the gossip can then send back any values for which it has newer versions and also request values for which it has outdated versions. Akka uses a single shared state with a vector clock for versioning, so the variant of push-pull gossip used in Akka makes use of this version to only push the actual state as needed.

Periodically, by default every 1 second, each node chooses another random node to initiate a round of gossip with. If less than ½ of the nodes reside in the seen set (have seen the new state) then the cluster gossips 3 times instead of once every second. This adjusted gossip interval is a way to speed up the convergence process in the early dissemination phase after a state change.

The choice of node to gossip with is random but biased towards nodes that might not have seen the current state version. During each round of gossip exchange, when the cluster has not yet converged, a node uses a probability of 0.8 (configurable) to gossip to a node that is not part of the seen set, i.e. one that probably has an older version of the state; otherwise it gossips to any random live node.

This biased selection is a way to speed up the convergence process in the late dissemination phase after a state change.

For clusters larger than 400 nodes (configurable, and suggested by empirical evidence) the 0.8 probability is gradually reduced to avoid overwhelming single stragglers with too many concurrent gossip requests. The gossip receiver also has a mechanism to protect itself from too many simultaneous gossip messages by dropping messages that have been enqueued in the mailbox for too long.

While the cluster is in a converged state the gossiper only sends a small gossip status message containing the gossip version to the chosen node. As soon as there is a change to the cluster (meaning non-convergence) then it goes back to biased gossip again.

The recipient of the gossip state or the gossip status can use the gossip version (vector clock) to determine whether:

  1. it has a newer version of the gossip state, in which case it sends that back to the gossiper
  2. it has an outdated version of the state, in which case the recipient requests the current state from the gossiper by sending back its version of the gossip state
  3. it has conflicting gossip versions, in which case the different versions are merged and sent back

If the recipient and the gossip have the same version then the gossip state is not sent or requested.
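
Using the illustrative VectorClock sketched in the Vector Clocks section, the recipient's decision could be expressed as follows (a hypothetical sketch, not Akka's actual gossip code):

  sealed trait GossipAction
  case object SendBackNewerState  extends GossipAction // case 1: recipient is ahead
  case object RequestCurrentState extends GossipAction // case 2: recipient is behind
  case object MergeAndSendBack    extends GossipAction // case 3: concurrent versions
  case object Ignore              extends GossipAction // same version: nothing to do

  def decide(local: VectorClock, remote: VectorClock): GossipAction =
    if (local.versions == remote.versions) Ignore
    else if (remote <= local) SendBackNewerState
    else if (local <= remote) RequestCurrentState
    else MergeAndSendBack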

The periodic nature of the gossip has a nice batching effect on state changes; for example, joining several nodes quickly after each other to one node will result in only one state change being spread to the other members of the cluster.

The gossip messages are serialized with protobuf and also gzipped to reduce payload size.

Membership Lifecycle

A node begins in the joining state. Once all nodes have seen that the new node is joining (through gossip convergence) the leader will set the member state to up.

If a node is leaving the cluster in a safe, expected manner then it switches to the leaving state. Once the leader sees the convergence on the node in the leaving state, the leader will then move it to exiting. Once all nodes have seen the exiting state (convergence) the leader will remove the node from the cluster, marking it as removed.

If a node is unreachable then gossip convergence is not possible and therefore any leader actions are also not possible (for instance, allowing a node to become a part of the cluster). To be able to move forward the state of the unreachable nodes must be changed. It must become reachable again or be marked as down. If the node is to join the cluster again the actor system must be restarted and go through the joining process again. The cluster can, through the leader, also auto-down a node after a configured time of unreachability. If a new incarnation of an unreachable node tries to rejoin the cluster, the old incarnation will be marked as down and the new incarnation can rejoin the cluster without manual intervention.

Note

If you have auto-down enabled and the failure detector triggers, you can over time end up with a lot of single node clusters if you don't put measures in place to shut down nodes that have become unreachable. This follows from the fact that the unreachable node will likely see the rest of the cluster as unreachable, become its own leader and form its own cluster.

As mentioned before, if a node is unreachable then gossip convergence is not possible and therefore any leader actions are also not possible. By enabling akka.cluster.allow-weakly-up-members it is possible to let new joining nodes be promoted while convergence has not yet been reached. These Joining nodes will be promoted to WeaklyUp. Once gossip convergence is reached, the leader will move WeaklyUp members to Up.

Note that members on the other side of a network partition have no knowledge about the existence of the new members. You should for example not count WeaklyUp members in quorum decisions.

State Diagram for the Member States (akka.cluster.allow-weakly-up-members=off)

../_images/member-states.png

State Diagram for the Member States (akka.cluster.allow-weakly-up-members=on)

../_images/member-states-weakly-up.png

Member States

  • joining

    transient state when joining a cluster

  • weakly up

    transient state during a network split (only if akka.cluster.allow-weakly-up-members=on)

  • up

    normal operating state

  • leaving / exiting

    states during graceful removal

  • down

    marked as down (no longer part of cluster decisions)

  • removed

    tombstone state (no longer a member)

User Actions

  • join

    join a single node to a cluster - can be explicit or automatic on startup if a node to join has been specified in the configuration

  • leave

    tell a node to leave the cluster gracefully

  • down

    mark a node as down

Leader Actions

The leader has the following duties:

  • shifting members in and out of the cluster
    • joining -> up
    • exiting -> removed

Failure Detection and Unreachability

  • fd*

    the failure detector of one of the monitoring nodes has triggered causing the monitored node to be marked as unreachable

  • unreachable*

    unreachable is not a real member state but more of a flag in addition to the state, signaling that the cluster is unable to talk to this node; after being unreachable the failure detector may detect it as reachable again and thereby remove the flag

Cluster Usage

For an introduction to the Akka Cluster concepts please see Cluster Specification.

Preparing Your Project for Clustering

The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:

  1. "com.typesafe.akka" %% "akka-cluster" % "2.4.10"

A Simple Cluster Example

The following configuration enables the Cluster extension to be used. It joins the cluster and an actor subscribes to cluster membership events and logs them.

The application.conf configuration looks like this:

  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }

    cluster {
      seed-nodes = [
        "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "akka.tcp://ClusterSystem@127.0.0.1:2552"]

      # auto downing is NOT safe for production deployments.
      # you may want to use it during development, read more about it in the docs.
      #
      # auto-down-unreachable-after = 10s
    }
  }

  # Disable legacy metrics in akka-cluster.
  akka.cluster.metrics.enabled=off

  # Enable metrics extension in akka-cluster-metrics.
  akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

  # Sigar native library extract location during tests.
  # Note: use per-jvm-instance folder when running multiple jvm on one host.
  akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

To enable cluster capabilities in your Akka project you should, at a minimum, add the Remoting settings, but with akka.cluster.ClusterActorRefProvider. The akka.cluster.seed-nodes should normally also be added to your application.conf file.

Note

If you are running Akka in a Docker container or the nodes for some other reason have separate internal and external IP addresses you must configure remoting according to Akka behind NAT or in a Docker container.

The seed nodes are configured contact points for initial, automatic, join of the cluster.

Note that if you are going to start the nodes on different machines you need to specify the ip-addresses or host names of the machines in application.conf instead of 127.0.0.1

An actor that uses the cluster extension may look like this:

  package sample.cluster.simple

  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent._
  import akka.actor.ActorLogging
  import akka.actor.Actor

  class SimpleClusterListener extends Actor with ActorLogging {

    val cluster = Cluster(context.system)

    // subscribe to cluster changes, re-subscribe when restart
    override def preStart(): Unit = {
      cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
        classOf[MemberEvent], classOf[UnreachableMember])
    }
    override def postStop(): Unit = cluster.unsubscribe(self)

    def receive = {
      case MemberUp(member) =>
        log.info("Member is Up: {}", member.address)
      case UnreachableMember(member) =>
        log.info("Member detected as unreachable: {}", member)
      case MemberRemoved(member, previousStatus) =>
        log.info("Member is Removed: {} after {}",
          member.address, previousStatus)
      case _: MemberEvent => // ignore
    }
  }

The actor registers itself as a subscriber to certain cluster events. It receives events corresponding to the current state of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.

The easiest way to run this example yourself is to download Lightbend Activator and open the tutorial named Akka Cluster Samples with Scala. It contains instructions on how to run the SimpleClusterApp.

Joining to Seed Nodes

You may decide whether joining the cluster should be done manually or automatically to configured initial contact points, so-called seed nodes. When a new node is started it sends a message to all seed nodes and then sends a join command to the one that answers first. If none of the seed nodes replies (they might not be started yet) it retries this procedure until successful or shut down.

You define the seed nodes in the Configuration file (application.conf):

  akka.cluster.seed-nodes = [
    "akka.tcp://ClusterSystem@host1:2552",
    "akka.tcp://ClusterSystem@host2:2552"]

This can also be defined as Java system properties when starting the JVM using the following syntax:

  -Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
  -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552

The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn't matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster.

If you don't configure seed nodes you need to join the cluster programmatically or manually.

Manual joining can be performed by using JMX or Command Line Management. Joining programmatically can be performed with Cluster(system).join. Unsuccessful join attempts are automatically retried after the time period defined in the configuration property retry-unsuccessful-join-after. Retries can be disabled by setting the property to off.
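
For example (a sketch; the Address values here are hypothetical):

  import akka.actor.{ ActorSystem, Address }
  import akka.cluster.Cluster

  val system = ActorSystem("ClusterSystem")
  // Address of any current member of the cluster (hypothetical host and port).
  val address = Address("akka.tcp", "ClusterSystem", "host1", 2552)
  Cluster(system).join(address)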

You can join to any node in the cluster. It does not have to be configured as a seed node. Note that you can only join to an existing cluster member, which means that for bootstrapping some node must join itself, and then the following nodes could join them to make up a cluster.

You may also use Cluster(system).joinSeedNodes to join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API. When using joinSeedNodes you should not include the node itself except for the node that is supposed to be the first seed node, and that should be placed first in the parameter to joinSeedNodes.
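
A sketch of such a programmatic join, assuming the addresses have been obtained from some external discovery mechanism:

  import akka.actor.{ ActorSystem, Address }
  import akka.cluster.Cluster
  import scala.collection.immutable

  val system = ActorSystem("ClusterSystem")
  // Hypothetical addresses discovered at startup, e.g. from an external registry.
  val seedAddresses = immutable.Seq(
    Address("akka.tcp", "ClusterSystem", "host1", 2552),
    Address("akka.tcp", "ClusterSystem", "host2", 2552))
  Cluster(system).joinSeedNodes(seedAddresses)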

Unsuccessful attempts to contact seed nodes are automatically retried after the time period defined in the configuration property seed-node-timeout. An unsuccessful attempt to join a specific seed node is automatically retried after the configured retry-unsuccessful-join-after. Retrying means that it tries to contact all seed nodes and then joins the node that answers first. The first node in the list of seed nodes will join itself if it cannot contact any of the other seed nodes within the configured seed-node-timeout.

An actor system can only join a cluster once. Additional attempts will be ignored. When it has successfully joined it must be restarted to be able to join another cluster or to join the same cluster again. It can use the same hostname and port after the restart; when it comes up as a new incarnation of an existing member in the cluster and tries to join, the existing one will be removed from the cluster and then the new incarnation will be allowed to join.

Note

The name of the ActorSystem must be the same for all members of a cluster. The name is given when you start the ActorSystem.

Downing

When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of new joining members to 'Up'. The node must first become reachable again, or the status of the unreachable member must be changed to 'Down'. Changing status to 'Down' can be performed automatically or manually. By default it must be done manually, using JMX or Command Line Management.

It can also be performed programmatically with Cluster(system).down(address).
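
A minimal example, assuming the Address of the unreachable member is known and system is the node's ActorSystem:

  import akka.actor.Address
  import akka.cluster.Cluster

  // Hypothetical address of the unreachable member that should be marked as Down.
  val unreachableAddress = Address("akka.tcp", "ClusterSystem", "host3", 2552)
  Cluster(system).down(unreachableAddress)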

A pre-packaged solution for the downing problem is provided by Split Brain Resolver, which is part of the Lightbend Reactive Platform. Even if you don't use RP, you should carefully read the documentation of the Split Brain Resolver and make sure that the solution you are using handles the concerns described there.

Auto-downing (DO NOT USE)

There is an automatic downing feature that you should not use in production. For testing purposes you can enable it with the configuration:

  akka.cluster.auto-down-unreachable-after = 120s

This means that the cluster leader member will change the unreachable node status to down automatically after the configured time of unreachability.

This is a naïve approach to remove unreachable nodes from the cluster membership. It works great for crashes and short transient network partitions, but not for long network partitions. Both sides of the network partition will see the other side as unreachable and after a while remove it from its cluster membership. Since this happens on both sides the result is that two separate disconnected clusters have been created. This can also happen because of long GC pauses or system overload.

Warning

We recommend against using the auto-down feature of Akka Cluster in production. This is crucial for correct behavior if you use Cluster Singleton or Cluster Sharding, especially together with Akka Persistence. For Akka Persistence with Cluster Sharding it can result in corrupt data in case of network partitions.

Leaving

There are two ways to remove a member from the cluster.

You can just stop the actor system (or the JVM process). It will be detected as unreachable and removed after the automatic or manual downing as described above.

A more graceful exit can be performed if you tell the cluster that a node shall leave. This can be performed using JMX or Command Line Management. It can also be performed programmatically with:

  val cluster = Cluster(system)
  cluster.leave(cluster.selfAddress)

Note that this command can be issued to any member in the cluster, not necessarily the one that is leaving. The cluster extension, but not the actor system or JVM, of the leaving member will be shut down after the leader has changed the status of the member to Exiting. Thereafter the member will be removed from the cluster. Normally this is handled automatically, but in case of network failures during this process it might still be necessary to set the node's status to Down in order to complete the removal.

WeaklyUp Members

If a node is unreachable then gossip convergence is not possible and therefore any leader actions are also not possible. However, we still might want new nodes to join the cluster in this scenario.

Warning

The WeaklyUp feature is marked as "experimental" as of its introduction in Akka 2.4.0. We will continue to improve this feature based on our users' feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to this feature.

This feature is disabled by default. With a configuration option you can allow this behavior:

  akka.cluster.allow-weakly-up-members = on

When allow-weakly-up-members is enabled and there is no gossip convergence, Joining members will be promoted to WeaklyUp and they will become part of the cluster. Once gossip convergence is reached, the leader will move WeaklyUp members to Up.

You can subscribe to the WeaklyUp membership event to make use of the members that are in this state, but you should be aware that members on the other side of a network partition have no knowledge about the existence of the new members. You should for example not count WeaklyUp members in quorum decisions.
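
Inside a subscriber actor like the SimpleClusterListener shown earlier, the WeaklyUp transition can be handled explicitly; MemberWeaklyUp is one of the MemberEvent subtypes. The class name WeaklyUpListener below is just an illustrative choice:

  import akka.actor.{ Actor, ActorLogging }
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent._

  class WeaklyUpListener extends Actor with ActorLogging {
    val cluster = Cluster(context.system)

    // MemberWeaklyUp is a MemberEvent, so subscribing to MemberEvent covers it.
    override def preStart(): Unit =
      cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent])
    override def postStop(): Unit = cluster.unsubscribe(self)

    def receive = {
      case MemberWeaklyUp(member) =>
        // Joined during a partition; do not count it in quorum decisions.
        log.info("Member is WeaklyUp: {}", member.address)
      case MemberUp(member) =>
        log.info("Member is Up: {}", member.address)
      case _: MemberEvent => // ignore other membership events
    }
  }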

Warning

This feature is only available from Akka 2.4.0 and cannot be used if some of your cluster members are running an older version of Akka.

Subscribe to Cluster Events

You can subscribe to change notifications of the cluster membership by using Cluster(system).subscribe.

  cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])

A snapshot of the full state, akka.cluster.ClusterEvent.CurrentClusterState, is sent to the subscriber as the first message, followed by events for incremental updates.

Note that you may receive an empty CurrentClusterState, containing no members, if you start the subscription before the initial join procedure has completed. This is expected behavior. When the node has been accepted in the cluster you will receive MemberUp for that node, and other nodes.

If you find it inconvenient to handle the CurrentClusterState you can use ClusterEvent.InitialStateAsEvents as parameter to subscribe. That means that instead of receiving CurrentClusterState as the first message you will receive the events corresponding to the current state, to mimic what you would have seen if you were listening to the events when they occurred in the past. Note that those initial events only correspond to the current state and are not the full history of all changes that have actually occurred in the cluster.

  cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
    classOf[MemberEvent], classOf[UnreachableMember])

The events to track the life-cycle of members are MemberUp, MemberExited and MemberRemoved; changes in reachability are signalled separately with UnreachableMember and ReachableMember.

There are more types of change events; consult the API documentation of classes that extend akka.cluster.ClusterEvent.ClusterDomainEvent for details about the events.

Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with Cluster(system).state. Note that this state is not necessarily in sync with the events published to a cluster subscription.
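
For example, reading a snapshot of the membership directly (system being the node's ActorSystem):

  import akka.cluster.{ Cluster, MemberStatus }

  // Snapshot of the current membership; not necessarily in sync with subscribed events.
  val state = Cluster(system).state
  val upMembers = state.members.filter(_.status == MemberStatus.Up)
  val unreachableMembers = state.unreachable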

Worker Dial-in Example

Let's take a look at an example that illustrates how workers, here named backend, can detect and register to new master nodes, here named frontend.

The example application provides a service to transform text. When some text is sent to one of the frontend services, it will be delegated to one of the backend workers, which performs the transformation job and sends the result back to the original client. New backend nodes, as well as new frontend nodes, can be added to or removed from the cluster dynamically.

Messages:

  final case class TransformationJob(text: String)
  final case class TransformationResult(text: String)
  final case class JobFailed(reason: String, job: TransformationJob)
  case object BackendRegistration

The backend worker that performs the transformation job:

  class TransformationBackend extends Actor {

    val cluster = Cluster(context.system)

    // subscribe to cluster changes, MemberUp
    // re-subscribe when restart
    override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
    override def postStop(): Unit = cluster.unsubscribe(self)

    def receive = {
      case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
      case state: CurrentClusterState =>
        state.members.filter(_.status == MemberStatus.Up) foreach register
      case MemberUp(m) => register(m)
    }

    def register(member: Member): Unit =
      if (member.hasRole("frontend"))
        context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
          BackendRegistration
  }

Note that the TransformationBackend actor subscribes to cluster events to detect new, potential, frontend nodes, and sends them a registration message so that they know that they can use the backend worker.

The frontend that receives user jobs and delegates to one of the registered backend workers:

  class TransformationFrontend extends Actor {

    var backends = IndexedSeq.empty[ActorRef]
    var jobCounter = 0

    def receive = {
      case job: TransformationJob if backends.isEmpty =>
        sender() ! JobFailed("Service unavailable, try again later", job)

      case job: TransformationJob =>
        jobCounter += 1
        backends(jobCounter % backends.size) forward job

      case BackendRegistration if !backends.contains(sender()) =>
        context watch sender()
        backends = backends :+ sender()

      case Terminated(a) =>
        backends = backends.filterNot(_ == a)
    }
  }

Note that the TransformationFrontend actor watches the registered backends to be able to remove them from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of a watched actor. Death watch generates the Terminated message to the watching actor when the unreachable cluster node has been downed and removed.

The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions on how to run the Worker Dial-in Example.

Node Roles

Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware routers—can take node roles into account to achieve this distribution of responsibilities.

The roles of a node are defined in the configuration property named akka.cluster.roles and they are typically defined in the start script as a system property or environment variable.

The roles of the nodes are part of the membership information in MemberEvent that you can subscribe to.
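
A sketch of defining a role and checking it programmatically; the role name "backend" is only an example:

  import akka.actor.ActorSystem
  import akka.cluster.Cluster
  import com.typesafe.config.ConfigFactory

  // Example role; in practice this is usually supplied by the start script
  // as a system property or environment variable.
  val config = ConfigFactory.parseString("""akka.cluster.roles = ["backend"]""")
    .withFallback(ConfigFactory.load())
  val system = ActorSystem("ClusterSystem", config)

  // The roles of this node, and a simple role check.
  val selfRoles: Set[String] = Cluster(system).selfRoles
  val isBackend = selfRoles.contains("backend")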

How To Startup when Cluster Size Reached

A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.

With a configuration option you can define the required number of members before the leader changes the member status of 'Joining' members to 'Up'.

  akka.cluster.min-nr-of-members = 3

In a similar way you can define the required number of members of a certain role before the leader changes the member status of 'Joining' members to 'Up'.

  akka.cluster.role {
    frontend.min-nr-of-members = 1
    backend.min-nr-of-members = 2
  }

You can start the actors in a registerOnMemberUp callback, which will be invoked when the current member status is changed to 'Up', i.e. the cluster has at least the defined number of members.

  Cluster(system) registerOnMemberUp {
    system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
      name = "factorialFrontend")
  }

This callback can be used for other things than starting actors.

How To Cleanup when Member is Removed

You can do some clean up in a registerOnMemberRemoved callback, which will be invoked when the current member status is changed to 'Removed' or the cluster has been shut down.

For example, this is how to shut down the ActorSystem and thereafter exit the JVM:

  Cluster(system).registerOnMemberRemoved {
    // exit JVM when ActorSystem has been terminated
    system.registerOnTermination(System.exit(0))
    // shut down ActorSystem
    system.terminate()

    // In case ActorSystem shutdown takes longer than 10 seconds,
    // exit the JVM forcefully anyway.
    // We must spawn a separate thread to not block current thread,
    // since that would have blocked the shutdown of the ActorSystem.
    new Thread {
      override def run(): Unit = {
        if (Try(Await.ready(system.whenTerminated, 10.seconds)).isFailure)
          System.exit(-1)
      }
    }.start()
  }

Note

If you register an OnMemberRemoved callback on a cluster that has already been shut down, the callback will be invoked immediately on the caller thread; otherwise it will be invoked later when the current member status changes to 'Removed'. You may want to install some cleanup handling after the cluster has started up, but the cluster might already be shutting down when you install it, and depending on that race is not healthy.

Cluster Singleton

For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.

This can be implemented by subscribing to member events, but there are several corner cases to consider. Therefore, this specific use case is made easily accessible by the Cluster Singleton.

Cluster Sharding

Cluster Sharding distributes actors across several nodes in the cluster and supports interaction with the actors using their logical identifier, without having to care about their physical location in the cluster.

See Cluster Sharding

Distributed Publish Subscribe

Publish-subscribe messaging between actors in the cluster, and point-to-point messaging using the logical path of the actors, i.e. the sender does not have to know on which node the destination actor is running.

See Distributed Publish Subscribe in Cluster.

Cluster Client

Communication from an actor system that is not part of the cluster to actors running somewhere in the cluster. The client does not have to know on which node the destination actor is running.

See Cluster Client.

Distributed Data

Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store-like API.

See Distributed Data.

Failure Detector

In a cluster each node is monitored by a few (default maximum 5) other nodes, and when any of these detects the node as unreachable that information will spread to the rest of the cluster through the gossip. In other words, only one node needs to mark a node unreachable to have the rest of the cluster mark that node unreachable.

The failure detector will also detect if the node becomes reachable again. When all nodes that monitored the unreachable node detect it as reachable again the cluster, after gossip dissemination, will consider it as reachable.

If system messages cannot be delivered to a node it will be quarantined and then it cannot come back from unreachable. This can happen if there are too many unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, failures of actors supervised by a remote parent). Then the node needs to be moved to the down or removed states and the actor system of the quarantined node must be restarted before it can join the cluster again.

The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. The heartbeat arrival times are interpreted by an implementation of The Phi Accrual Failure Detector.

The suspicion level of failure is given by a value called phi. The basic idea of the phi failure detector is to express the value of phi on a scale that is dynamically adjusted to reflect current network conditions.

The value of phi is calculated as:

  phi = -log10(1 - F(timeSinceLastHeartbeat))

where F is the cumulative distribution function of a normal distribution with mean and standard deviation estimated from historical heartbeat inter-arrival times.
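
A simplified sketch of that calculation, using a logistic approximation of the normal CDF rather than Akka's actual PhiAccrualFailureDetector; the exact numbers differ from the real implementation, but it shows how phi grows quickly once the elapsed time exceeds the expected inter-arrival time by several standard deviations:

  // Hypothetical sketch: estimate mean and standard deviation from the heartbeat
  // history (assumed non-empty) and approximate the normal CDF with a logistic function.
  def phi(timeSinceLastHeartbeatMs: Double, historyMs: Seq[Double]): Double = {
    val mean = historyMs.sum / historyMs.size
    val variance = historyMs.map(x => (x - mean) * (x - mean)).sum / historyMs.size
    val stdDeviation = math.max(math.sqrt(variance), 1.0) // avoid division by zero
    val y = (timeSinceLastHeartbeatMs - mean) / stdDeviation
    // Logistic approximation of the standard normal CDF: F(y) ~ 1 / (1 + e^(-1.702 * y))
    val cdf = 1.0 / (1.0 + math.exp(-1.702 * y))
    -math.log10(1.0 - cdf) // saturates at Infinity when the CDF rounds to 1.0
  }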

In the Configuration you can adjust the akka.cluster.failure-detector.threshold to define when a phi value is considered to be a failure.

A low threshold is prone to generate many false positives but ensures a quick detection in the event of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes. The default threshold is 8 and is appropriate for most situations. However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms.

The following chart illustrates how phi increases with increasing time since the previous heartbeat.

../_images/phi11.png

Phi is calculated from the mean and standard deviation of historical inter-arrival times. The previous chart is an example for a standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, i.e. it is possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms.

../_images/phi21.png

To be able to survive sudden abnormalities, such as garbage collection pauses and transient network failures, the failure detector is configured with a margin, akka.cluster.failure-detector.acceptable-heartbeat-pause. You may want to adjust the Configuration of this depending on your environment. This is how the curve looks for acceptable-heartbeat-pause configured to 3 seconds.

../_images/phi31.png

Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of a watched actor. Death watch generates the Terminated message to the watching actor when the unreachable cluster node has been downed and removed.

If you encounter suspicious false positives when the system is under load you should define a separate dispatcher for the cluster actors as described in Cluster Dispatcher.

Cluster Aware Routers

All routers can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. When a node becomes unreachable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster, additional routees are added to the router, according to the configuration. Routees are also added when a node becomes reachable again, after having been unreachable.

Cluster aware routers make use of members with status WeaklyUp if that feature is enabled.

There are two distinct types of routers.

Router with Group of Routees

When using a Group you must start the routee actors on the cluster member nodes. That is not done by the router. The configuration for a group looks like this:

  akka.actor.deployment {
    /statsService/workerRouter {
      router = consistent-hashing-group
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        use-role = compute
      }
    }
  }

Note

The routee actors should be started as early as possible when starting the actor system, because the router will try to use them as soon as the member status is changed to 'Up'.

The actor paths without address information that are defined in routees.paths are used for selecting the actors to which the messages will be forwarded by the router. Messages will be forwarded to the routees using ActorSelection, so the same delivery semantics should be expected. It is possible to limit the lookup of routees to member nodes tagged with a certain role by specifying use-role.

max-total-nr-of-instances defines the total number of routees in the cluster. By default max-total-nr-of-instances is set to a high value (10000) that will result in new routees being added to the router when nodes join the cluster. Set it to a lower value if you want to limit the total number of routees.

The same type of router could also have been defined in code:

  import akka.cluster.routing.ClusterRouterGroup
  import akka.cluster.routing.ClusterRouterGroupSettings
  import akka.routing.ConsistentHashingGroup

  val workerRouter = context.actorOf(
    ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings(
      totalInstances = 100, routeesPaths = List("/user/statsWorker"),
      allowLocalRoutees = true, useRole = Some("compute"))).props(),
    name = "workerRouter2")

See the Configuration section for further descriptions of the settings.

Router Example with Group of Routees

Let's take a look at how to use a cluster aware router with a group of routees, i.e. router sending to the paths of the routees.

The example application provides a service to calculate statistics for a text. When some text is sent to the service it splits it into words, and delegates the task of counting the number of characters in each word to a separate worker, a routee of a router. The character count for each word is sent back to an aggregator that calculates the average number of characters per word when all results have been collected.

Messages:

  final case class StatsJob(text: String)
  final case class StatsResult(meanWordLength: Double)
  final case class JobFailed(reason: String)

The worker that counts the number of characters in each word:

  class StatsWorker extends Actor {
    var cache = Map.empty[String, Int]
    def receive = {
      case word: String =>
        val length = cache.get(word) match {
          case Some(x) => x
          case None =>
            val x = word.length
            cache += (word -> x)
            x
        }

        sender() ! length
    }
  }

The service that receives text from users and splits it up into words, delegates to workers and aggregates:

  class StatsService extends Actor {
    // This router is used both with lookup and deploy of routees. If you
    // have a router with only lookup of routees you can use Props.empty
    // instead of Props[StatsWorker].
    val workerRouter = context.actorOf(FromConfig.props(Props[StatsWorker]),
      name = "workerRouter")

    def receive = {
      case StatsJob(text) if text != "" =>
        val words = text.split(" ")
        val replyTo = sender() // important to not close over sender()
        // create actor that collects replies from workers
        val aggregator = context.actorOf(Props(
          classOf[StatsAggregator], words.size, replyTo))
        words foreach { word =>
          workerRouter.tell(
            ConsistentHashableEnvelope(word, word), aggregator)
        }
    }
  }

  class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor {
    var results = IndexedSeq.empty[Int]
    context.setReceiveTimeout(3.seconds)

    def receive = {
      case wordCount: Int =>
        results = results :+ wordCount
        if (results.size == expectedResults) {
          val meanWordLength = results.sum.toDouble / results.size
          replyTo ! StatsResult(meanWordLength)
          context.stop(self)
        }
      case ReceiveTimeout =>
        replyTo ! JobFailed("Service unavailable, try again later")
        context.stop(self)
    }
  }

Note, nothing cluster specific so far, just plain actors.

All nodes start StatsService and StatsWorker actors. Remember, routees are the workers in this case. The router is configured with routees.paths:

  akka.actor.deployment {
    /statsService/workerRouter {
      router = consistent-hashing-group
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        use-role = compute
      }
    }
  }

This means that user requests can be sent to StatsService on any node and it will use StatsWorker on all nodes.

The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions on how to run the Router Example with Group of Routees.

Router with Pool of Remote Deployed Routees

When using a Pool with routees created and deployed on the cluster member nodes the configuration for a router looks like this:

  akka.actor.deployment {
    /statsService/singleton/workerRouter {
      router = consistent-hashing-pool
      cluster {
        enabled = on
        max-nr-of-instances-per-node = 3
        allow-local-routees = on
        use-role = compute
      }
    }
  }

It is possible to limit the deployment of routees to member nodes tagged with a certain role by specifying use-role.

max-total-nr-of-instances defines the total number of routees in the cluster, but the number of routees per node, max-nr-of-instances-per-node, will not be exceeded. By default max-total-nr-of-instances is set to a high value (10000) that will result in new routees being added to the router when nodes join the cluster. Set it to a lower value if you want to limit the total number of routees.

The same type of router could also have been defined in code:

  import akka.cluster.routing.ClusterRouterPool
  import akka.cluster.routing.ClusterRouterPoolSettings
  import akka.routing.ConsistentHashingPool

  val workerRouter = context.actorOf(
    ClusterRouterPool(ConsistentHashingPool(0), ClusterRouterPoolSettings(
      totalInstances = 100, maxInstancesPerNode = 3,
      allowLocalRoutees = false, useRole = None)).props(Props[StatsWorker]),
    name = "workerRouter3")

See the Configuration section for further descriptions of the settings.

Router Example with Pool of Remote Deployed Routees

Let's take a look at how to use a cluster aware router on a single master node that creates and deploys workers. To keep track of a single master we use the Cluster Singleton in the contrib module. The ClusterSingletonManager is started on each node.

  system.actorOf(ClusterSingletonManager.props(
    singletonProps = Props[StatsService],
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system).withRole("compute")),
    name = "statsService")

We also need an actor on each node that keeps track of where the current single master exists and delegates jobs to the StatsService. That is provided by the ClusterSingletonProxy.

  system.actorOf(ClusterSingletonProxy.props(singletonManagerPath = "/user/statsService",
    settings = ClusterSingletonProxySettings(system).withRole("compute")),
    name = "statsServiceProxy")

The ClusterSingletonProxy receives text from users and delegates to the current StatsService, the single master. It listens to cluster events to look up the StatsService on the oldest node.

All nodes start ClusterSingletonProxy and the ClusterSingletonManager. The router is now configured like this:

  akka.actor.deployment {
    /statsService/singleton/workerRouter {
      router = consistent-hashing-pool
      cluster {
        enabled = on
        max-nr-of-instances-per-node = 3
        allow-local-routees = on
        use-role = compute
      }
    }
  }

The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions on how to run the Router Example with Pool of Remote Deployed Routees.

Cluster Metrics

The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus with the help of Cluster Metrics Extension.

How to Test

Multi Node Testing is useful for testing cluster applications.

Set up your project according to the instructions in Multi Node Testing and Multi JVM Testing, i.e. add the sbt-multi-jvm plugin and the dependency to akka-multi-node-testkit.

First, as described in Multi Node Testing, we need some scaffolding to configure the MultiNodeSpec. Define the participating roles and their Configuration in an object extending MultiNodeConfig:

  import akka.remote.testkit.MultiNodeConfig
  import com.typesafe.config.ConfigFactory

  object StatsSampleSpecConfig extends MultiNodeConfig {
    // register the named roles (nodes) of the test
    val first = role("first")
    val second = role("second")
    val third = role("third")

    def nodeList = Seq(first, second, third)

    // Extract individual sigar library for every node.
    nodeList foreach { role =>
      nodeConfig(role) {
        ConfigFactory.parseString(s"""
          # Disable legacy metrics in akka-cluster.
          akka.cluster.metrics.enabled=off
          # Enable metrics extension in akka-cluster-metrics.
          akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
          # Sigar native library extract location during tests.
          akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
          """)
      }
    }

    // this configuration will be used for all nodes
    // note that no fixed host names and ports are used
    commonConfig(ConfigFactory.parseString("""
      akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
      akka.remote.log-remote-lifecycle-events = off
      akka.cluster.roles = [compute]
      // router lookup config ...
      """))

  }

Define one concrete test class for each role/node. These will be instantiated on the different nodes (JVMs). They can be implemented differently, but often they are the same and extend an abstract test class, as illustrated here.

  // need one concrete test class per node
  class StatsSampleSpecMultiJvmNode1 extends StatsSampleSpec
  class StatsSampleSpecMultiJvmNode2 extends StatsSampleSpec
  class StatsSampleSpecMultiJvmNode3 extends StatsSampleSpec

Note the naming convention of these classes. The names of the classes must end with MultiJvmNode1, MultiJvmNode2 and so on. It is possible to define another suffix to be used by sbt-multi-jvm, but the default should be fine in most cases.

Then the abstract MultiNodeSpec, which takes the MultiNodeConfig as constructor parameter.

  import org.scalatest.BeforeAndAfterAll
  import org.scalatest.WordSpecLike
  import org.scalatest.Matchers
  import akka.remote.testkit.MultiNodeSpec
  import akka.testkit.ImplicitSender

  abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
    with WordSpecLike with Matchers with BeforeAndAfterAll
    with ImplicitSender {

    import StatsSampleSpecConfig._

    override def initialParticipants = roles.size

    override def beforeAll() = multiNodeSpecBeforeAll()

    override def afterAll() = multiNodeSpecAfterAll()
Most of this can of course be extracted to a separate trait to avoid repeating this in all your tests.

Typically you begin your test by starting up the cluster, letting the members join, and creating some actors. That can be done like this:

  1. "illustrate how to startup cluster" in within(15 seconds) {
  2. Cluster(system).subscribe(testActor, classOf[MemberUp])
  3. expectMsgClass(classOf[CurrentClusterState])
  4.  
  5. val firstAddress = node(first).address
  6. val secondAddress = node(second).address
  7. val thirdAddress = node(third).address
  8.  
  9. Cluster(system) join firstAddress
  10.  
  11. system.actorOf(Props[StatsWorker], "statsWorker")
  12. system.actorOf(Props[StatsService], "statsService")
  13.  
  14. receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
  15. Set(firstAddress, secondAddress, thirdAddress))
  16.  
  17. Cluster(system).unsubscribe(testActor)
  18.  
  19. testConductor.enter("all-up")
  20. }

From the test you interact with the cluster using the Cluster extension, e.g. join.

  Cluster(system) join firstAddress

Notice how the testActor from testkit is added as a subscriber to cluster changes and then used to wait for certain events, such as in this case all members becoming 'Up'.

The above code was running for all roles (JVMs). runOn is a convenient utility to declare that a certain block of code should only run for a specific role.

  1. "show usage of the statsService from one node" in within(15 seconds) {
  2. runOn(second) {
  3. assertServiceOk()
  4. }
  5.  
  6. testConductor.enter("done-2")
  7. }
  8.  
  9. def assertServiceOk(): Unit = {
  10. val service = system.actorSelection(node(third) / "user" / "statsService")
  11. // eventually the service should be ok,
  12. // first attempts might fail because worker actors not started yet
  13. awaitAssert {
  14. service ! StatsJob("this is the text that will be analyzed")
  15. expectMsgType[StatsResult](1.second).meanWordLength should be(
  16. 3.875 +- 0.001)
  17. }
  18.  
  19. }

Once again we take advantage of the facilities in testkit to verify expected behavior. Here using testActor as sender (via ImplicitSender) and verifying the reply with expectMsgType.

In the above code you can see node(third), which is a useful facility to get the root actor reference of the actor system for a specific role. This can also be used to grab the akka.actor.Address of that node.

  val firstAddress = node(first).address
  val secondAddress = node(second).address
  val thirdAddress = node(third).address

JMX

Information and management of the cluster is available as JMX MBeans with the root name akka.Cluster. The JMX information can be displayed with an ordinary JMX console such as JConsole or JVisualVM.

From JMX you can:

  • see what members are part of the cluster
  • see the status of this node
  • join this node to another node in the cluster
  • make any node in the cluster leave
  • mark any node in the cluster as down

Member nodes are identified by their address, in the format akka.<protocol>://<actor-system-name>@<hostname>:<port>.

Command Line Management

The cluster can be managed with the script bin/akka-cluster provided in the Akka distribution.

Run it without parameters to see instructions about how to use the script:

  Usage: bin/akka-cluster <node-hostname> <jmx-port> <command> ...

  Supported commands are:
    join <node-url> - Sends request a JOIN node with the specified URL
    leave <node-url> - Sends a request for node with URL to LEAVE the cluster
    down <node-url> - Sends a request for marking node with URL as DOWN
    member-status - Asks the member node for its current status
    members - Asks the cluster for addresses of current members
    unreachable - Asks the cluster for addresses of unreachable members
    cluster-status - Asks the cluster for its current status (member ring,
                     unavailable nodes, meta data etc.)
    leader - Asks the cluster who the current leader is
    is-singleton - Checks if the cluster is a singleton cluster (single
                   node cluster)
    is-available - Checks if the member node is available
  Where the <node-url> should be on the format of
    'akka.<protocol>://<actor-system-name>@<hostname>:<port>'

  Examples: bin/akka-cluster localhost 9999 is-available
            bin/akka-cluster localhost 9999 join akka.tcp://MySystem@darkstar:2552
            bin/akka-cluster localhost 9999 cluster-status

To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes, as described in Monitoring and Management Using JMX Technology

Example of system properties to enable remote monitoring and management:

  1. java -Dcom.sun.management.jmxremote.port=9999 \
  2. -Dcom.sun.management.jmxremote.authenticate=false \
  3. -Dcom.sun.management.jmxremote.ssl=false

Configuration

There are several configuration properties for the cluster. We refer to the reference configuration for more information.

Cluster Info Logging

You can silence the logging of cluster events at info level with configuration property:

  1. akka.cluster.log-info = off

Cluster Dispatcher

Under the hood the cluster extension is implemented with actors and it can be necessary to create a bulkhead for those actors to avoid disturbance from other actors. Especially the heartbeating actors that is used for failure detection can generate false positives if they are not given a chance to run at regular intervals. For this purpose you can define a separate dispatcher to be used for the cluster actors:

  1. akka.cluster.use-dispatcher = cluster-dispatcher
  2.  
  3. cluster-dispatcher {
  4. type = "Dispatcher"
  5. executor = "fork-join-executor"
  6. fork-join-executor {
  7. parallelism-min = 2
  8. parallelism-max = 4
  9. }
  10. }

Note

Normally it should not be necessary to configure a separate dispatcher for the Cluster. The default-dispatcher should be sufficient for performing the Cluster tasks, i.e. akka.cluster.use-dispatcher should not be changed. If you have Cluster related problems when using the default-dispatcher that is typically an indication that you are running blocking or CPU intensive actors/tasks on the default-dispatcher. Use dedicated dispatchers for such actors/tasks instead of running them on the default-dispatcher, because that may starve system internal tasks. Related config property: akka.cluster.use-dispatcher = akka.cluster.cluster-dispatcher. The corresponding default value is empty, i.e. akka.cluster.use-dispatcher = "", which means the default-dispatcher is used.

Cluster Singleton

For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.

Some examples:

  • single point of responsibility for certain cluster-wide consistent decisions, or coordination of actions across the cluster
  • single entry point to an external system
  • single master, many workers
  • centralized naming service, or routing logic

Using a singleton should not be the first design choice. It has several drawbacks, such as being a single point of bottleneck. A single point of failure is also a relevant concern, but for some cases this feature takes care of that by making sure that another singleton instance will eventually be started.

The cluster singleton pattern is implemented by akka.cluster.singleton.ClusterSingletonManager. It manages one singleton actor instance among all cluster nodes or a group of nodes tagged with a specific role. ClusterSingletonManager is an actor that is supposed to be started on all nodes, or all nodes with a specified role, in the cluster. The actual singleton actor is started by the ClusterSingletonManager on the oldest node by creating a child actor from the supplied Props. ClusterSingletonManager makes sure that at most one singleton instance is running at any point in time.

The singleton actor is always running on the oldest member with a specified role. The oldest member is determined by akka.cluster.Member#isOlderThan. This can change when removing that member from the cluster. Be aware that there is a short time period when there is no active singleton during the hand-over process.

The cluster failure detector will notice when the oldest node becomes unreachable due to things like JVM crash, hard shut down, or network failure. Then a new oldest node will take over and a new singleton actor is created. For these failure scenarios there will not be a graceful hand-over, but more than one active singleton is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts.

You can access the singleton actor by using the provided akka.cluster.singleton.ClusterSingletonProxy, which will route all messages to the current instance of the singleton. The proxy will keep track of the oldest node in the cluster and resolve the singleton's ActorRef by explicitly sending the singleton's actorSelection the akka.actor.Identify message and waiting for it to reply. This is performed periodically if the singleton doesn't reply within a certain (configurable) time. Given the implementation, there might be periods of time during which the ActorRef is unavailable, e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the singleton and then deliver them when the singleton is finally available. If the buffer is full the ClusterSingletonProxy will drop old messages when new messages are sent via the proxy. The size of the buffer is configurable and it can be disabled by using a buffer size of 0.

It's worth noting that messages can always be lost because of the distributed nature of these actors. As always, additional logic should be implemented in the singleton (acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
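As a minimal sketch of such logic (the Register/RegisterAck messages and the 3 second retry interval are hypothetical application-level choices, not part of the Akka API), a client could keep resending a message via the singleton proxy until the singleton acknowledges it:

  import scala.concurrent.duration._
  import akka.actor.{ Actor, ActorRef }

  // hypothetical application messages, not part of Akka
  final case class Register(id: Long)
  final case class RegisterAck(id: Long)

  class RegisteringClient(singletonProxy: ActorRef) extends Actor {
    import context.dispatcher

    // resend periodically until acknowledged, giving at-least-once delivery
    val resendTask = context.system.scheduler.schedule(
      0.seconds, 3.seconds, singletonProxy, Register(42))

    override def postStop(): Unit = resendTask.cancel()

    def receive = {
      case RegisterAck(42) =>
        resendTask.cancel() // acknowledged by the singleton, stop retrying
    }
  }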

The singleton instance will not run on members with status WeaklyUp if that feature is enabled.

Potential problems to be aware of

This pattern may seem very tempting to use at first, but it has several drawbacks, some of which are listed below:

  • the cluster singleton may quickly become a performance bottleneck,
  • you cannot rely on the cluster singleton to be non-stop available, e.g. when the node on which the singleton runs dies it will take a few seconds for this to be noticed and for the singleton to be migrated to another node,
  • in the case of a network partition in a cluster that is using Automatic Downing, the isolated parts may each decide to spin up their own singleton, meaning that there might be multiple singletons running in the system with no way of finding out about each other (because of the partition).

Especially the last point is something you should be aware of — in general when using the Cluster Singleton pattern you should take care of downing nodes yourself and not rely on the timing based auto-down feature.

Warning

Don't use Cluster Singleton together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in multiple Singletons being started, one in each separate cluster!

An Example

Assume that we need one single entry point to an external system. An actor that receives messages from a JMS queue, with the strict requirement that only one JMS consumer may exist to make sure that the messages are processed in order. That is perhaps not how one would like to design things, but a typical real-world scenario when integrating with external systems.

On each node in the cluster you need to start the ClusterSingletonManager and supply the Props of the singleton actor, in this case the JMS queue consumer.

  1. system.actorOf(
  2. ClusterSingletonManager.props(
  3. singletonProps = Props(classOf[Consumer], queue, testActor),
  4. terminationMessage = End,
  5. settings = ClusterSingletonManagerSettings(system).withRole("worker")),
  6. name = "consumer")

Here we limit the singleton to nodes tagged with the "worker" role, but all nodes, independent of role, can be used by not specifying withRole.

Here we use an application specific terminationMessage to be able to close the resources before actually stopping the singleton actor. Note that PoisonPill is a perfectly fine terminationMessage if you only need to stop the actor.

Here is how the singleton actor handles the terminationMessage in this example.

  case End =>
    queue ! UnregisterConsumer
  case UnregistrationOk =>
    stoppedBeforeUnregistration = false
    context stop self
  case Ping =>
    sender() ! Pong

With the names given above, access to the singleton can be obtained from any cluster node using a properly configured proxy.

  1. system.actorOf(
  2. ClusterSingletonProxy.props(
  3. singletonManagerPath = "/user/consumer",
  4. settings = ClusterSingletonProxySettings(system).withRole("worker")),
  5. name = "consumerProxy")

A more comprehensive sample is available in the Lightbend Activator tutorial named Distributed workers with Akka and Scala!.

Dependencies

To use the Cluster Singleton you must add the following dependency in your project.

sbt:

  1. "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.10"

maven:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-cluster-tools_2.11</artifactId>
  4. <version>2.4.10</version>
  5. </dependency>

Configuration

The following configuration properties are read by the ClusterSingletonManagerSettings when created with an ActorSystem parameter. It is also possible to amend the ClusterSingletonManagerSettings or create it from another config section with the same layout as below. ClusterSingletonManagerSettings is a parameter to the ClusterSingletonManager.props factory method, i.e. each singleton can be configured with different settings if needed.

  1. akka.cluster.singleton {
  2. # The actor name of the child singleton actor.
  3. singleton-name = "singleton"
  4. # Singleton among the nodes tagged with specified role.
  5. # If the role is not specified it's a singleton among all nodes in the cluster.
  6. role = ""
  7. # When a node is becoming oldest it sends hand-over request to previous oldest,
  8. # that might be leaving the cluster. This is retried with this interval until
  9. # the previous oldest confirms that the hand over has started or the previous
  10. # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
  11. hand-over-retry-interval = 1s
  12. # The number of retries are derived from hand-over-retry-interval and
  13. # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
  14. # but it will never be less than this property.
  15. min-number-of-hand-over-retries = 10
  16. }
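As noted above, the settings can also be amended programmatically instead of (or in addition to) using the akka.cluster.singleton config section. A small sketch, assuming the withRole and withHandOverRetryInterval modifiers of ClusterSingletonManagerSettings (the chosen values are only illustrative):

  import scala.concurrent.duration._
  import akka.cluster.singleton.ClusterSingletonManagerSettings

  // start from the ActorSystem defaults and override selected values
  val managerSettings = ClusterSingletonManagerSettings(system)
    .withRole("worker")
    .withHandOverRetryInterval(2.seconds)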

The following configuration properties are read by the ClusterSingletonProxySettings when created with an ActorSystem parameter. It is also possible to amend the ClusterSingletonProxySettings or create it from another config section with the same layout as below. ClusterSingletonProxySettings is a parameter to the ClusterSingletonProxy.props factory method, i.e. each singleton proxy can be configured with different settings if needed.

  1. akka.cluster.singleton-proxy {
  2. # The actor name of the singleton actor that is started by the ClusterSingletonManager
  3. singleton-name = ${akka.cluster.singleton.singleton-name}
  4. # The role of the cluster nodes where the singleton can be deployed.
  5. # If the role is not specified then any node will do.
  6. role = ""
  7. # Interval at which the proxy will try to resolve the singleton instance.
  8. singleton-identification-interval = 1s
  9. # If the location of the singleton is unknown the proxy will buffer this
  10. # number of messages and deliver them when the singleton is identified.
  11. # When the buffer is full old messages will be dropped when new messages are
  12. # sent via the proxy.
  13. # Use 0 to disable buffering, i.e. messages will be dropped immediately if
  14. # the location of the singleton is unknown.
  15. # Maximum allowed buffer size is 10000.
  16. buffer-size = 1000
  17. }

Distributed Publish Subscribe in Cluster

How do I send a message to an actor without knowing which node it is running on?

How do I send messages to all actors in the cluster that have registered interest in a named topic?

This pattern provides a mediator actor, akka.cluster.pubsub.DistributedPubSubMediator, that manages a registry of actor references and replicates the entries to peer actors among all cluster nodes or a group of nodes tagged with a specific role.

The DistributedPubSubMediator actor is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The mediator can be started with the DistributedPubSub extension or as an ordinary actor.

The registry is eventually consistent, i.e. changes are not immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds. Changes are only performed on the owning node's part of the registry, and those changes are versioned. Deltas are disseminated in a scalable way to other nodes with a gossip protocol.

Cluster members with status WeaklyUp, if that feature is enabled, will participate in Distributed Publish Subscribe, i.e. subscribers on nodes with WeaklyUp status will receive published messages if the publisher and subscriber are on the same side of a network partition.

You can send messages via the mediator on any node to registered actors on any other node.

There are two different modes of message delivery, explained in the sections Publish and Send below.

A more comprehensive sample is available in the Lightbend Activator tutorial named Akka Clustered PubSub with Scala!.

Publish

This is the true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging application.

Actors are registered to a named topic. This enables many subscribers on each node. The message will be delivered to all subscribers of the topic.

For efficiency the message is sent over the wire only once per node (that has a matching topic), and then delivered to all subscribers of the local topic representation.

You register actors to the local mediator with DistributedPubSubMediator.Subscribe. Successful Subscribe and Unsubscribe are acknowledged with DistributedPubSubMediator.SubscribeAck and DistributedPubSubMediator.UnsubscribeAck replies. The acknowledgment means that the subscription is registered, but it can still take some time until it is replicated to other nodes.

You publish messages by sending a DistributedPubSubMediator.Publish message to the local mediator.

Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Unsubscribe.

An example of a subscriber actor:

  class Subscriber extends Actor with ActorLogging {
    import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
    val mediator = DistributedPubSub(context.system).mediator
    // subscribe to the topic named "content"
    mediator ! Subscribe("content", self)

    def receive = {
      case s: String =>
        log.info("Got {}", s)
      case SubscribeAck(Subscribe("content", None, `self`)) =>
        log.info("subscribing")
    }
  }

Subscriber actors can be started on several nodes in the cluster, and all will receive messages published to the "content" topic.

  1. runOn(first) {
  2. system.actorOf(Props[Subscriber], "subscriber1")
  3. }
  4. runOn(second) {
  5. system.actorOf(Props[Subscriber], "subscriber2")
  6. system.actorOf(Props[Subscriber], "subscriber3")
  7. }

A simple actor that publishes to this "content" topic:

  class Publisher extends Actor {
    import DistributedPubSubMediator.Publish
    // activate the extension
    val mediator = DistributedPubSub(context.system).mediator

    def receive = {
      case in: String =>
        val out = in.toUpperCase
        mediator ! Publish("content", out)
    }
  }

It can publish messages to the topic from anywhere in the cluster:

  1. runOn(third) {
  2. val publisher = system.actorOf(Props[Publisher], "publisher")
  3. later()
  4. // after a while the subscriptions are replicated
  5. publisher ! "hello"
  6. }

Topic Groups

Actors may also be subscribed to a named topic with a group id. If subscribing with a group id, each message published to a topic with the sendOneMessageToEachGroup flag set to true is delivered via the supplied RoutingLogic (default random) to one actor within each subscribing group.

If all the subscribed actors have the same group id, then this works just like Send and each message is only delivered to one subscriber.

If all the subscribed actors have different group names, then this works like normal Publish and each message is broadcast to all subscribers.
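As a sketch, subscribing with a group id and publishing one message per group could look like this; the "content" topic, the "workers" group name and the workerRef reference are assumed for illustration:

  import akka.cluster.pubsub.DistributedPubSub
  import akka.cluster.pubsub.DistributedPubSubMediator.{ Publish, Subscribe }

  val mediator = DistributedPubSub(system).mediator

  // several workers subscribe to the same topic with the same group id
  mediator ! Subscribe("content", Some("workers"), workerRef)

  // delivered to one subscriber within each subscribing group
  mediator ! Publish("content", "some work", sendOneMessageToEachGroup = true)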

Note

Note that if the group id is used it is part of the topic identifier. Messages published with sendOneMessageToEachGroup=false will not be delivered to subscribers that subscribed with a group id. Messages published with sendOneMessageToEachGroup=true will not be delivered to subscribers that subscribed without a group id.

Send

This is a point-to-point mode where each message is delivered to one destination, but you still do not have to know where the destination is located. A typical usage of this mode is private chat to one other user in an instant messaging application. It can also be used for distributing tasks to registered workers, like a cluster aware router where the routees can register themselves dynamically.

The message will be delivered to one recipient with a matching path, if any such exists in the registry. If several entries match the path because it has been registered on several nodes the message will be sent via the supplied RoutingLogic (default random) to one destination. The sender() of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the used mediator actor, if any such exists, otherwise route to any other matching entry.

You register actors to the local mediator with DistributedPubSubMediator.Put. The ActorRef in Put must belong to the same local actor system as the mediator. The path without address information is the key to which you send messages. On each node there can only be one actor for a given path, since the path is unique within one local actor system.

You send messages by sending a DistributedPubSubMediator.Send message to the local mediator with the path (without address information) of the destination actors.

Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Remove.

An example of a destination actor:

  class Destination extends Actor with ActorLogging {
    import DistributedPubSubMediator.Put
    val mediator = DistributedPubSub(context.system).mediator
    // register to the path
    mediator ! Put(self)

    def receive = {
      case s: String =>
        log.info("Got {}", s)
    }
  }

Destination actors can be started on several nodes in the cluster, and all will receive messages sent to the path (without address information).

  1. runOn(first) {
  2. system.actorOf(Props[Destination], "destination")
  3. }
  4. runOn(second) {
  5. system.actorOf(Props[Destination], "destination")
  6. }

A simple actor that sends to the path:

  class Sender extends Actor {
    import DistributedPubSubMediator.Send
    // activate the extension
    val mediator = DistributedPubSub(context.system).mediator

    def receive = {
      case in: String =>
        val out = in.toUpperCase
        mediator ! Send(path = "/user/destination", msg = out, localAffinity = true)
    }
  }

It can send messages to the path from anywhere in the cluster:

  1. runOn(third) {
  2. val sender = system.actorOf(Props[Sender], "sender")
  3. later()
  4. // after a while the destinations are replicated
  5. sender ! "hello"
  6. }

It is also possible to broadcast messages to the actors that have been registered with Put. Send a DistributedPubSubMediator.SendToAll message to the local mediator and the wrapped message will then be delivered to all recipients with a matching path. Actors with the same path, without address information, can be registered on different nodes. On each node there can only be one such actor, since the path is unique within one local actor system.

Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, for redundancy. You can also optionally specify a property (allButSelf) deciding if the message should be sent to a matching path on the self node or not.
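A sketch of such a broadcast, reusing the Destination actors registered with Put above:

  import akka.cluster.pubsub.DistributedPubSub
  import akka.cluster.pubsub.DistributedPubSubMediator.SendToAll

  val mediator = DistributedPubSub(system).mediator

  // delivered to every actor registered with Put at /user/destination,
  // including the one on this node since allButSelf = false
  mediator ! SendToAll("/user/destination", "hello all", allButSelf = false)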

DistributedPubSub Extension

In the example above the mediator is started and accessed with the akka.cluster.pubsub.DistributedPubSub extension. That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to start the mediator actor as an ordinary actor and you can have several different mediators at the same time to be able to divide a large number of actors/topics among different mediators. For example you might want to use different cluster roles for different mediators.
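A sketch of starting an additional mediator as an ordinary actor, restricted to a cluster role (the withRole modifier of DistributedPubSubSettings is assumed to mirror the role property shown in the configuration below):

  import akka.cluster.pubsub.{ DistributedPubSubMediator, DistributedPubSubSettings }

  // a second mediator, separate from the one owned by the DistributedPubSub extension
  val backendMediator = system.actorOf(
    DistributedPubSubMediator.props(
      DistributedPubSubSettings(system).withRole("backend")),
    name = "backendMediator")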

The DistributedPubSub extension can be configured with the following properties:

  1. # Settings for the DistributedPubSub extension
  2. akka.cluster.pub-sub {
  3. # Actor name of the mediator actor, /system/distributedPubSubMediator
  4. name = distributedPubSubMediator
  5.  
  6. # Start the mediator on members tagged with this role.
  7. # All members are used if undefined or empty.
  8. role = ""
  9.  
  10. # The routing logic to use for 'Send'
  11. # Possible values: random, round-robin, broadcast
  12. routing-logic = random
  13.  
  14. # How often the DistributedPubSubMediator should send out gossip information
  15. gossip-interval = 1s
  16.  
  17. # Removed entries are pruned after this duration
  18. removed-time-to-live = 120s
  19.  
  20. # Maximum number of elements to transfer in one message when synchronizing the registries.
  21. # Next chunk will be transferred in next round of gossip.
  22. max-delta-elements = 3000
  23. # The id of the dispatcher to use for DistributedPubSubMediator actors.
  24. # If not specified default dispatcher is used.
  25. # If specified you need to define the settings of the actual dispatcher.
  26. use-dispatcher = ""
  27. }

It is recommended to load the extension when the actor system is started by defining it in the akka.extensions configuration property. Otherwise it will be activated when first used and then it takes a while for it to be populated.

  1. akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"]

Dependencies

To use Distributed Publish Subscribe you must add the following dependency in your project.

sbt:

  1. "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.10"

maven:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-cluster-tools_2.11</artifactId>
  4. <version>2.4.10</version>
  5. </dependency>

Cluster Client

An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via the ClusterClient. The client can of course be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.

Note

ClusterClient should not be used when sending messages to actors that run within the same cluster. Similar functionality as the ClusterClient is provided in a more efficient way by Distributed Publish Subscribe in Cluster for actors that belong to the same cluster.

Also, note that it's necessary to change akka.actor.provider from akka.actor.LocalActorRefProvider to akka.remote.RemoteActorRefProvider or akka.cluster.ClusterActorRefProvider when using the cluster client.

The receptionist is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The receptionist can be started with the ClusterClientReceptionist extension or as an ordinary actor.

You can send messages via the ClusterClient to any actor in the cluster that is registered in the DistributedPubSubMediator used by the ClusterReceptionist. The ClusterClientReceptionist provides methods for registration of actors that should be reachable from the client. Messages are wrapped in ClusterClient.Send, ClusterClient.SendToAll or ClusterClient.Publish.

Both the ClusterClient and the ClusterClientReceptionist emit events that can be subscribed to. The ClusterClient sends out notifications in relation to having received a list of contact points from the ClusterClientReceptionist. One use of this list might be for the client to record its contact points. A client that is restarted could then use this information to supersede any previously configured contact points.

The ClusterClientReceptionist sends out notifications in relation to having received contact from a ClusterClient. This notification enables the server containing the receptionist to become aware of what clients are connected.

1. ClusterClient.Send

The message will be delivered to one recipient with a matching path, if any such exists. If several entries match the path the message will be delivered to one random destination. The sender() of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the used receptionist actor, if any such exists, otherwise routed randomly to any other matching entry.

2. ClusterClient.SendToAll

The message will be delivered to all recipients with a matching path.

3. ClusterClient.Publish

The message will be delivered to all recipient actors that have been registered as subscribers to the named topic.

Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. the sender(), as seen by the destination actor, is not the client itself. The sender() of the response messages, as seen by the client, is deadLetters since the client should normally send subsequent messages via the ClusterClient. It is possible to pass the original sender inside the reply messages if the client is supposed to communicate directly to the actor in the cluster.
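A minimal sketch of that last point: the destination actor can embed a reference in an application-level reply so that the client can talk to it directly afterwards (ServiceRequest/ServiceResponse are hypothetical message types):

  import akka.actor.{ Actor, ActorRef }

  // hypothetical application messages
  final case class ServiceRequest(payload: String)
  final case class ServiceResponse(payload: String, replyTo: ActorRef)

  class Service extends Actor {
    def receive = {
      case ServiceRequest(payload) =>
        // sender() here is the receptionist's tunnel, not the client itself,
        // so include an explicit reference for direct communication
        sender() ! ServiceResponse(payload.toUpperCase, replyTo = self)
    }
  }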

While establishing a connection to a receptionist the ClusterClient will buffer messages and send them when the connection is established. If the buffer is full the ClusterClient will drop old messages when new messages are sent via the client. The size of the buffer is configurable and it can be disabled by using a buffer size of 0.

It's worth noting that messages can always be lost because of the distributed nature of these actors. As always, additional logic should be implemented in the destination (acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.

An Example

On the cluster nodes first start the receptionist. Note, it is recommended to load the extension when the actor system is started by defining it in the akka.extensions configuration property:

  1. akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

Next, register the actors that should be available for the client.

  1. runOn(host1) {
  2. val serviceA = system.actorOf(Props[Service], "serviceA")
  3. ClusterClientReceptionist(system).registerService(serviceA)
  4. }
  5.  
  6. runOn(host2, host3) {
  7. val serviceB = system.actorOf(Props[Service], "serviceB")
  8. ClusterClientReceptionist(system).registerService(serviceB)
  9. }

On the client you create the ClusterClient actor and use it as a gateway for sending messages to the actors identified by their path (without address information) somewhere in the cluster.

  1. runOn(client) {
  2. val c = system.actorOf(ClusterClient.props(
  3. ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
  4. c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
  5. c ! ClusterClient.SendToAll("/user/serviceB", "hi")
  6. }

The initialContacts parameter is a Set[ActorPath], which can be created like this:

  1. val initialContacts = Set(
  2. ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
  3. ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
  4. val settings = ClusterClientSettings(system)
  5. .withInitialContacts(initialContacts)

You will probably define the address information of the initial contact points in configuration or system property. See also Configuration.
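For example, the same contact points could be defined in configuration instead, using the akka.cluster.client.initial-contacts property shown in the Configuration section below:

  akka.cluster.client {
    initial-contacts = [
      "akka.tcp://OtherSys@host1:2552/system/receptionist",
      "akka.tcp://OtherSys@host2:2552/system/receptionist"]
  }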

A more comprehensive sample is available in the Lightbend Activator tutorial named Distributed workers with Akka and Scala!.

ClusterClientReceptionist Extension

In the example above the receptionist is started and accessed with the akka.cluster.client.ClusterClientReceptionist extension. That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to start the akka.cluster.client.ClusterReceptionist actor as an ordinary actor and you can have several different receptionists at the same time, serving different types of clients.

Note that the ClusterClientReceptionist uses the DistributedPubSub extension, which is described in Distributed Publish Subscribe in Cluster.

It is recommended to load the extension when the actor system is started by defining it in the akka.extensions configuration property:

  1. akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

Events

As mentioned earlier, both the ClusterClient and ClusterClientReceptionist emit events that can be subscribed to. The following code snippet declares an actor that will receive notifications on contact points (addresses to the available receptionists), as they become available. The code illustrates subscribing to the events and receiving the ClusterClient initial state.

  class ClientListener(targetClient: ActorRef) extends Actor {
    override def preStart(): Unit =
      targetClient ! SubscribeContactPoints

    def receive: Receive =
      receiveWithContactPoints(Set.empty)

    def receiveWithContactPoints(contactPoints: Set[ActorPath]): Receive = {
      case ContactPoints(cps) =>
        context.become(receiveWithContactPoints(cps))
        // Now do something with the up-to-date "cps"
      case ContactPointAdded(cp) =>
        context.become(receiveWithContactPoints(contactPoints + cp))
        // Now do something with an up-to-date "contactPoints + cp"
      case ContactPointRemoved(cp) =>
        context.become(receiveWithContactPoints(contactPoints - cp))
        // Now do something with an up-to-date "contactPoints - cp"
    }
  }

Similarly, we can have an actor that learns which cluster clients contact a ClusterClientReceptionist:

  class ReceptionistListener(targetReceptionist: ActorRef) extends Actor {
    override def preStart(): Unit =
      targetReceptionist ! SubscribeClusterClients

    def receive: Receive =
      receiveWithClusterClients(Set.empty)

    def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = {
      case ClusterClients(cs) =>
        context.become(receiveWithClusterClients(cs))
        // Now do something with the up-to-date "cs"
      case ClusterClientUp(c) =>
        context.become(receiveWithClusterClients(clusterClients + c))
        // Now do something with an up-to-date "clusterClients + c"
      case ClusterClientUnreachable(c) =>
        context.become(receiveWithClusterClients(clusterClients - c))
        // Now do something with an up-to-date "clusterClients - c"
    }
  }

Dependencies

To use the Cluster Client you must add the following dependency in your project.

sbt:

  1. "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.10"

maven:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-cluster-tools_2.11</artifactId>
  4. <version>2.4.10</version>
  5. </dependency>

Configuration

The ClusterClientReceptionist extension (or ClusterReceptionistSettings) can be configured with the following properties:

  1. # Settings for the ClusterClientReceptionist extension
  2. akka.cluster.client.receptionist {
  3. # Actor name of the ClusterReceptionist actor, /system/receptionist
  4. name = receptionist
  5.  
  6. # Start the receptionist on members tagged with this role.
  7. # All members are used if undefined or empty.
  8. role = ""
  9.  
  10. # The receptionist will send this number of contact points to the client
  11. number-of-contacts = 3
  12.  
  13. # The actor that tunnel response messages to the client will be stopped
  14. # after this time of inactivity.
  15. response-tunnel-receive-timeout = 30s
  16. # The id of the dispatcher to use for ClusterReceptionist actors.
  17. # If not specified default dispatcher is used.
  18. # If specified you need to define the settings of the actual dispatcher.
  19. use-dispatcher = ""
  20.  
  21. # How often failure detection heartbeat messages should be received for
  22. # each ClusterClient
  23. heartbeat-interval = 2s
  24.  
  25. # Number of potentially lost/delayed heartbeats that will be
  26. # accepted before considering it to be an anomaly.
  27. # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which
  28. # will trigger if there are no heartbeats within the duration
  29. # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
  30. # the default settings.
  31. acceptable-heartbeat-pause = 13s
  32.  
  33. # Failure detection checking interval for checking all ClusterClients
  34. failure-detection-interval = 2s
  35. }

The following configuration properties are read by the ClusterClientSettings when created with an ActorSystem parameter. It is also possible to amend the ClusterClientSettings or create it from another config section with the same layout as below. ClusterClientSettings is a parameter to the ClusterClient.props factory method, i.e. each client can be configured with different settings if needed.

  1. # Settings for the ClusterClient
  2. akka.cluster.client {
  3. # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
  4. # that the client will try to contact initially. It is mandatory to specify
  5. # at least one initial contact.
  6. # Comma separated full actor paths defined by a string on the form of
  7. # "akka.tcp://system@hostname:port/system/receptionist"
  8. initial-contacts = []
  9. # Interval at which the client retries to establish contact with one of
  10. # ClusterReceptionist on the servers (cluster nodes)
  11. establishing-get-contacts-interval = 3s
  12. # Interval at which the client will ask the ClusterReceptionist for
  13. # new contact points to be used for next reconnect.
  14. refresh-contacts-interval = 60s
  15. # How often failure detection heartbeat messages should be sent
  16. heartbeat-interval = 2s
  17. # Number of potentially lost/delayed heartbeats that will be
  18. # accepted before considering it to be an anomaly.
  19. # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
  20. # will trigger if there are no heartbeats within the duration
  21. # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
  22. # the default settings.
  23. acceptable-heartbeat-pause = 13s
  24. # If connection to the receptionist is not established the client will buffer
  25. # this number of messages and deliver them when the connection is established.
  26. # When the buffer is full old messages will be dropped when new messages are sent
  27. # via the client. Use 0 to disable buffering, i.e. messages will be dropped
  28. # immediately if the location of the singleton is unknown.
  29. # Maximum allowed buffer size is 10000.
  30. buffer-size = 1000
  31.  
  32. # If connection to the receptionist is lost and the client has not been
  33. # able to acquire a new connection for this long the client will stop itself.
  34. # This duration makes it possible to watch the cluster client and react on a more permanent
  35. # loss of connection with the cluster, for example by accessing some kind of
  36. # service registry for an updated set of initial contacts to start a new cluster client with.
  37. # If this is not wanted it can be set to "off" to disable the timeout and retry
  38. # forever.
  39. reconnect-timeout = off
  40. }

Failure handling

When the cluster client is started it must be provided with a list of initial contacts which are cluster nodes where receptionists are running. It will then repeatedly (with an interval configurable by establishing-get-contacts-interval) try to contact those until it gets in contact with one of them. While running, the list of contacts is continuously updated with data from the receptionists (again, with an interval configurable with refresh-contacts-interval), so that if there are more receptionists in the cluster than the initial contacts provided to the client the client will learn about them.

While the client is running it will detect failures in its connection to the receptionist by heartbeats. If more than a configurable number of heartbeats are missed the client will try to reconnect to its known set of contacts to find a receptionist it can access.

When the cluster cannot be reached at all

It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to within a configurable interval. This is configured with the reconnect-timeout, which defaults to off. This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses are entirely dynamic, and the entire cluster might shut down or crash and be restarted on new addresses. Since the client will be stopped in that case, a monitoring actor can watch it and upon Terminated a new set of initial contacts can be fetched and a new cluster client started.
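A sketch of such a monitoring actor; lookupInitialContacts is a hypothetical function standing in for a call to your service registry:

  import akka.actor.{ Actor, ActorPath, ActorRef, Terminated }
  import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

  class ClusterClientSupervisor extends Actor {

    // hypothetical: fetch fresh contact points from an external service registry
    def lookupInitialContacts(): Set[ActorPath] = ???

    var client: ActorRef = startClient()

    def startClient(): ActorRef = {
      val c = context.actorOf(ClusterClient.props(
        ClusterClientSettings(context.system)
          .withInitialContacts(lookupInitialContacts())))
      context.watch(c)
      c
    }

    def receive = {
      case Terminated(ref) if ref == client =>
        // the client gave up (reconnect-timeout expired), start a new one
        client = startClient()
      case msg =>
        // forward application messages to the current client instance
        client forward msg
    }
  }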

Cluster Sharding

Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to be able to interact with them using their logical identifier, but without having to care about their physical location in the cluster, which might also change over time.

It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. Here we call these actors "entities". These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state.

Cluster sharding is typically used when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors it might be easier to run them on a Cluster Singleton node.

In this context sharding means that actors with an identifier, so called entities, can be automatically distributed across multiple nodes in the cluster. Each entity actor runs only at one place, and messages can be sent to the entity without requiring the sender to know the location of the destination actor. This is achieved by sending the messages via a ShardRegion actor provided by this extension, which knows how to route the message with the entity id to the final destination.

Cluster sharding will not be active on members with status WeaklyUp if that feature is enabled.

Warning

Don't use Cluster Sharding together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in multiple shards and entities being started, one in each separate cluster! See Downing.

An Example

This is how an entity actor may look:

  case object Increment
  case object Decrement
  final case class Get(counterId: Long)
  final case class EntityEnvelope(id: Long, payload: Any)

  case object Stop
  final case class CounterChanged(delta: Int)

  class Counter extends PersistentActor {
    import ShardRegion.Passivate

    context.setReceiveTimeout(120.seconds)

    // self.path.name is the entity identifier (utf-8 URL-encoded)
    override def persistenceId: String = "Counter-" + self.path.name

    var count = 0

    def updateState(event: CounterChanged): Unit =
      count += event.delta

    override def receiveRecover: Receive = {
      case evt: CounterChanged => updateState(evt)
    }

    override def receiveCommand: Receive = {
      case Increment      => persist(CounterChanged(+1))(updateState)
      case Decrement      => persist(CounterChanged(-1))(updateState)
      case Get(_)         => sender() ! count
      case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
      case Stop           => context.stop(self)
    }
  }

The above actor uses event sourcing and the support provided in PersistentActor to store its state. It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover its state if it is valuable.

Note how the persistenceId is defined. The name of the actor is the entity identifier (utf-8 URL-encoded). You may define it another way, but it must be unique.

When using the sharding extension you first, typically at system startup on each node in the cluster, register the supported entity types with the ClusterSharding.start method. ClusterSharding.start returns a reference that you can pass along.

  1. val counterRegion: ActorRef = ClusterSharding(system).start(
  2. typeName = "Counter",
  3. entityProps = Props[Counter],
  4. settings = ClusterShardingSettings(system),
  5. extractEntityId = extractEntityId,
  6. extractShardId = extractShardId)

The extractEntityId and extractShardId are two application specific functions to extract the entity identifier and the shard identifier from incoming messages.

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(id, payload) => (id.toString, payload)
    case msg @ Get(id)               => (id.toString, msg)
  }

  val numberOfShards = 100

  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(id, _) => (id % numberOfShards).toString
    case Get(id)               => (id % numberOfShards).toString
  }

This example illustrates two different ways to define the entity identifier in the messages:

  • The Get message includes the identifier itself.
  • The EntityEnvelope holds the identifier, and the actual message that is sent to the entity actor is wrapped in the envelope.

Note how these two message types are handled in the extractEntityId function shown above. The message sent to the entity actor is the second part of the tuple returned by extractEntityId, which makes it possible to unwrap envelopes if needed.

A shard is a group of entities that will be managed together. The grouping is defined by the extractShardId function shown above. For a specific entity identifier the shard identifier must always be the same.

Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution, i.e. the same amount of entities in each shard. As a rule of thumb, the number of shards should be a factor of ten greater than the planned maximum number of cluster nodes. Fewer shards than nodes will result in some nodes not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency because the coordinator is involved in the routing of the first message for each shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed only after stopping all nodes in the cluster.

A simple sharding algorithm that works fine in most cases is to take the absolute value of the hashCode of the entity identifier modulo the number of shards. As a convenience this is provided by the ShardRegion.HashCodeMessageExtractor.
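A sketch of using it for the Counter example above; ClusterSharding.start also accepts such a message extractor in place of the two functions:

  import akka.cluster.sharding.ShardRegion

  // 100 shards; the shard id is the absolute hashCode of the entity id modulo 100
  val messageExtractor = new ShardRegion.HashCodeMessageExtractor(100) {
    override def entityId(message: Any): String = message match {
      case EntityEnvelope(id, _) => id.toString
      case Get(id)               => id.toString
    }
    // unwrap the envelope so the entity only sees the payload
    override def entityMessage(message: Any): Any = message match {
      case EntityEnvelope(_, payload) => payload
      case msg                        => msg
    }
  }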

Messages to the entities are always sent via the local ShardRegion. The ShardRegion actor reference for a named entity type is returned by ClusterSharding.start and it can also be retrieved with ClusterSharding.shardRegion. The ShardRegion will look up the location of the shard for the entity if it does not already know its location. It will delegate the message to the right node and it will create the entity actor on demand, i.e. when the first message for a specific entity is delivered.

  1. val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
  2. counterRegion ! Get(123)
  3. expectMsg(0)
  4.  
  5. counterRegion ! EntityEnvelope(123, Increment)
  6. counterRegion ! Get(123)
  7. expectMsg(1)

A more comprehensive sample is available in the Lightbend Activator tutorial named Akka Cluster Sharding with Scala!.

How it works

The ShardRegion actor is started on each node in the cluster, or group of nodes tagged with a specific role. The ShardRegion is created with two application specific functions to extract the entity identifier and the shard identifier from incoming messages. A shard is a group of entities that will be managed together. For the first message in a specific shard the ShardRegion requests the location of the shard from a central coordinator, the ShardCoordinator.

The ShardCoordinator decides which ShardRegion shall own the Shard and informs that ShardRegion. The region will confirm this request and create the Shard supervisor as a child actor. The individual Entities will then be created when needed by the Shard actor. Incoming messages thus travel via the ShardRegion and the Shard to the target Entity.

If the shard home is another ShardRegion instance messages will be forwarded to that ShardRegion instance instead. While resolving the location of a shard incoming messages for that shard are buffered and later delivered when the shard home is known. Subsequent messages to the resolved shard can be delivered to the target destination immediately without involving the ShardCoordinator.

Scenario 1:

  1. Incoming message M1 to ShardRegion instance R1.
  2. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
  3. C answers that the home of S1 is R1.
  4. R1 creates child actor for the entity E1 and sends buffered messages for S1 to E1 child
  5. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entity children as needed, and forwards messages to them.

Scenario 2:

  1. Incoming message M2 to R1.
  2. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2.
  3. C answers that the home of S2 is R2.
  4. R1 sends buffered messages for S2 to R2
  5. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2.
  6. R2 receives message for S2, ask C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2).

To make sure that at most one instance of a specific entity actor is running somewhere in the cluster it is important that all nodes have the same view of where the shards are located. Therefore the shard allocation decisions are taken by the central ShardCoordinator, which is running as a cluster singleton, i.e. one instance on the oldest member among all cluster nodes or a group of nodes tagged with a specific role.

The logic that decides where a shard is to be located is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy allocates new shards to the ShardRegion with the least number of previously allocated shards. This strategy can be replaced by an application specific implementation.

To be able to use newly added members in the cluster the coordinator facilitates rebalancing of shards, i.e. migrating entities from one node to another. In the rebalance process the coordinator first notifies all ShardRegion actors that a handoff for a shard has started. That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown. During the rebalance process the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The ShardRegion responsible for the rebalanced shard will stop all entities in that shard by sending the specified handOffStopMessage (default PoisonPill) to them. When all entities have been terminated the ShardRegion owning the entities will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of the shard, thereby allocating a new home for the shard, and buffered messages in the ShardRegion actors are delivered to the new location. This means that the state of the entities is not transferred or migrated. If the state of the entities is of importance it should be persistent (durable), e.g. with Persistence, so that it can be recovered at the new location.

The logic that decides which shards to rebalance is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy picks shards for handoff from the ShardRegion with the most number of previously allocated shards. They will then be allocated to the ShardRegion with the least number of previously allocated shards, i.e. new members in the cluster. There is a configurable threshold of how large the difference must be to begin the rebalancing. This strategy can be replaced by an application specific implementation.
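As a sketch (assuming the ClusterSharding.start overload that takes an allocationStrategy and a handOffStopMessage, as in the Scala API), the default strategy could be tuned and plugged in like this:

  import akka.actor.{ PoisonPill, Props }
  import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardCoordinator }

  // rebalance when the difference is at least 3 shards, at most 2 rebalances at a time
  val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(3, 2)

  val counterRegion = ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[Counter],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId,
    allocationStrategy = allocationStrategy,
    handOffStopMessage = PoisonPill)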

The state of shard locations in the ShardCoordinator is persistent (durable) with Persistence to survive failures. Since it is running in a cluster Persistence must be configured with a distributed journal. When a crashed or unreachable coordinator node has been removed (via down) from the cluster a new ShardCoordinator singleton actor will take over and the state is recovered. During such a failure period shards with known location are still available, while messages for new (unknown) shards are buffered until the new ShardCoordinator becomes available.

As long as a sender uses the same ShardRegion actor to deliver messages to an entity actor the order of the messages is preserved. As long as the buffer limit is not reached messages are delivered on a best effort basis, with at-most-once delivery semantics, in the same way as ordinary message sending. Reliable end-to-end messaging, with at-least-once semantics, can be added by using AtLeastOnceDelivery in Persistence.

Some additional latency is introduced for messages targeted to new or previously unused shards due to the round-trip to the coordinator. Rebalancing of shards may also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards.

Distributed Data Mode

Instead of using Persistence it is possible to use the Distributed Data module as storage for the state of the sharding coordinator. In such case the state of the ShardCoordinator will be replicated inside a cluster by the Distributed Data module with WriteMajority/ReadMajority consistency.

This mode can be enabled by setting configuration property:

  1. akka.cluster.sharding.state-store-mode = ddata

It is using the Distributed Data extension that must be running on all nodes in the cluster. Therefore you should add that extension to the configuration to make sure that it is started on all nodes:

  1. akka.extensions += "akka.cluster.ddata.DistributedData"

You must explicitly add the akka-distributed-data-experimental dependency to your build if you use this mode. It is possible to remove the akka-persistence dependency from a project if it is not used in user code and remember-entities is off. When using this mode together with Remember Entities, shards will be recreated after rebalancing, but they will not be recreated after a clean cluster start, because the Sharding Coordinator state is empty after a clean cluster start in ddata mode. When Remember Entities is on, the Shard Region always keeps data using persistence, no matter how State Store Mode is set.

Warning

The ddata mode is considered as “experimental” as of its introduction in Akka 2.4.0, since it depends on the experimental Distributed Data module.

Proxy Only Mode

The ShardRegion actor can also be started in proxy only mode, i.e. it will not host any entities itself, but it knows how to delegate messages to the right location. A ShardRegion is started in proxy only mode with the ClusterSharding.startProxy method.
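A sketch of starting such a proxy for the Counter entity type (here with role = None, i.e. entities may be hosted on any role):

  import akka.cluster.sharding.ClusterSharding

  // this node only forwards messages, it hosts no Counter entities itself
  val counterProxy = ClusterSharding(system).startProxy(
    typeName = "Counter",
    role = None,
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)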

Passivation

If the state of the entities is persistent you may stop entities that are not used to reduce memory consumption. This is done by the application specific implementation of the entity actors, for example by defining a receive timeout (context.setReceiveTimeout). If a message is already enqueued to the entity when it stops itself, the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages the entity actor can send ShardRegion.Passivate to its parent Shard. The specified wrapped message in Passivate will be sent back to the entity, which is then supposed to stop itself. Incoming messages will be buffered by the Shard between reception of Passivate and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity.

Remembering Entities

The list of entities in each Shard can be made persistent (durable) by setting the rememberEntities flag to true in ClusterShardingSettings when calling ClusterSharding.start. When configured to remember entities, whenever a Shard is rebalanced onto another node or recovers after a crash it will recreate all the entities which were previously running in that Shard. To permanently stop entities, a Passivate message must be sent to the parent of the entity actor, otherwise the entity will be automatically restarted after the entity restart backoff specified in the configuration.
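A sketch of enabling it programmatically when starting the entity type, assuming the withRememberEntities modifier of ClusterShardingSettings (equivalent to akka.cluster.sharding.remember-entities = on in configuration):

  import akka.actor.Props
  import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }

  val counterRegion = ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[Counter],
    settings = ClusterShardingSettings(system).withRememberEntities(true),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)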

When rememberEntities is set to false, a Shard will not automatically restart any entities after a rebalance or recovering from a crash. Entities will only be started once the first message for that entity has been received in the Shard. Entities will not be restarted if they stop without using Passivate.

Note that the state of the entities themselves will not be restored unless they have been made persistent, e.g. with Persistence.

Supervision

If you need to use another supervisorStrategy for the entity actors than the default (restarting) strategy, you need to create an intermediate parent actor that defines the supervisorStrategy for the child entity actor.

  class CounterSupervisor extends Actor {
    val counter = context.actorOf(Props[Counter], "theCounter")

    override val supervisorStrategy = OneForOneStrategy() {
      case _: IllegalArgumentException     => SupervisorStrategy.Resume
      case _: ActorInitializationException => SupervisorStrategy.Stop
      case _: DeathPactException           => SupervisorStrategy.Stop
      case _: Exception                    => SupervisorStrategy.Restart
    }

    def receive = {
      case msg => counter forward msg
    }
  }

You start such a supervisor in the same way as if it was the entity actor.

  1. ClusterSharding(system).start(
  2. typeName = "SupervisedCounter",
  3. entityProps = Props[CounterSupervisor],
  4. settings = ClusterShardingSettings(system),
  5. extractEntityId = extractEntityId,
  6. extractShardId = extractShardId)

Note that stopped entities will be started again when a new message is targeted to the entity.

Graceful Shutdown

You can send the ShardRegion.GracefulShutdown message to the ShardRegion actor to hand off all shards that are hosted by that ShardRegion; the ShardRegion actor will then be stopped. You can watch the ShardRegion actor to know when it is completed. During this period other regions will buffer messages for those shards in the same way as when a rebalance is triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.

When the ShardRegion has terminated you probably want to leave the cluster, and shut down the ActorSystem.

This is how to do that:

  class IllustrateGracefulShutdown extends Actor {
    val system = context.system
    val cluster = Cluster(system)
    val region = ClusterSharding(system).shardRegion("Entity")

    def receive = {
      case "leave" =>
        context.watch(region)
        region ! ShardRegion.GracefulShutdown

      case Terminated(`region`) =>
        cluster.registerOnMemberRemoved(self ! "member-removed")
        cluster.leave(cluster.selfAddress)

      case "member-removed" =>
        // Let singletons hand over gracefully before stopping the system
        import context.dispatcher
        system.scheduler.scheduleOnce(10.seconds, self, "stop-system")

      case "stop-system" =>
        system.terminate()
    }
  }

Removal of Internal Cluster Sharding Data

The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence. This data can safely be removed when restarting the whole Akka Cluster. Note that this is not application data.

There is a utility program akka.cluster.sharding.RemoveInternalClusterShardingData that removes this data.

Warning

Never use this program while there are running Akka Cluster nodes that are using Cluster Sharding. Stop all Cluster nodes before using this program.

It may be necessary to remove the data if the Cluster Sharding coordinator cannot start up because of corrupt data, which may happen if two clusters were accidentally running at the same time, e.g. caused by using auto-down during a network partition.

Warning

Don't use Cluster Sharding together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in multiple shards and entities being started, one in each separate cluster! See Downing.

Use this program as a standalone Java main program:

  1. java -classpath <jar files, including akka-cluster-sharding>
  2. akka.cluster.sharding.RemoveInternalClusterShardingData
  3. -2.3 entityType1 entityType2 entityType3

The program is included in the akka-cluster-sharding jar file. It is easiest to run it with the same classpath and configuration as your ordinary application. It can be run from sbt or maven in a similar way.

Specify the entity type names (same as you use in the start method of ClusterSharding) as program arguments.

If you specify -2.3 as the first program argument it will also try to remove data that was stored by Cluster Sharding in Akka 2.3.x using a different persistenceId.

Dependencies

To use Cluster Sharding you must add the following dependency to your project.

sbt:

  1. "com.typesafe.akka" %% "akka-cluster-sharding" % "2.4.10"

maven:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-cluster-sharding_2.11</artifactId>
  4. <version>2.4.10</version>
  5. </dependency>

Configuration

The ClusterSharding extension can be configured with the following properties. These configuration properties are read by the ClusterShardingSettings when created with an ActorSystem parameter. It is also possible to amend the ClusterShardingSettings or create it from another config section with the same layout as below. ClusterShardingSettings is a parameter to the start method of the ClusterSharding extension, i.e. each entity type can be configured with different settings if needed.

  1. # Settings for the ClusterShardingExtension
  2. akka.cluster.sharding {
  3.  
  4. # The extension creates a top level actor with this name in top level system scope,
  5. # e.g. '/system/sharding'
  6. guardian-name = sharding
  7.  
  8. # Specifies that entities run on cluster nodes with a specific role.
  9. # If the role is not specified (or empty) all nodes in the cluster are used.
  10. role = ""
  11.  
  12. # When this is set to 'on' the active entity actors will automatically be restarted
  13. # upon Shard restart. i.e. if the Shard is started on a different ShardRegion
  14. # due to rebalance or crash.
  15. remember-entities = off
  16.  
  17. # If the coordinator can't store state changes it will be stopped
  18. # and started again after this duration, with an exponential back-off
  19. # of up to 5 times this duration.
  20. coordinator-failure-backoff = 5 s
  21.  
  22. # The ShardRegion retries registration and shard location requests to the
  23. # ShardCoordinator with this interval if it does not reply.
  24. retry-interval = 2 s
  25.  
  26. # Maximum number of messages that are buffered by a ShardRegion actor.
  27. buffer-size = 100000
  28.  
  29. # Timeout of the shard rebalancing process.
  30. handoff-timeout = 60 s
  31.  
  32. # Time given to a region to acknowledge it's hosting a shard.
  33. shard-start-timeout = 10 s
  34.  
  35. # If the shard is remembering entities and can't store state changes
  36. # it will be stopped and then started again after this duration. Any messages
  37. # sent to an affected entity may be lost in this process.
  38. shard-failure-backoff = 10 s
  39.  
  40. # If the shard is remembering entities and an entity stops itself without
  41. # using passivate, the entity will be restarted after this duration or when
  42. # the next message for it is received, whichever occurs first.
  43. entity-restart-backoff = 10 s
  44.  
  45. # Rebalance check is performed periodically with this interval.
  46. rebalance-interval = 10 s
  47.  
  48. # Absolute path to the journal plugin configuration entity that is to be
  49. # used for the internal persistence of ClusterSharding. If not defined
  50. # the default journal plugin is used. Note that this is not related to
  51. # persistence used by the entity actors.
  52. journal-plugin-id = ""
  53.  
  54. # Absolute path to the snapshot plugin configuration entity that is to be
  55. # used for the internal persistence of ClusterSharding. If not defined
  56. # the default snapshot plugin is used. Note that this is not related to
  57. # persistence used by the entity actors.
  58. snapshot-plugin-id = ""
  59.  
  60. # Parameter which determines how the coordinator will store its state.
  61. # Valid values are either "persistence" or "ddata".
  62. # The "ddata" mode is experimental, since it depends on the experimental
  63. # module akka-distributed-data-experimental.
  64. state-store-mode = "persistence"
  65.  
  66. # The shard saves persistent snapshots after this number of persistent
  67. # events. Snapshots are used to reduce recovery times.
  68. snapshot-after = 1000
  69.  
  70. # Setting for the default shard allocation strategy
  71. least-shard-allocation-strategy {
  72. # Threshold of how large the difference between most and least number of
  73. # allocated shards must be to begin the rebalancing.
  74. rebalance-threshold = 10
  75.  
  76. # The number of ongoing rebalancing processes is limited to this number.
  77. max-simultaneous-rebalance = 3
  78. }
  79.  
  80. # Timeout of waiting for the initial distributed state (the initial state will be queried again if the timeout happened)
  81. # works only for state-store-mode = "ddata"
  82. waiting-for-state-timeout = 5 s
  83.  
  84. # Timeout of waiting for an update of the distributed state (the update will be retried if the timeout happened)
  85. # works only for state-store-mode = "ddata"
  86. updating-state-timeout = 5 s
  87.  
  88. # The shard uses this strategy to determine how to recover the underlying entity actors. The strategy is only used
  89. # by the persistent shard when rebalancing or restarting. The value can either be "all" or "constant". The "all"
  90. # strategy starts all the underlying entity actors at the same time. The "constant" strategy will start the underlying
  91. # entity actors at a fixed rate. The default strategy is "all".
  92. entity-recovery-strategy = "all"
  93.  
  94. # Default settings for the constant rate entity recovery strategy
  95. entity-recovery-constant-rate-strategy {
  96. # Sets the frequency at which a batch of entity actors is started.
  97. frequency = 100 ms
  98. # Sets the number of entity actors to be restarted at a particular interval
  99. number-of-entities = 5
  100. }
  101.  
  102. # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
  103. # The "role" of the singleton configuration is not used. The singleton role will
  104. # be the same as "akka.cluster.sharding.role".
  105. coordinator-singleton = ${akka.cluster.singleton}
  106.  
  107. # The id of the dispatcher to use for ClusterSharding actors.
  108. # If not specified default dispatcher is used.
  109. # If specified you need to define the settings of the actual dispatcher.
  110. # The dispatcher for the entity actors is defined by the user provided
  111. # Props, i.e. this dispatcher is not used for the entity actors.
  112. use-dispatcher = ""
  113. }
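As noted above, the ClusterShardingSettings created from these properties can be amended per entity type. A minimal sketch, assuming the withRole and withRememberEntities copy methods of ClusterShardingSettings (check the API documentation for the exact method names):

  // Hedged sketch: start from the akka.cluster.sharding defaults and
  // override a couple of settings for one particular entity type.
  val counterSettings = ClusterShardingSettings(system)
    .withRole("counter")          // assumed method name
    .withRememberEntities(true)   // assumed method name

  ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[Counter],
    settings = counterSettings,
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)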

A custom shard allocation strategy can be defined in an optional parameter to ClusterSharding.start. See the API documentation of ShardAllocationStrategy for details of how to implement a custom shard allocation strategy.
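As a hedged sketch, assuming the start overload that also takes an allocation strategy and a hand-off stop message (verify the exact signature in the ClusterSharding API docs), the built-in LeastShardAllocationStrategy can also be passed explicitly with tuned parameters:

  import akka.actor.PoisonPill
  import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardCoordinator }

  // Rebalance when the difference between the busiest and the least busy
  // region reaches 3 shards, and run at most one rebalance at a time.
  val allocationStrategy =
    new ShardCoordinator.LeastShardAllocationStrategy(3, 1)

  ClusterSharding(system).start(
    "Counter",
    Props[Counter],
    ClusterShardingSettings(system),
    extractEntityId,
    extractShardId,
    allocationStrategy,
    PoisonPill) // stop message sent to entities when their shard is handed off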

Inspecting cluster sharding state

Two requests to inspect the cluster state are available:

ShardRegion.GetShardRegionState which will return a ShardRegion.CurrentShardRegionState that contains the identifiers of the shards running in a Region and what entities are alive for each of them.

ShardRegion.GetClusterShardingStats which will query all the regions in the cluster and return a ShardRegion.ClusterShardingStats containing the identifiers of the shards running in each region and a count of entities that are alive in each shard.

The purpose of these messages is testing and monitoring; they are not intended as a way to send messages directly to the individual entities.
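For example, a monitoring component might ask the local region for its state. The sketch below assumes a shards field on CurrentShardRegionState (check the ShardRegion API docs for the exact member names):

  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration._

  implicit val timeout: Timeout = 3.seconds
  import system.dispatcher

  val region = ClusterSharding(system).shardRegion("Counter")
  (region ? ShardRegion.GetShardRegionState)
    .mapTo[ShardRegion.CurrentShardRegionState]
    .foreach { state =>
      // the shard identifiers hosted by this region and the entities alive in each
      println(state.shards)
    }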

Cluster Metrics Extension

Introduction

The member nodes of the cluster can collect system health metrics and publish them to other cluster nodes and to the registered subscribers on the system event bus with the help of the Cluster Metrics Extension.

Cluster metrics information is primarily used for load-balancing routers, and can also be used to implement advanced metrics-based node life cycles, such as "Node Let-it-crash" when CPU steal time becomes excessive.

Cluster Metrics Extension is a separate Akka module delivered in akka-cluster-metrics jar.

To enable usage of the extension you need to add the following dependency to your project:

  1. "com.typesafe.akka" % "akka-cluster-metrics_2.11" % "2.4.10"

and add the following configuration stanza to your application.conf

  1. akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]

Make sure to disable legacy metrics in akka-cluster: akka.cluster.metrics.enabled=off, since it is still enabled in akka-cluster by default (for compatibility with past releases).

Cluster members with status WeaklyUp, if that feature is enabled, will participate in Cluster Metrics collection and dissemination.

Metrics Collector

Metrics collection is delegated to an implementation of akka.cluster.metrics.MetricsCollector.

Different collector implementations provide different subsets of metrics published to the cluster. Certain message routing and let-it-crash functions may not work when Sigar is not provisioned.

Cluster metrics extension comes with two built-in collector implementations:

  1. akka.cluster.metrics.SigarMetricsCollector, which requires Sigar provisioning, and is more rich/precise
  2. akka.cluster.metrics.JmxMetricsCollector, which is used as fall back, and is less rich/precise

You can also plug-in your own metrics collector implementation.

By default, the metrics extension will use collector provider fallback and will try to load the collectors in this order:

  1. configured user-provided collector
  2. built-in akka.cluster.metrics.SigarMetricsCollector
  3. and finally akka.cluster.metrics.JmxMetricsCollector

Metrics Events

Metrics extension periodically publishes current snapshot of the cluster metrics to the node system event bus.

The publication period is controlled by the akka.cluster.metrics.collector.sample-interval setting.

The payload of the akka.cluster.metrics.ClusterMetricsChanged event will contain the latest metrics of the node as well as the metrics gossip of other cluster member nodes received during the collector sample interval.

You can subscribe your metrics listener actors to these events in order to implement a custom node lifecycle:

  1. ClusterMetricsExtension(system).subscribe(metricsListenerActor)

Hyperic Sigar Provisioning

Both user-provided and built-in metrics collectors can optionally use Hyperic Sigar for a wider and more accurate range of metrics compared to what can be retrieved from ordinary JMX MBeans.

Sigar uses a native o/s library, and requires library provisioning, i.e. deployment, extraction and loading of the o/s native library into the JVM at runtime.

User can provision Sigar classes and native library in one of the following ways:

  1. Use Kamon sigar-loader as a project dependency for the user project. Metrics extension will extract and load sigar library on demand with help of Kamon sigar provisioner.
  2. Use Kamon sigar-loader as java agent: java -javaagent:/path/to/sigar-loader.jar. Kamon sigar loader agent will extract and load sigar library during JVM start.
  3. Place sigar.jar on the classpath and Sigar native library for the o/s on the java.library.path. User is required to manage both project dependency and library deployment manually.

Warning

When using Kamon sigar-loader and running multiple instances of the same application on the same host, you have to make sure that the sigar library is extracted to a unique per-instance directory. You can control the extract directory with the akka.cluster.metrics.native-library-extract-folder configuration setting.

To enable usage of Sigar you can add the following dependency to the user project

  1. "io.kamon" % "sigar-loader" % "1.6.6-rev002"

You can download Kamon sigar-loader from Maven Central

Adaptive Load Balancing

The AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroup performs load balancing of messages to cluster nodes based on the cluster metrics data. It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node. It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights.

The collected metrics values are smoothed with exponential weighted moving average. In the Configuration you can adjust how quickly past data is decayed compared to new data.
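To make the smoothing concrete, the decay described in the reference configuration below follows 1 - 0.5 ^ (sample-interval / half-life). A small arithmetic illustration (plain Scala, not an Akka API):

  // With the defaults sample-interval = 3 s and moving-average-half-life = 12 s:
  val sampleInterval = 3.0
  val halfLife = 12.0
  val alpha = 1.0 - math.pow(0.5, sampleInterval / halfLife) // ≈ 0.159

  // Exponentially weighted moving average of a metric value:
  def ewma(previous: Double, newValue: Double): Double =
    alpha * newValue + (1 - alpha) * previous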

Let's take a look at this router in action. What can be more demanding than calculating factorials?

The backend worker that performs the factorial calculation:

  1. class FactorialBackend extends Actor with ActorLogging {
  2.  
  3. import context.dispatcher
  4.  
  5. def receive = {
  6. case (n: Int) =>
  7. Future(factorial(n)) map { result => (n, result) } pipeTo sender()
  8. }
  9.  
  10. def factorial(n: Int): BigInt = {
  11. @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
  12. if (n <= 1) acc
  13. else factorialAcc(acc * n, n - 1)
  14. }
  15. factorialAcc(BigInt(1), n)
  16. }
  17.  
  18. }

The frontend that receives user jobs and delegates to the backends via the router:

  1. class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
  2.  
  3. val backend = context.actorOf(FromConfig.props(),
  4. name = "factorialBackendRouter")
  5.  
  6. override def preStart(): Unit = {
  7. sendJobs()
  8. if (repeat) {
  9. context.setReceiveTimeout(10.seconds)
  10. }
  11. }
  12.  
  13. def receive = {
  14. case (n: Int, factorial: BigInt) =>
  15. if (n == upToN) {
  16. log.debug("{}! = {}", n, factorial)
  17. if (repeat) sendJobs()
  18. else context.stop(self)
  19. }
  20. case ReceiveTimeout =>
  21. log.info("Timeout")
  22. sendJobs()
  23. }
  24.  
  25. def sendJobs(): Unit = {
  26. log.info("Starting batch of factorials up to [{}]", upToN)
  27. 1 to upToN foreach { backend ! _ }
  28. }
  29. }

As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows:

  1. akka.actor.deployment {
  2. /factorialFrontend/factorialBackendRouter = {
  3. # Router type provided by metrics extension.
  4. router = cluster-metrics-adaptive-group
  5. # Router parameter specific for metrics extension.
  6. # metrics-selector = heap
  7. # metrics-selector = load
  8. # metrics-selector = cpu
  9. metrics-selector = mix
  10. #
  11. routees.paths = ["/user/factorialBackend"]
  12. cluster {
  13. enabled = on
  14. use-role = backend
  15. allow-local-routees = off
  16. }
  17. }
  18. }

Only the router type and the metrics-selector parameter are specific to this router; other settings work in the same way as for other routers.

The same type of router could also have been defined in code:

  1. import akka.cluster.routing.ClusterRouterGroup
  2. import akka.cluster.routing.ClusterRouterGroupSettings
  3. import akka.cluster.metrics.AdaptiveLoadBalancingGroup
  4. import akka.cluster.metrics.HeapMetricsSelector
  5.  
  6. val backend = context.actorOf(
  7. ClusterRouterGroup(AdaptiveLoadBalancingGroup(HeapMetricsSelector),
  8. ClusterRouterGroupSettings(
  9. totalInstances = 100, routeesPaths = List("/user/factorialBackend"),
  10. allowLocalRoutees = true, useRole = Some("backend"))).props(),
  11. name = "factorialBackendRouter2")
  1. import akka.cluster.routing.ClusterRouterPool
  2. import akka.cluster.routing.ClusterRouterPoolSettings
  3. import akka.cluster.metrics.AdaptiveLoadBalancingPool
  4. import akka.cluster.metrics.SystemLoadAverageMetricsSelector
  5.  
  6. val backend = context.actorOf(
  7. ClusterRouterPool(AdaptiveLoadBalancingPool(
  8. SystemLoadAverageMetricsSelector), ClusterRouterPoolSettings(
  9. totalInstances = 100, maxInstancesPerNode = 3,
  10. allowLocalRoutees = false, useRole = Some("backend"))).props(Props[FactorialBackend]),
  11. name = "factorialBackendRouter3")

The Lightbend Activator tutorial named Akka Cluster Samples with Scala contains the full source code and instructions on how to run the Adaptive Load Balancing sample.

Subscribe to Metrics Events

It is possible to subscribe to the metrics events directly to implement other functionality.

  1. import akka.actor.ActorLogging
  2. import akka.actor.Actor
  3. import akka.cluster.Cluster
  4. import akka.cluster.metrics.ClusterMetricsEvent
  5. import akka.cluster.metrics.ClusterMetricsChanged
  6. import akka.cluster.ClusterEvent.CurrentClusterState
  7. import akka.cluster.metrics.NodeMetrics
  8. import akka.cluster.metrics.StandardMetrics.HeapMemory
  9. import akka.cluster.metrics.StandardMetrics.Cpu
  10. import akka.cluster.metrics.ClusterMetricsExtension
  11.  
  12. class MetricsListener extends Actor with ActorLogging {
  13. val selfAddress = Cluster(context.system).selfAddress
  14. val extension = ClusterMetricsExtension(context.system)
  15.  
  16. // Subscribe to ClusterMetricsEvent events.
  17. override def preStart(): Unit = extension.subscribe(self)
  18. // Unsubscribe from ClusterMetricsEvent events.
  19. override def postStop(): Unit = extension.unsubscribe(self)
  20.  
  21. def receive = {
  22. case ClusterMetricsChanged(clusterMetrics) =>
  23. clusterMetrics.filter(_.address == selfAddress) foreach { nodeMetrics =>
  24. logHeap(nodeMetrics)
  25. logCpu(nodeMetrics)
  26. }
  27. case state: CurrentClusterState => // Ignore.
  28. }
  29.  
  30. def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
  31. case HeapMemory(address, timestamp, used, committed, max) =>
  32. log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
  33. case _ => // No heap info.
  34. }
  35.  
  36. def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
  37. case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, cpuStolen, processors) =>
  38. log.info("Load: {} ({} processors)", systemLoadAverage, processors)
  39. case _ => // No cpu info.
  40. }
  41. }

Custom Metrics Collector

Metrics collection is delegated to the implementation of akka.cluster.metrics.MetricsCollector.

You can plug-in your own metrics collector instead of the built-in akka.cluster.metrics.SigarMetricsCollector or akka.cluster.metrics.JmxMetricsCollector.

Look at those two implementations for inspiration.

A custom metrics collector implementation class must be specified in the akka.cluster.metrics.collector.provider configuration property.

Configuration

The Cluster metrics extension can be configured with the following properties:

  1. ##############################################
  2. # Akka Cluster Metrics Reference Config File #
  3. ##############################################
  4.  
  5. # This is the reference config file that contains all the default settings.
  6. # Make your edits in your application.conf in order to override these settings.
  7.  
  8. # Sigar provisioning:
  9. #
  10. # User can provision sigar classes and native library in one of the following ways:
  11. #
  12. # 1) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as a project dependency for the user project.
  13. # Metrics extension will extract and load sigar library on demand with help of Kamon sigar provisioner.
  14. #
  15. # 2) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as java agent: `java -javaagent:/path/to/sigar-loader.jar`
  16. # Kamon sigar loader agent will extract and load sigar library during JVM start.
  17. #
  18. # 3) Place `sigar.jar` on the `classpath` and sigar native library for the o/s on the `java.library.path`
  19. # User is required to manage both project dependency and library deployment manually.
  20.  
  21. # Cluster metrics extension.
  22. # Provides periodic statistics collection and publication throughout the cluster.
  23. akka.cluster.metrics {
  24. # Full path of dispatcher configuration key.
  25. # Use "" for default key `akka.actor.default-dispatcher`.
  26. dispatcher = ""
  27. # How long should any actor wait before starting the periodic tasks.
  28. periodic-tasks-initial-delay = 1s
  29. # Sigar native library extract location.
  30. # Use per-application-instance scoped location, such as program working directory.
  31. native-library-extract-folder = ${user.dir}"/native"
  32. # Metrics supervisor actor.
  33. supervisor {
  34. # Actor name. Example name space: /system/cluster-metrics
  35. name = "cluster-metrics"
  36. # Supervision strategy.
  37. strategy {
  38. #
  39. # FQCN of class providing `akka.actor.SupervisorStrategy`.
  40. # Must have a constructor with signature `<init>(com.typesafe.config.Config)`.
  41. # Default metrics strategy provider is a configurable extension of `OneForOneStrategy`.
  42. provider = "akka.cluster.metrics.ClusterMetricsStrategy"
  43. #
  44. # Configuration of the default strategy provider.
  45. # Replace with custom settings when overriding the provider.
  46. configuration = {
  47. # Log restart attempts.
  48. loggingEnabled = true
  49. # Child actor restart-on-failure window.
  50. withinTimeRange = 3s
  51. # Maximum number of restart attempts before child actor is stopped.
  52. maxNrOfRetries = 3
  53. }
  54. }
  55. }
  56. # Metrics collector actor.
  57. collector {
  58. # Enable or disable metrics collector for load-balancing nodes.
  59. # Metrics collection can also be controlled at runtime by sending control messages
  60. # to /system/cluster-metrics actor: `akka.cluster.metrics.{CollectionStartMessage,CollectionStopMessage}`
  61. enabled = on
  62. # FQCN of the metrics collector implementation.
  63. # It must implement `akka.cluster.metrics.MetricsCollector` and
  64. # have public constructor with akka.actor.ActorSystem parameter.
  65. # Will try to load in the following order of priority:
  66. # 1) configured custom collector 2) internal `SigarMetricsCollector` 3) internal `JmxMetricsCollector`
  67. provider = ""
  68. # Try all 3 available collector providers, or else fail on the configured custom collector provider.
  69. fallback = true
  70. # How often metrics are sampled on a node.
  71. # Shorter interval will collect the metrics more often.
  72. # Also controls frequency of the metrics publication to the node system event bus.
  73. sample-interval = 3s
  74. # How often a node publishes metrics information to the other nodes in the cluster.
  75. # Shorter interval will publish the metrics gossip more often.
  76. gossip-interval = 3s
  77. # How quickly the exponential weighting of past data is decayed compared to
  78. # new data. Set lower to increase the bias toward newer values.
  79. # The relevance of each data sample is halved for every passing half-life
  80. # duration, i.e. after 4 times the half-life, a data sample’s relevance is
  81. # reduced to 6% of its original relevance. The initial relevance of a data
  82. # sample is given by 1 – 0.5 ^ (collect-interval / half-life).
  83. # See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
  84. moving-average-half-life = 12s
  85. }
  86. }
  87.  
  88. # Cluster metrics extension serializers and routers.
  89. akka.actor {
  90. # Protobuf serializer for remote cluster metrics messages.
  91. serializers {
  92. akka-cluster-metrics = "akka.cluster.metrics.protobuf.MessageSerializer"
  93. }
  94. # Interface binding for remote cluster metrics messages.
  95. serialization-bindings {
  96. "akka.cluster.metrics.ClusterMetricsMessage" = akka-cluster-metrics
  97. }
  98. # Globally unique metrics extension serializer identifier.
  99. serialization-identifiers {
  100. "akka.cluster.metrics.protobuf.MessageSerializer" = 10
  101. }
  102. # Provide routing of messages based on cluster metrics.
  103. router.type-mapping {
  104. cluster-metrics-adaptive-pool = "akka.cluster.metrics.AdaptiveLoadBalancingPool"
  105. cluster-metrics-adaptive-group = "akka.cluster.metrics.AdaptiveLoadBalancingGroup"
  106. }
  107. }

Distributed Data

Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).

All data entries are spread to all nodes, or nodes with a certain role, in the cluster via direct replication and gossip based dissemination. You have fine grained control of the consistency level for reads and writes.

The nature of CRDTs makes it possible to perform updates from any node without coordination. Concurrent updates from different nodes will automatically be resolved by the monotonic merge function, which all data types must provide. The state changes always converge. Several useful data types for counters, sets, maps and registers are provided and you can also implement your own custom data types.

It is eventually consistent and geared toward providing high read and write availability (partition tolerance), with low latency. Note that in an eventually consistent system a read may return an out-of-date value.

Warning

This module is marked as “experimental” as of its introduction in Akka 2.4.0. We will continue to improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.cluster.ddata package.

Using the Replicator

The akka.cluster.ddata.Replicator actor provides the API for interacting with the data. The Replicator actor must be started on each node in the cluster, or group of nodes tagged with a specific role. It communicates with other Replicator instances with the same path (without address) that are running on other nodes. For convenience it can be used with the akka.cluster.ddata.DistributedData extension.

Cluster members with status WeaklyUp, if that feature is enabled, will currently not participate in Distributed Data, but that is something that should be possible to add in a future release.

Below is an example of an actor that schedules tick messages to itself and for each tick adds or removes elements from an ORSet (observed-remove set). It also subscribes to changes of it.

  1. import java.util.concurrent.ThreadLocalRandom
  2. import akka.actor.Actor
  3. import akka.actor.ActorLogging
  4. import akka.cluster.Cluster
  5. import akka.cluster.ddata.DistributedData
  6. import akka.cluster.ddata.ORSet
  7. import akka.cluster.ddata.ORSetKey
  8. import akka.cluster.ddata.Replicator
  9. import akka.cluster.ddata.Replicator._
  10.  
  11. object DataBot {
  12. private case object Tick
  13. }
  14.  
  15. class DataBot extends Actor with ActorLogging {
  16. import DataBot._
  17.  
  18. val replicator = DistributedData(context.system).replicator
  19. implicit val node = Cluster(context.system)
  20.  
  21. import context.dispatcher
  22. val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick)
  23.  
  24. val DataKey = ORSetKey[String]("key")
  25.  
  26. replicator ! Subscribe(DataKey, self)
  27.  
  28. def receive = {
  29. case Tick =>
  30. val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
  31. if (ThreadLocalRandom.current().nextBoolean()) {
  32. // add
  33. log.info("Adding: {}", s)
  34. replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ + s)
  35. } else {
  36. // remove
  37. log.info("Removing: {}", s)
  38. replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ - s)
  39. }
  40.  
  41. case _: UpdateResponse[_] => // ignore
  42.  
  43. case c @ Changed(DataKey) =>
  44. val data = c.get(DataKey)
  45. log.info("Current elements: {}", data.elements)
  46. }
  47.  
  48. override def postStop(): Unit = tickTask.cancel()
  49.  
  50. }

Update

To modify and replicate a data value you send a Replicator.Update message to the local Replicator.

The current data value for the key of the Update is passed as a parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.

The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from the enclosing scope. It must, for example, not access the sender() reference of an enclosing actor.

Update is intended to only be sent from an actor running in the same local ActorSystem as the Replicator, because the modify function is typically not serializable.
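A common pattern is therefore to capture anything actor-specific, such as the sender, into a stable value before constructing the Update. A minimal sketch, using the replicator and Counter1Key values defined in the examples below:

  // sender() must not be called inside the modify function, because that
  // function runs in the Replicator actor; capture the reply address first.
  val replyTo = sender()
  replicator ! Update(Counter1Key, PNCounter(), WriteLocal, request = Some(replyTo))(_ + 1)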

You supply a write consistency level with the Update; the following example illustrates the available levels:

  1. implicit val node = Cluster(system)
  2. val replicator = DistributedData(system).replicator
  3.  
  4. val Counter1Key = PNCounterKey("counter1")
  5. val Set1Key = GSetKey[String]("set1")
  6. val Set2Key = ORSetKey[String]("set2")
  7. val ActiveFlagKey = FlagKey("active")
  8.  
  9. replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ + 1)
  10.  
  11. val writeTo3 = WriteTo(n = 3, timeout = 1.second)
  12. replicator ! Update(Set1Key, GSet.empty[String], writeTo3)(_ + "hello")
  13.  
  14. val writeMajority = WriteMajority(timeout = 5.seconds)
  15. replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ + "hello")
  16.  
  17. val writeAll = WriteAll(timeout = 5.seconds)
  18. replicator ! Update(ActiveFlagKey, Flag.empty, writeAll)(_.switchOn)

In reply to the Update, a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.

  1. case UpdateSuccess(Counter1Key, req) => // ok
  1. case UpdateSuccess(Set1Key, req) => // ok
  2. case UpdateTimeout(Set1Key, req) =>
  3. // write to 3 nodes failed within 1.second

You will always see your own writes. For example if you send two Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.

In the Update message you can pass an optional request context, which the Replicator does not care about, but is included in the reply messages. This is a convenient way to pass contextual information (e.g. original sender) without having to use ask or maintain local correlation data structures.

  1. implicit val node = Cluster(system)
  2. val replicator = DistributedData(system).replicator
  3. val writeTwo = WriteTo(n = 2, timeout = 3.second)
  4. val Counter1Key = PNCounterKey("counter1")
  5.  
  6. def receive: Receive = {
  7. case "increment" =>
  8. // incoming command to increase the counter
  9. val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ + 1)
  10. replicator ! upd
  11.  
  12. case UpdateSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
  13. replyTo ! "ack"
  14. case UpdateTimeout(Counter1Key, Some(replyTo: ActorRef)) =>
  15. replyTo ! "nack"
  16. }

Get

To retrieve the current value of a data entry you send a Replicator.Get message to the Replicator. You supply a read consistency level; the following example illustrates the available levels:

  1. val replicator = DistributedData(system).replicator
  2. val Counter1Key = PNCounterKey("counter1")
  3. val Set1Key = GSetKey[String]("set1")
  4. val Set2Key = ORSetKey[String]("set2")
  5. val ActiveFlagKey = FlagKey("active")
  6.  
  7. replicator ! Get(Counter1Key, ReadLocal)
  8.  
  9. val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
  10. replicator ! Get(Set1Key, readFrom3)
  11.  
  12. val readMajority = ReadMajority(timeout = 5.seconds)
  13. replicator ! Get(Set2Key, readMajority)
  14.  
  15. val readAll = ReadAll(timeout = 5.seconds)
  16. replicator ! Get(ActiveFlagKey, readAll)

In reply to the Get, a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.

  1. case g @ GetSuccess(Counter1Key, req) =>
  2. val value = g.get(Counter1Key).value
  3. case NotFound(Counter1Key, req) => // key counter1 does not exist
  1. case g @ GetSuccess(Set1Key, req) =>
  2. val elements = g.get(Set1Key).elements
  3. case GetFailure(Set1Key, req) =>
  4. // read from 3 nodes failed within 1.second
  5. case NotFound(Set1Key, req) => // key set1 does not exist

You will always read your own writes. For example if you send an Update message followed by a Get of the same key, the Get will retrieve the change that was performed by the preceding Update message. However, the order of the reply messages is not defined, i.e. in the previous example you may receive the GetSuccess before the UpdateSuccess.

In the Get message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming GetSuccess.

  1. implicit val node = Cluster(system)
  2. val replicator = DistributedData(system).replicator
  3. val readTwo = ReadFrom(n = 2, timeout = 3.second)
  4. val Counter1Key = PNCounterKey("counter1")
  5.  
  6. def receive: Receive = {
  7. case "get-count" =>
  8. // incoming request to retrieve current value of the counter
  9. replicator ! Get(Counter1Key, readTwo, request = Some(sender()))
  10.  
  11. case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
  12. val value = g.get(Counter1Key).value.longValue
  13. replyTo ! value
  14. case GetFailure(Counter1Key, Some(replyTo: ActorRef)) =>
  15. replyTo ! -1L
  16. case NotFound(Counter1Key, Some(replyTo: ActorRef)) =>
  17. replyTo ! 0L
  18. }

Consistency

The consistency level that is supplied in the Update and Get specifies per request how many replicas must respond successfully to a write or read request.

For low latency reads you use ReadLocal with the risk of retrieving stale data, i.e. updates from other nodes might not be visible yet.

When using WriteLocal the update is only written to the local replica and then disseminated in the background with the gossip protocol, which can take a few seconds to spread to all nodes.

WriteAll and ReadAll are the strongest consistency levels, but also the slowest and with the lowest availability. For example, it is enough that one node is unavailable for a Get request to fail, i.e. you will not receive the value.

If consistency is important, you can ensure that a read always reflects the most recent write by using the following formula:

  1. (nodes_written + nodes_read) > N

where N is the total number of nodes in the cluster, or the number of nodes with the role that is used for the Replicator.

For example, in a 7 node cluster these consistency properties are achieved by writing to 4 nodes and reading from 4 nodes, or writing to 5 nodes and reading from 3 nodes.

By combining WriteMajority and ReadMajority levels a read always reflects the most recent write. The Replicator writes to and reads from a majority of replicas, i.e. N / 2 + 1. For example, in a 5 node cluster it writes to 3 nodes and reads from 3 nodes. In a 6 node cluster it writes to 4 nodes and reads from 4 nodes.
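As a quick sanity check of the formula (plain arithmetic, no Akka API involved):

  val n = 7                      // nodes running the Replicator (or with its role)
  val majority = n / 2 + 1       // 4, which is what WriteMajority and ReadMajority use
  val readSeesLatestWrite = (majority + majority) > n // 4 + 4 = 8 > 7, so true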

Here is an example of using WriteMajority and ReadMajority:

  1. private val timeout = 3.seconds
  2. private val readMajority = ReadMajority(timeout)
  3. private val writeMajority = WriteMajority(timeout)
  1. def receiveGetCart: Receive = {
  2. case GetCart =>
  3. replicator ! Get(DataKey, readMajority, Some(sender()))
  4.  
  5. case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
  6. val data = g.get(DataKey)
  7. val cart = Cart(data.entries.values.toSet)
  8. replyTo ! cart
  9.  
  10. case NotFound(DataKey, Some(replyTo: ActorRef)) =>
  11. replyTo ! Cart(Set.empty)
  12.  
  13. case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
  14. // ReadMajority failure, try again with local read
  15. replicator ! Get(DataKey, ReadLocal, Some(replyTo))
  16. }
  1. def receiveAddItem: Receive = {
  2. case cmd @ AddItem(item) =>
  3. val update = Update(DataKey, LWWMap.empty[LineItem], writeMajority, Some(cmd)) {
  4. cart => updateCart(cart, item)
  5. }
  6. replicator ! update
  7. }

In some rare cases, when performing an Update it is necessary to first try to fetch the latest data from other nodes. That can be done by first sending a Get with ReadMajority and then continuing with the Update when the GetSuccess, GetFailure or NotFound reply is received. This might be needed when you need to base a decision on the latest information or when removing entries from an ORSet or ORMap. If an entry is added to an ORSet or ORMap from one node and removed from another node, the entry will only be removed if the added entry is visible on the node where the removal is performed (hence the name observed-remove set).

The following example illustrates how to do that:

  1. def receiveRemoveItem: Receive = {
  2. case cmd @ RemoveItem(productId) =>
  3. // Try to fetch latest from a majority of nodes first, since ORMap
  4. // remove must have seen the item to be able to remove it.
  5. replicator ! Get(DataKey, readMajority, Some(cmd))
  6.  
  7. case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
  8. replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
  9. _ - productId
  10. }
  11.  
  12. case GetFailure(DataKey, Some(RemoveItem(productId))) =>
  13. // ReadMajority failed, fall back to best effort local value
  14. replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
  15. _ - productId
  16. }
  17.  
  18. case NotFound(DataKey, Some(RemoveItem(productId))) =>
  19. // nothing to remove
  20. }

Warning

Caveat: Even if you use WriteMajority and ReadMajority there is a small risk that you may read stale data if the cluster membership has changed between the Update and the Get. For example, in a cluster of 5 nodes, an Update may be written to 3 nodes: n1, n2, n3. If 2 more nodes are then added and a Get request reads from 4 nodes, which happen to be n4, n5, n6, n7, the value written to n1, n2, n3 is not seen in the response of the Get request.

Subscribe

You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.

The subscriber is automatically removed if the subscriber is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.

  1. val replicator = DistributedData(system).replicator
  2. val Counter1Key = PNCounterKey("counter1")
  3. // subscribe to changes of the Counter1Key value
  4. replicator ! Subscribe(Counter1Key, self)
  5. var currentValue = BigInt(0)
  6.  
  7. def receive: Receive = {
  8. case c @ Changed(Counter1Key) =>
  9. currentValue = c.get(Counter1Key).value
  10. case "get-count" =>
  11. // incoming request to retrieve current value of the counter
  12. sender() ! currentValue
  13. }

Delete

A data entry can be deleted by sending a Replicator.Delete message to the local Replicator. In reply to the Delete, a Replicator.DeleteSuccess is sent to the sender of the Delete if the value was successfully deleted according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.ReplicationDeleteFailure is sent. Note that ReplicationDeleteFailure does not mean that the delete completely failed or was rolled back. It may still have been replicated to some nodes, and may eventually be replicated to all nodes.

A deleted key cannot be reused again, but it is still recommended to delete unused data entries because that reduces the replication overhead when new nodes join the cluster. Subsequent Delete, Update and Get requests will be replied with Replicator.DataDeleted. Subscribers will receive Replicator.DataDeleted.

  1. val replicator = DistributedData(system).replicator
  2. val Counter1Key = PNCounterKey("counter1")
  3. val Set2Key = ORSetKey[String]("set2")
  4.  
  5. replicator ! Delete(Counter1Key, WriteLocal)
  6.  
  7. val writeMajority = WriteMajority(timeout = 5.seconds)
  8. replicator ! Delete(Set2Key, writeMajority)

Warning

As deleted keys continue to be included in the stored data on each node as well as in gossip messages, a continuous series of updates and deletes of top-level entities will result in growing memory usage until an ActorSystem runs out of memory. To use Akka Distributed Data where frequent adds and removes are required, you should use a fixed number of top-level data types that support both updates and removals, for example ORMap or ORSet.

Data Types

The data types must be convergent (stateful) CRDTs and implement the ReplicatedData trait, i.e. they provide a monotonic merge function and the state changes always converge.

You can use your own custom ReplicatedData types, and several types are provided by this package, such as:

Counters

GCounter is a "grow only counter". It only supports increments, no decrements.

It works in a similar way as a vector clock. It keeps track of one counter per node and the total value is the sum of these counters. The merge is implemented by taking the maximum count for each node.
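A minimal GCounter usage sketch, analogous to the PNCounter example below:

  implicit val node = Cluster(system)
  val c0 = GCounter.empty
  val c1 = c0 + 1
  val c2 = c1 + 7
  println(c2.value) // 8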

If you need both increments and decrements you can use the PNCounter (positive/negative counter).

It tracks the increments (P) separately from the decrements (N). Both P and N are represented as internal GCounters. Merge is handled by merging the internal P and N counters. The value of the counter is the value of the P counter minus the value of the N counter.

  1. implicit val node = Cluster(system)
  2. val c0 = PNCounter.empty
  3. val c1 = c0 + 1
  4. val c2 = c1 + 7
  5. val c3: PNCounter = c2 - 2
  6. println(c3.value) // 6

Several related counters can be managed in a map with the PNCounterMap data type. When the counters are placed in a PNCounterMap as opposed to placing them as separate top level values they are guaranteed to be replicated together as one unit, which is sometimes necessary for related data.

  1. implicit val node = Cluster(system)
  2. val m0 = PNCounterMap.empty
  3. val m1 = m0.increment("a", 7)
  4. val m2 = m1.decrement("a", 2)
  5. val m3 = m2.increment("b", 1)
  6. println(m3.get("a")) // Some(5)
  7. m3.entries.foreach { case (key, value) => println(s"$key -> $value") }

Sets

If you only need to add elements to a set and not remove elements the GSet (grow-only set) is the data type to use. The elements can be any type of values that can be serialized. Merge is simply the union of the two sets.

  1. val s0 = GSet.empty[String]
  2. val s1 = s0 + "a"
  3. val s2 = s1 + "b" + "c"
  4. if (s2.contains("a"))
  5. println(s2.elements) // a, b, c

If you need add and remove operations you should use the ORSet (observed-remove set). Elements can be added and removed any number of times. If an element is concurrently added and removed, the add will win. You cannot remove an element that you have not seen.

The ORSet has a version vector that is incremented when an element is added to the set. The version for the node that added the element is also tracked for each element in a so called "birth dot". The version vector and the dots are used by the merge function to track causality of the operations and resolve concurrent updates.

  1. implicit val node = Cluster(system)
  2. val s0 = ORSet.empty[String]
  3. val s1 = s0 + "a"
  4. val s2 = s1 + "b"
  5. val s3 = s2 - "a"
  6. println(s3.elements) // b

Maps

ORMap (observed-remove map) is a map with String keys and the values are ReplicatedData types themselves. It supports add, remove and delete any number of times for a map entry.

If an entry is concurrently added and removed, the add will win. You cannot remove an entry that you have not seen. This is the same semantics as for the ORSet.

If an entry is concurrently updated to different values the values will be merged, hence the requirement that the values must be ReplicatedData types.

It is rather inconvenient to use the ORMap directly since it does not expose specific types of the values. The ORMap is intended as a low level tool for building more specific maps, such as the following specialized maps.

ORMultiMap (observed-remove multi-map) is a multi-map implementation that wraps an ORMap with an ORSet for the map's value.

PNCounterMap (positive negative counter map) is a map of named counters. It is a specialized ORMap with PNCounter values.

LWWMap (last writer wins map) is a specialized ORMap with LWWRegister (last writer wins register) values.

  1. implicit val node = Cluster(system)
  2. val m0 = ORMultiMap.empty[Int]
  3. val m1 = m0 + ("a" -> Set(1, 2, 3))
  4. val m2 = m1.addBinding("a", 4)
  5. val m3 = m2.removeBinding("a", 2)
  6. val m4 = m3.addBinding("b", 1)
  7. println(m4.entries)
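A hedged sketch of LWWMap usage, assuming it supports the same style of + and get operators as the maps shown above (verify against the LWWMap API docs):

  implicit val node = Cluster(system)
  val m0 = LWWMap.empty[String]
  val m1 = m0 + ("a" -> "A")
  val m2 = m1 + ("b" -> "B")
  println(m2.get("a")) // Some(A)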

Note that LWWRegister and therefore LWWMap relies on synchronized clocks and should only be used when the choice of value is not important for concurrent updates occurring within the clock skew.

Instead of using timestamps based on System.currentTimeMillis() time it is possible to use a timestamp value based on something else, for example an increasing version number from a database record that is used for optimistic concurrency control.

When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you update a map the whole map is replicated. Therefore, instead of using one ORMap with 1000 elements it is more efficient to split that up in 10 top level ORMap entries with 100 elements each. Top level entries are replicated individually, which has the trade-off that different entries may not be replicated at the same time and you may see inconsistencies between related entries. Separate top level entries cannot be updated atomically together.

Flags and Registers

Flag is a data type for a boolean value that is initialized to false and can be switched to true. Thereafter it cannot be changed. true wins over false in merge.

  1. val f0 = Flag.empty
  2. val f1 = f0.switchOn
  3. println(f1.enabled)

LWWRegister (last writer wins register) can hold any (serializable) value.

Merge of an LWWRegister takes the register with the highest timestamp. Note that this relies on synchronized clocks. LWWRegister should only be used when the choice of value is not important for concurrent updates occurring within the clock skew.

Merge takes the register updated by the node with lowest address (UniqueAddress is ordered) if the timestamps are exactly the same.

  1. implicit val node = Cluster(system)
  2. val r1 = LWWRegister("Hello")
  3. val r2 = r1.withValue("Hi")
  4. println(s"${r1.value} by ${r1.updatedBy} at ${r1.timestamp}")

Instead of using timestamps based on System.currentTimeMillis() time it is possible to use a timestamp value based on something else, for example an increasing version number from a database record that is used for optimistic concurrency control.

  1. case class Record(version: Int, name: String, address: String)
  2.  
  3. implicit val node = Cluster(system)
  4. implicit val recordClock = new LWWRegister.Clock[Record] {
  5. override def apply(currentTimestamp: Long, value: Record): Long =
  6. value.version
  7. }
  8.  
  9. val record1 = Record(version = 1, "Alice", "Union Square")
  10. val r1 = LWWRegister(record1)
  11.  
  12. val record2 = Record(version = 2, "Alice", "Madison Square")
  13. val r2 = LWWRegister(record2)
  14.  
  15. val r3 = r1.merge(r2)
  16. println(r3.value)

For first-write-wins semantics you can use the LWWRegister#reverseClock instead of the LWWRegister#defaultClock.

Custom Data Type

You can rather easily implement your own data types. The only requirement is that it implements the merge function of the ReplicatedData trait.

A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several smaller data types to build richer data structures. For example, the PNCounter is composed of two internal GCounter instances to keep track of increments and decrements separately.

Here is a simple implementation of a custom TwoPhaseSet that uses two internal GSets to keep track of additions and removals. A TwoPhaseSet is a set where an element may be added and removed, but never added again thereafter.

  1. case class TwoPhaseSet(
  2. adds: GSet[String] = GSet.empty,
  3. removals: GSet[String] = GSet.empty)
  4. extends ReplicatedData {
  5. type T = TwoPhaseSet
  6.  
  7. def add(element: String): TwoPhaseSet =
  8. copy(adds = adds.add(element))
  9.  
  10. def remove(element: String): TwoPhaseSet =
  11. copy(removals = removals.add(element))
  12.  
  13. def elements: Set[String] = adds.elements diff removals.elements
  14.  
  15. override def merge(that: TwoPhaseSet): TwoPhaseSet =
  16. copy(
  17. adds = this.adds.merge(that.adds),
  18. removals = this.removals.merge(that.removals))
  19. }

Data types should be immutable, i.e. "modifying" methods should return a new instance.
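Usage of the TwoPhaseSet defined above then looks like this; once an element has been removed it can never reappear, because elements is the difference between the two grow-only sets:

  val s1 = TwoPhaseSet().add("a").add("b")
  val s2 = TwoPhaseSet().add("a").remove("a")
  val merged = s1.merge(s2)
  println(merged.elements) // Set(b)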

Serialization

The data types must be serializable with an Akka Serializer. It is highly recommended that you implement efficient serialization with Protobuf or similar for your custom data types. The built-in data types are marked with ReplicatedDataSerialization and serialized with akka.cluster.ddata.protobuf.ReplicatedDataSerializer.

Serialization of the data types is used in remote messages and also for creating message digests (SHA-1) to detect changes. Therefore it is important that the serialization is efficient and produces the same bytes for the same content. For example sets and maps should be sorted deterministically in the serialization.

This is a protobuf representation of the above TwoPhaseSet:

  1. option java_package = "docs.ddata.protobuf.msg";
  2. option optimize_for = SPEED;
  3.  
  4. message TwoPhaseSet {
  5. repeated string adds = 1;
  6. repeated string removals = 2;
  7. }

The serializer for the TwoPhaseSet:

  1. import java.util.ArrayList
  2. import java.util.Collections
  3. import scala.collection.JavaConverters._
  4. import akka.actor.ExtendedActorSystem
  5. import akka.cluster.ddata.GSet
  6. import akka.cluster.ddata.protobuf.SerializationSupport
  7. import akka.serialization.Serializer
  8. import docs.ddata.TwoPhaseSet
  9. import docs.ddata.protobuf.msg.TwoPhaseSetMessages
  10.  
  11. class TwoPhaseSetSerializer(val system: ExtendedActorSystem)
  12. extends Serializer with SerializationSupport {
  13.  
  14. override def includeManifest: Boolean = false
  15.  
  16. override def identifier = 99999
  17.  
  18. override def toBinary(obj: AnyRef): Array[Byte] = obj match {
  19. case m: TwoPhaseSet => twoPhaseSetToProto(m).toByteArray
  20. case _ => throw new IllegalArgumentException(
  21. s"Can't serialize object of type ${obj.getClass}")
  22. }
  23.  
  24. override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
  25. twoPhaseSetFromBinary(bytes)
  26. }
  27.  
  28. def twoPhaseSetToProto(twoPhaseSet: TwoPhaseSet): TwoPhaseSetMessages.TwoPhaseSet = {
  29. val b = TwoPhaseSetMessages.TwoPhaseSet.newBuilder()
  30. // using java collections and sorting for performance (avoid conversions)
  31. val adds = new ArrayList[String]
  32. twoPhaseSet.adds.elements.foreach(adds.add)
  33. if (!adds.isEmpty) {
  34. Collections.sort(adds)
  35. b.addAllAdds(adds)
  36. }
  37. val removals = new ArrayList[String]
  38. twoPhaseSet.removals.elements.foreach(removals.add)
  39. if (!removals.isEmpty) {
  40. Collections.sort(removals)
  41. b.addAllRemovals(removals)
  42. }
  43. b.build()
  44. }
  45.  
  46. def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
  47. val msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes)
  48. TwoPhaseSet(
  49. adds = GSet(msg.getAddsList.iterator.asScala.toSet),
  50. removals = GSet(msg.getRemovalsList.iterator.asScala.toSet))
  51. }
  52. }

Note that the elements of the sets are sorted so the SHA-1 digests are the same for the same elements.

You register the serializer in configuration:

  1. akka.actor {
  2. serializers {
  3. two-phase-set = "docs.ddata.protobuf.TwoPhaseSetSerializer"
  4. }
  5. serialization-bindings {
  6. "docs.ddata.TwoPhaseSet" = two-phase-set
  7. }
  8. }

Using compression can sometimes be a good idea to reduce the data size. Gzip compression is provided by the akka.cluster.ddata.protobuf.SerializationSupport trait:

  1. override def toBinary(obj: AnyRef): Array[Byte] = obj match {
  2. case m: TwoPhaseSet => compress(twoPhaseSetToProto(m))
  3. case _ => throw new IllegalArgumentException(
  4. s"Can't serialize object of type ${obj.getClass}")
  5. }
  6.  
  7. override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
  8. twoPhaseSetFromBinary(decompress(bytes))
  9. }

The two embedded GSets can be serialized as illustrated above, but in general when composing new data types from the existing built-in types it is better to make use of the existing serializer for those types. This can be done by declaring those as bytes fields in protobuf:

  1. message TwoPhaseSet2 {
  2. optional bytes adds = 1;
  3. optional bytes removals = 2;
  4. }

and use the methods otherMessageToProto and otherMessageFromBinary that are provided by the SerializationSupport trait to serialize and deserialize the GSet instances. This works with any type that has a registered Akka serializer. This is how such a serializer would look for the TwoPhaseSet:

  1. import akka.actor.ExtendedActorSystem
  2. import akka.cluster.ddata.GSet
  3. import akka.cluster.ddata.protobuf.ReplicatedDataSerializer
  4. import akka.cluster.ddata.protobuf.SerializationSupport
  5. import akka.serialization.Serializer
  6. import docs.ddata.TwoPhaseSet
  7. import docs.ddata.protobuf.msg.TwoPhaseSetMessages
  8.  
  9. class TwoPhaseSetSerializer2(val system: ExtendedActorSystem)
  10. extends Serializer with SerializationSupport {
  11.  
  12. override def includeManifest: Boolean = false
  13.  
  14. override def identifier = 99999
  15.  
  16. val replicatedDataSerializer = new ReplicatedDataSerializer(system)
  17.  
  18. override def toBinary(obj: AnyRef): Array[Byte] = obj match {
  19. case m: TwoPhaseSet => twoPhaseSetToProto(m).toByteArray
  20. case _ => throw new IllegalArgumentException(
  21. s"Can't serialize object of type ${obj.getClass}")
  22. }
  23.  
  24. override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
  25. twoPhaseSetFromBinary(bytes)
  26. }
  27.  
  28. def twoPhaseSetToProto(twoPhaseSet: TwoPhaseSet): TwoPhaseSetMessages.TwoPhaseSet2 = {
  29. val b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder()
  30. if (!twoPhaseSet.adds.isEmpty)
  31. b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString())
  32. if (!twoPhaseSet.removals.isEmpty)
  33. b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString())
  34. b.build()
  35. }
  36.  
  37. def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
  38. val msg = TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes)
  39. val adds =
  40. if (msg.hasAdds)
  41. otherMessageFromBinary(msg.getAdds.toByteArray).asInstanceOf[GSet[String]]
  42. else
  43. GSet.empty[String]
  44. val removals =
  45. if (msg.hasRemovals)
  46. otherMessageFromBinary(msg.getRemovals.toByteArray).asInstanceOf[GSet[String]]
  47. else
  48. GSet.empty[String]
  49. TwoPhaseSet(adds, removals)
  50. }
  51. }

CRDT Garbage

One thing that can be problematic with CRDTs is that some data types accumulate history (garbage). For example a GCounter keeps track of one counter per node. If a GCounter has been updated from one node it will associate the identifier of that node forever. That can become a problem for long running systems with many cluster nodes being added and removed. To solve this problem the Replicator performs pruning of data associated with nodes that have been removed from the cluster. Data types that need pruning have to implement the RemovedNodePruning trait.

Samples

Several interesting samples are included and described in the Lightbend Activator tutorial named Akka Distributed Data Samples with Scala.

Limitations

There are some limitations that you should be aware of.

CRDTs cannot be used for all types of problems, and eventual consistency does not fit all domains. Sometimes you need strong consistency.

It is not intended for Big Data. The number of top level entries should not exceed the currently recommended limit of 100000. When a new node is added to the cluster all these entries are transferred (gossiped) to the new node. The entries are split up in chunks and all existing nodes collaborate in the gossip, but it will still take a while (tens of seconds) to transfer all entries, which is why the number of top level entries must stay bounded. We will be able to improve this if needed, but the design is still not intended for billions of entries.

All data is held in memory, which is another reason why it is not intended for Big Data.

When a data entry is changed the full state of that entry is replicated to other nodes. For example, if you add one element to a Set with 100 existing elements, all 101 elements are transferred to other nodes. This means that you cannot have too large data entries, because then the remote message size will be too large. We might be able to make this more efficient by implementing Efficient State-based CRDTs by Delta-Mutation.

The data is only kept in memory. It is redundant since it is replicated to other nodes in the cluster, but if you stop all nodes the data is lost, unless you have saved it elsewhere. Making the data durable is a possible future feature, but even if we implement that it is not intended to be a full featured database.

Learn More about CRDTs

Dependencies

To use Distributed Data you must add the following dependency in your project.

sbt:

  1. "com.typesafe.akka" %% "akka-distributed-data-experimental" % "2.4.10"

maven:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-distributed-data-experimental_2.11</artifactId>
  4. <version>2.4.10</version>
  5. </dependency>

Configuration

The DistributedData extension can be configured with the following properties:

  1. # Settings for the DistributedData extension
  2. akka.cluster.distributed-data {
  3. # Actor name of the Replicator actor, /system/ddataReplicator
  4. name = ddataReplicator
  5.  
  6. # Replicas are running on members tagged with this role.
  7. # All members are used if undefined or empty.
  8. role = ""
  9.  
  10. # How often the Replicator should send out gossip information
  11. gossip-interval = 2 s
  12.  
  13. # How often the subscribers will be notified of changes, if any
  14. notify-subscribers-interval = 500 ms
  15.  
  16. # Maximum number of entries to transfer in one gossip message when synchronizing
  17. # the replicas. Next chunk will be transferred in next round of gossip.
  18. max-delta-elements = 1000
  19. # The id of the dispatcher to use for Replicator actors. If not specified
  20. # default dispatcher is used.
  21. # If specified you need to define the settings of the actual dispatcher.
  22. use-dispatcher = ""
  23.  
  24. # How often the Replicator checks for pruning of data associated with
  25. # removed cluster nodes.
  26. pruning-interval = 30 s
  27. # How long time it takes (worst case) to spread the data to all other replica nodes.
  28. # This is used when initiating and completing the pruning process of data associated
  29. # with removed cluster nodes. The time measurement is stopped when any replica is
  30. # unreachable, so it should be configured to worst case in a healthy cluster.
  31. max-pruning-dissemination = 60 s
  32. # Serialized Write and Read messages are cached when they are sent to
  33. # several nodes. If no further activity they are removed from the cache
  34. # after this duration.
  35. serializer-cache-time-to-live = 10s
  36. }
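Any of these settings can also be overridden when the actor system is created. As a minimal sketch (the system name is just a placeholder), one could tune the gossip interval programmatically and fall back to application.conf for everything else:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Override one Distributed Data setting and keep the rest from application.conf.
    val config = ConfigFactory.parseString(
      "akka.cluster.distributed-data.gossip-interval = 1 s")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)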

Remoting

For an introduction to the remoting capabilities of Akka please see Location Transparency.

Note

As explained in that chapter Akka remoting is designed for communication in a peer-to-peer fashion and it has limitations for client-server setups. In particular Akka Remoting does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. For symmetric communication in these situations network and/or Akka configuration will have to be changed as described in Akka behind NAT or in a Docker container.

Preparing your ActorSystem for Remoting

Akka remoting is provided as a separate jar file. Make sure that you have the following dependency in your project:

  1. "com.typesafe.akka" %% "akka-remote" % "2.4.10"

To enable remote capabilities in your Akka project you should, at a minimum, add the following changes to your application.conf file:

  1. akka {
  2. actor {
  3. provider = "akka.remote.RemoteActorRefProvider"
  4. }
  5. remote {
  6. enabled-transports = ["akka.remote.netty.tcp"]
  7. netty.tcp {
  8. hostname = "127.0.0.1"
  9. port = 2552
  10. }
  11. }
  12. }

As you can see in the example above there are four things you need to add to get started: switch the actor reference provider to akka.remote.RemoteActorRefProvider, enable the netty.tcp transport, and set the hostname and port the actor system should listen on.

Note

The port number needs to be unique for each actor system on the same machine even if the actor systems have different names. This is because each actor system has its own networking subsystem listening for connections and handling messages so as not to interfere with other actor systems.

The example above only illustrates the bare minimum of properties you have to add to enable remoting. All settings are described in Remote Configuration.
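As a small sketch (the system name is a placeholder), such a configuration could be loaded and the actor system started like this, overriding the port so that a second system can run on the same machine during local testing:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // application.conf (with the remoting settings above) is assumed to be on
    // the classpath; only the port is overridden here.
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port = 2553")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("RemoteSystem", config)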

Types of Remote Interaction

Akka has two ways of using remoting: looking up actors that already exist on remote nodes (via actorSelection), and creating actors on remote nodes (remote deployment via actorOf).

In the next sections the two alternatives are described in detail.

Looking up Remote Actors

actorSelection(path) will obtain an ActorSelection to an Actor on a remote node, e.g.:

  1. val selection =
  2. context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")

As you can see from the example above the following pattern is used to find an actor on a remote node:

  1. akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>

Once you have obtained a selection to the actor you can interact with it in the same way you would with a local actor, e.g.:

  1. selection ! "Pretty awesome feature"

To acquire an ActorRef for an ActorSelection you need to send a message to the selection and use the sender reference of the reply from the actor. There is a built-in Identify message that all Actors will understand and automatically reply to with an ActorIdentity message containing the ActorRef. This can also be done with the resolveOne method of the ActorSelection, which returns a Future of the matching ActorRef. Both approaches are sketched below.
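A minimal sketch of both variants (the actor name, path and timeout are placeholders, not from the original example):

    import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, Identify }
    import scala.concurrent.duration._
    import scala.util.{ Failure, Success }

    class RemoteLookup extends Actor with ActorLogging {
      import context.dispatcher

      val selection =
        context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")

      // Variant 1: the built-in identification protocol. The actor behind the
      // selection replies with an ActorIdentity carrying its ActorRef (or None).
      selection ! Identify("lookup")

      // Variant 2: resolveOne returns a Future of the matching ActorRef and
      // fails if no actor answers within the timeout.
      selection.resolveOne(3.seconds).onComplete {
        case Success(ref)   => self ! ref
        case Failure(cause) => log.warning("could not resolve actor: {}", cause.getMessage)
      }

      def receive = {
        case ActorIdentity("lookup", Some(ref)) => log.info("resolved via Identify: {}", ref)
        case ActorIdentity("lookup", None)      => log.warning("no actor found at that path")
        case ref: ActorRef                      => log.info("resolved via resolveOne: {}", ref)
      }
    }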

Note

For more details on how actor addresses and paths are formed and used, please refer to Actor References, Paths and Addresses.

Note

Message sends to actors that are actually in the sending actor system do not get delivered via the remote actor ref provider. They're delivered directly, by the local actor ref provider.

Aside from providing better performance, this also means that if the hostname you configure remoting to listen on cannot actually be resolved from within the very same actor system, such messages will (perhaps counterintuitively) be delivered just fine.

Creating Actors Remotely

If you want to use the creation functionality in Akka remoting you have to further amend the application.conf file in the following way (only showing deployment section):

  1. akka {
  2. actor {
  3. deployment {
  4. /sampleActor {
  5. remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
  6. }
  7. }
  8. }
  9. }

The configuration above instructs Akka to react when an actor with path /sampleActor is created, i.e. using system.actorOf(Props(...), "sampleActor"). This specific actor will not be directly instantiated, but instead the remote daemon of the remote system will be asked to create the actor, which in this sample corresponds to sampleActorSystem@127.0.0.1:2553.

Once you have configured the properties above you would do the following in code:

  1. val actor = system.actorOf(Props[SampleActor], "sampleActor")
  2. actor ! "Pretty slick"

The actor class SampleActor has to be available to the runtimes using it, i.e. the classloader of the actor systems has to have a JAR containing the class.

Note

In order to ensure serializability of Props when passing constructor arguments to the actor being created, do not make the factory an inner class: this will inherently capture a reference to its enclosing object, which in most cases is not serializable. It is best to create a factory method in the companion object of the actor’s class.

Serializability of all Props can be tested by setting the configuration item akka.actor.serialize-creators=on. Only Props whose deploy has LocalScope are exempt from this check.

Note

You can use asterisks as wildcard matches for the actor paths, so you could specify /*/sampleActor and that would match all sampleActor actors at that level in the hierarchy. You can also use a wildcard in the last position to match all actors at a certain level: /someParent/*. Non-wildcard matches always have higher priority than wildcards, so /foo/bar is considered more specific than /foo/* and only the highest priority match is used. Please note that wildcards cannot be used to partially match sections, like this: /foo*/bar, /f*o/bar etc.

Programmatic Remote Deployment

To allow dynamically deployed systems, it is also possible to include deployment configuration in the Props which are used to create an actor: this information is the equivalent of a deployment section from the configuration file, and if both are given, the external configuration takes precedence.

With these imports:

  1. import akka.actor.{ Props, Deploy, Address, AddressFromURIString }
  2. import akka.remote.RemoteScope

and a remote address like this:

  1. val one = AddressFromURIString("akka.tcp://sys@host:1234")
  2. val two = Address("akka.tcp", "sys", "host", 1234) // this gives the same

you can advise the system to create a child on that remote node like so:

  1. val ref = system.actorOf(Props[SampleActor].
  2. withDeploy(Deploy(scope = RemoteScope(one))))

Lifecycle and Failure Recovery Model

[Figure: association lifecycle state diagram]

Each link with a remote system can be in one of four states, as illustrated above. Before any communication happens with a remote system at a given Address the state of the association is Idle. The first time a message is attempted to be sent to the remote system, or an inbound connection is accepted, the state of the link transitions to Active, denoting that the two systems have messages to send or receive and no failures have been encountered so far. When a communication failure happens and the connection is lost between the two systems, the link becomes Gated.

In this state the system will not attempt to connect to the remote host and all outbound messages will be dropped. The time while the link is in the Gated state is controlled by the setting akka.remote.retry-gate-closed-for: after this time elapses the link state transitions to Idle again. Gate is one-sided in the sense that whenever a successful inbound connection is accepted from a remote system while the link is Gated, it automatically transitions to Active and communication resumes immediately.

In the face of communication failures that are unrecoverable because the state of the participating systems is inconsistent, the remote system becomes Quarantined. Unlike Gate, quarantining is permanent and lasts until one of the systems is restarted. After a restart communication can be resumed again and the link can become Active again.

Watching Remote Actors

Watching a remote actor is not different than watching a local actor, as described in Lifecycle Monitoring aka DeathWatch.

Failure Detector

Under the hood remote death watch uses heartbeat messages and a failure detector to generate a Terminated message in response to network failures and JVM crashes, in addition to graceful termination of the watched actor.

The heartbeat arrival times are interpreted by an implementation of The Phi Accrual Failure Detector.

The suspicion level of failure is given by a value called phi. The basic idea of the phi failure detector is to express the value of phi on a scale that is dynamically adjusted to reflect current network conditions.

The value of phi is calculated as:

  1. phi = -log10(1 - F(timeSinceLastHeartbeat))

where F is the cumulative distribution function of a normal distribution with mean and standard deviation estimated from historical heartbeat inter-arrival times.

In the Remote Configuration you can adjust the akka.remote.watch-failure-detector.threshold to define when a phi value is considered to be a failure.

A low threshold is prone to generate many false positives but ensures a quick detection in the event of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes. The default threshold is 10 and is appropriate for most situations. However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms. A rough sketch of the phi calculation is given below.
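The following is only an illustration, not Akka's internal implementation: it assumes a normal distribution of heartbeat inter-arrival times with a given mean and standard deviation, and uses a textbook erf approximation whose accuracy limits the usefulness of the deep tail values.

    object PhiSketch {
      // Abramowitz-Stegun approximation of the error function. Its absolute error
      // (about 1.5e-7) makes values deep in the tail only indicative, which is
      // fine for illustrating the shape of the curve.
      private def erf(x: Double): Double = {
        val t = 1.0 / (1.0 + 0.3275911 * math.abs(x))
        val poly = ((((1.061405429 * t - 1.453152027) * t + 1.421413741) * t -
          0.284496736) * t + 0.254829592) * t
        val y = 1.0 - poly * math.exp(-x * x)
        if (x >= 0) y else -y
      }

      // F: cumulative distribution function of a normal distribution.
      private def cdf(x: Double, mean: Double, stdDev: Double): Double =
        0.5 * (1.0 + erf((x - mean) / (stdDev * math.sqrt(2.0))))

      // phi = -log10(1 - F(timeSinceLastHeartbeat))
      def phi(timeSinceLastHeartbeat: Double, mean: Double, stdDev: Double): Double =
        -math.log10(1.0 - cdf(timeSinceLastHeartbeat, mean, stdDev))
    }

With a mean inter-arrival time of 1000 ms and a standard deviation of 200 ms, phi stays small right after the expected heartbeat time and grows rapidly once the silence extends several standard deviations beyond it.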

The following chart illustrates how phi increases with increasing time since the previous heartbeat.

[Figure: phi as a function of time since the previous heartbeat, standard deviation 200 ms]

Phi is calculated from the mean and standard deviation of historical inter-arrival times. The previous chart is an example for a standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, i.e. it is possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms.

[Figure: phi curve for a standard deviation of 100 ms]

To be able to survive sudden abnormalities, such as garbage collection pauses and transient network failures, the failure detector is configured with a margin, akka.remote.watch-failure-detector.acceptable-heartbeat-pause. You may want to adjust the Remote Configuration of this depending on your environment. This is how the curve looks for acceptable-heartbeat-pause configured to 3 seconds.

[Figure: phi curve with acceptable-heartbeat-pause configured to 3 seconds]

Serialization

When using remoting for actors you must ensure that the props and messages used for those actors are serializable. Failing to do so will cause the system to behave in an unintended way.

For more information please see Serialization.

Routers with Remote Destinations

It is absolutely feasible to combine remoting with Routing.

A pool of remote deployed routees can be configured as:

  1. akka.actor.deployment {
  2. /parent/remotePool {
  3. router = round-robin-pool
  4. nr-of-instances = 10
  5. target.nodes = ["akka.tcp://app@10.0.0.2:2552", "akka.tcp://app@10.0.0.3:2552"]
  6. }
  7. }

This configuration setting will clone the actor defined in the Props of the remotePool 10 times and deploy it evenly distributed across the two given target nodes. A sketch of creating such a router from code follows.
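A minimal sketch, reusing SampleActor from the earlier example as the routee and a hypothetical Parent actor so that the router path matches /parent/remotePool from the deployment section above:

    import akka.actor.{ Actor, Props }
    import akka.routing.FromConfig

    class Parent extends Actor {
      // FromConfig picks up the router and target.nodes settings from the
      // deployment configuration for /parent/remotePool.
      val remotePool =
        context.actorOf(FromConfig.props(Props[SampleActor]), "remotePool")

      def receive = {
        case msg => remotePool forward msg // routed to the remotely deployed routees
      }
    }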

A group of remote actors can be configured as:

  1. akka.actor.deployment {
  2. /parent/remoteGroup {
  3. router = round-robin-group
  4. routees.paths = [
  5. "akka.tcp://app@10.0.0.1:2552/user/workers/w1",
  6. "akka.tcp://app@10.0.0.2:2552/user/workers/w1",
  7. "akka.tcp://app@10.0.0.3:2552/user/workers/w1"]
  8. }
  9. }

This configuration setting will send messages to the defined remote actor paths. It requires that you create the destination actors on the remote nodes with matching paths. That is not done by the router.

Remoting Sample

There is a more extensive remote example that comes with Lightbend Activator. The tutorial named Akka Remote Samples with Scala demonstrates both remote deployment and look-up of remote actors.

Pluggable transport support

Akka can be configured to use various transports to communicate with remote systems. The core component of this feature is the akka.remote.transport.Transport SPI. Transport implementations must extend this trait. Transports can be loaded by setting the akka.remote.enabled-transports configuration key to point to one or more configuration sections containing driver descriptions.

An example of setting up the default Netty based SSL driver as default:

  1. akka {
  2. remote {
  3. enabled-transports = [akka.remote.netty.ssl]
  4.  
  5. netty.ssl.security {
  6. key-store = "mykeystore"
  7. trust-store = "mytruststore"
  8. key-store-password = "changeme"
  9. key-password = "changeme"
  10. trust-store-password = "changeme"
  11. protocol = "TLSv1.2"
  12. random-number-generator = "AES128CounterSecureRNG"
  13. enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA]
  14. }
  15. }
  16. }

An example of setting up a custom transport implementation:

  1. akka {
  2. remote {
  3. enabled-transports = ["akka.remote.mytransport"]
  4.  
  5. mytransport {
  6. # The transport-class configuration entry is required, and
  7. # it must contain the fully qualified name of the transport
  8. # implementation
  9. transport-class = "my.package.MyTransport"
  10.  
  11. # It is possible to decorate Transports with additional services.
  12. # Adapters should be registered in the "adapters" sections to
  13. # be able to apply them to transports
  14. applied-adapters = []
  15.  
  16. # Driver specific configuration options has to be in the same
  17. # section:
  18. some-config = foo
  19. another-config = bar
  20. }
  21. }
  22. }

Remote Events

It is possible to listen to events that occur in Akka Remote; to subscribe/unsubscribe to these events you simply register as a listener for the types described below on the ActorSystem.eventStream.

Note

To subscribe to any remote event, subscribe to RemotingLifecycleEvent. To subscribe to events related only to the lifecycle of associations, subscribe to akka.remote.AssociationEvent.

Note

The use of term "Association" instead of "Connection" reflects that the remoting subsystem may use connectionless transports, but an association similar to transport layer connections is maintained between endpoints by the Akka protocol.

By default an event listener is registered which logs all of the events described below. This default was chosen to help setting up a system, but it is quite common to switch this logging off once that phase of the project is finished.

Note

In order to switch off the logging, set akka.remote.log-remote-lifecycle-events = off in your application.conf.

To be notified when an association is over ("disconnected") listen to DisassociatedEvent which holds the direction of the association (inbound or outbound) and the addresses of the involved parties.

To be notified when an association is successfully established ("connected") listen to AssociatedEvent which holds the direction of the association (inbound or outbound) and the addresses of the involved parties.

To intercept errors directly related to associations, listen to AssociationErrorEvent which holds the direction of the association (inbound or outbound), the addresses of the involved parties and the Throwable cause.

To be notified when the remoting subsystem is ready to accept associations, listen to RemotingListenEvent which contains the addresses the remoting listens on.

To be notified when the current system is quarantined by the remote system, listen to ThisActorSystemQuarantinedEvent, which includes the addresses of local and remote ActorSystems.

To be notified when the remoting subsystem has been shut down, listen to RemotingShutdownEvent.

To intercept generic remoting related errors, listen to RemotingErrorEvent which holds the Throwable cause.
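As a minimal sketch (the listener class name is hypothetical), such a subscriber that simply logs all remoting lifecycle events could look like this:

    import akka.actor.{ Actor, ActorLogging, Props }
    import akka.remote.RemotingLifecycleEvent

    class RemoteEventListener extends Actor with ActorLogging {
      // Subscribe to all remoting lifecycle events on the event stream.
      override def preStart(): Unit =
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      def receive = {
        case event: RemotingLifecycleEvent => log.info("remoting event: {}", event)
      }
    }

    // usage: system.actorOf(Props[RemoteEventListener], "remoteEvents")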

Remote Security

Akka provides a couple of ways to enhance security between remote nodes (client/server): untrusted mode and SSL for the remote transport.

Untrusted Mode

As soon as an actor system can connect to another remotely, it may in principle send any possible message to any actor contained within that remote system. One example may be sending a PoisonPill to the system guardian, shutting that system down. This is not always desired, and it can be disabled with the following setting:

  1. akka.remote.untrusted-mode = on

This disallows sending of system messages (actor life-cycle commands, DeathWatch, etc.) and any message extending PossiblyHarmful to the system on which this flag is set. Should a client send them nonetheless they are dropped and logged (at DEBUG level in order to reduce the possibilities for a denial of service attack). PossiblyHarmful covers the predefined messages like PoisonPill and Kill, but it can also be added as a marker trait to user-defined messages.

Messages sent with actor selection are by default discarded in untrusted mode, but permission to receive actor selection messages can be granted to specific actors defined in configuration:

  1. akka.remote.trusted-selection-paths = ["/user/receptionist", "/user/namingService"]

The actual message must still not be of type PossiblyHarmful.

In summary, the following operations are ignored by a system configured in untrusted mode when incoming via the remoting layer: system messages (actor life-cycle commands, remote DeathWatch, remote deployment), any message extending PossiblyHarmful (such as PoisonPill and Kill), and messages sent via actor selection, unless the destination path is listed in trusted-selection-paths.

Note

Enabling the untrusted mode does not remove the capability of the client to freely choose the target of its message sends, which means that messages not prohibited by the above rules can be sent to any actor in the remote system. It is good practice for a client-facing system to only contain a well-defined set of entry point actors, which then forward requests (possibly after performing validation) to another actor system containing the actual worker actors. If messaging between these two server-side systems is done using local ActorRef (they can be exchanged safely between actor systems within the same JVM), you can restrict the messages on this interface by marking them PossiblyHarmful so that a client cannot forge them.

SSL

SSL can be used as the remote transport by adding akka.remote.netty.ssl to the enabled-transports configuration section. See a description of the settings in the Remote Configuration section.

The SSL support is implemented with Java Secure Socket Extension, please consult the official Java Secure Socket Extension documentation and related resources for troubleshooting.

Note

When using SHA1PRNG on Linux it is recommended to specify -Djava.security.egd=file:/dev/./urandom as an argument to the JVM to prevent blocking. It is NOT as secure because it reuses the seed. Use '/dev/./urandom', not '/dev/urandom', as the latter doesn't work according to Bug ID: 6202721.

Remote Configuration

There are lots of configuration properties that are related to remoting in Akka. We refer to the reference configuration for more information.

Note

Setting properties like the listening IP and port number programmatically is best done by using something like the following:

  1. ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"1.2.3.4\"")
  2. .withFallback(ConfigFactory.load());

Akka behind NAT or in a Docker container

In setups involving Network Address Translation (NAT), Load Balancers or Docker containers the hostname and port pair that Akka binds to will be different than the "logical" host name and port pair that is used to connect to the system from the outside. This requires special configuration that sets both the logical and the bind pairs for remoting.

  1. akka {
  2. remote {
  3. netty.tcp {
  4. hostname = my.domain.com # external (logical) hostname
  5. port = 8000 # external (logical) port
  6.  
  7. bind-hostname = local.address # internal (bind) hostname
  8. bind-port = 2552 # internal (bind) port
  9. }
  10. }
  11. }

Serialization

Akka has a built-in Extension for serialization, and it is both possible to use the built-in serializers and to write your own.

The serialization mechanism is both used by Akka internally to serialize messages, and available for ad-hoc serialization of whatever you might need it for.

Usage

Configuration

For Akka to know which Serializer to use for what, you need to edit your Configuration: in the "akka.actor.serializers" section you bind names to implementations of the akka.serialization.Serializer you wish to use, like this:

  1. akka {
  2. actor {
  3. serializers {
  4. java = "akka.serialization.JavaSerializer"
  5. proto = "akka.remote.serialization.ProtobufSerializer"
  6. myown = "docs.serialization.MyOwnSerializer"
  7. }
  8. }
  9. }

After you've bound names to different implementations of Serializer you need to specify which classes should be serialized using which Serializer; this is done in the "akka.actor.serialization-bindings" section:

  1. akka {
  2. actor {
  3. serializers {
  4. java = "akka.serialization.JavaSerializer"
  5. proto = "akka.remote.serialization.ProtobufSerializer"
  6. myown = "docs.serialization.MyOwnSerializer"
  7. }
  8.  
  9. serialization-bindings {
  10. "java.lang.String" = java
  11. "docs.serialization.Customer" = java
  12. "com.google.protobuf.Message" = proto
  13. "docs.serialization.MyOwnSerializable" = myown
  14. "java.lang.Boolean" = myown
  15. }
  16. }
  17. }

You only need to specify the name of an interface or abstract base class of the messages. In case of ambiguity, i.e. the message implements several of the configured classes, the most specific configured class will be used, i.e. the one of which all other candidates are superclasses. If this condition cannot be met, because e.g. java.io.Serializable and MyOwnSerializable both apply and neither is a subtype of the other, a warning will be issued.

Note

If your messages are contained inside of a Scala object, then in order to reference those messages, you will need to use the fully qualified Java class name. For a message named Message contained inside the object named Wrapper you would need to reference it as Wrapper$Message instead of Wrapper.Message.

Akka provides serializers for java.io.Serializable and protobuf com.google.protobuf.GeneratedMessage by default (the latter only if depending on the akka-remote module), so normally you don't need to add configuration for that; since com.google.protobuf.GeneratedMessage implements java.io.Serializable, protobuf messages will always be serialized using the protobuf protocol unless specifically overridden. In order to disable a default serializer, map its marker type to “none”:

  1. akka.actor.serialization-bindings {
  2. "java.io.Serializable" = none
  3. }

Verification

If you want to verify that your messages are serializable you can enable the following config option:

  1. akka {
  2. actor {
  3. serialize-messages = on
  4. }
  5. }

Warning

We only recommend using the config option turned on when you're running tests. It is completely pointless to have it turned on in other scenarios.

If you want to verify that your Props are serializable you can enable the following config option:

  1. akka {
  2. actor {
  3. serialize-creators = on
  4. }
  5. }

Warning

We only recommend using the config option turned on when you're running tests. It is completely pointless to have it turned on in other scenarios.

Programmatic

If you want to programmatically serialize/deserialize using Akka Serialization, here's some examples:

  1. import akka.actor.{ ActorRef, ActorSystem }
  2. import akka.serialization._
  3. import com.typesafe.config.ConfigFactory
  4.  
  5. val system = ActorSystem("example")
  6.  
  7. // Get the Serialization Extension
  8. val serialization = SerializationExtension(system)
  9.  
  10. // Have something to serialize
  11. val original = "woohoo"
  12.  
  13. // Find the Serializer for it
  14. val serializer = serialization.findSerializerFor(original)
  15.  
  16. // Turn it into bytes
  17. val bytes = serializer.toBinary(original)
  18.  
  19. // Turn it back into an object
  20. val back = serializer.fromBinary(bytes, manifest = None)
  21.  
  22. // Voilá!
  23. back should be(original)

For more information, have a look at the ScalaDoc for akka.serialization._

Customization

Let's say that you want to create your own Serializer; you saw it referenced as docs.serialization.MyOwnSerializer in the config example above.

Creating new Serializers

First you need to create a class definition of your Serializer like so:

  1. import akka.actor.{ ActorRef, ActorSystem }
  2. import akka.serialization._
  3. import com.typesafe.config.ConfigFactory
  4.  
  5. class MyOwnSerializer extends Serializer {
  6.  
  7. // This is whether "fromBinary" requires a "clazz" or not
  8. def includeManifest: Boolean = true
  9.  
  10. // Pick a unique identifier for your Serializer,
  11. // you've got a couple of billions to choose from,
  12. // 0 - 16 is reserved by Akka itself
  13. def identifier = 1234567
  14.  
  15. // "toBinary" serializes the given object to an Array of Bytes
  16. def toBinary(obj: AnyRef): Array[Byte] = {
  17. // Put the code that serializes the object here
  18. // ... ...
  19. }
  20.  
  21. // "fromBinary" deserializes the given array,
  22. // using the type hint (if any, see "includeManifest" above)
  23. def fromBinary(
  24. bytes: Array[Byte],
  25. clazz: Option[Class[_]]): AnyRef = {
  26. // Put your code that deserializes here
  27. // ... ...
  28. }
  29. }

The manifest is a type hint so that the same serializer can be used for different classes. The manifest parameter in fromBinary is the class of the object that was serialized. In fromBinary you can match on the class and deserialize the bytes to different objects.

Then you only need to fill in the blanks, bind it to a name in your Configuration and then list which classes should be serialized using it. One possible way to fill in those blanks is sketched below.
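A minimal sketch of what "filling in the blanks" could look like; the class name and the use of plain JDK object streams are only illustrative, a real serializer would typically use a dedicated binary format such as protobuf:

    import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }
    import akka.serialization.Serializer

    class MyJavaBackedSerializer extends Serializer {
      def includeManifest: Boolean = true
      def identifier = 7654321 // any value > 16, unique within the actor system

      // Serialize via JDK object streams (illustration only).
      def toBinary(obj: AnyRef): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bos)
        try out.writeObject(obj) finally out.close()
        bos.toByteArray
      }

      // Deserialize; the clazz hint is not needed for this simple scheme.
      def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try in.readObject() finally in.close()
      }
    }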

Serializer with String Manifest

The Serializer illustrated above supports a class based manifest (type hint). For serialization of data that needs to evolve over time the SerializerWithStringManifest is recommended instead of Serializer because the manifest (type hint) is a String instead of a Class. That means that the class can be moved/removed and the serializer can still deserialize old data by matching on the String. This is especially useful for Persistence.

The manifest string can also encode a version number that can be used in fromBinary to deserialize in different ways to migrate old data to new domain objects.

If the data was originally serialized with Serializer and in a later version of the system you change to SerializerWithStringManifest, the manifest string will be the full class name if you used includeManifest=true, otherwise it will be the empty string.

This is what a SerializerWithStringManifest looks like:

  1. class MyOwnSerializer2 extends SerializerWithStringManifest {
  2.  
  3. val CustomerManifest = "customer"
  4. val UserManifest = "user"
  5. val UTF_8 = StandardCharsets.UTF_8.name()
  6.  
  7. // Pick a unique identifier for your Serializer,
  8. // you've got a couple of billions to choose from,
  9. // 0 - 16 is reserved by Akka itself
  10. def identifier = 1234567
  11.  
  12. // The manifest (type hint) that will be provided in the fromBinary method
  13. // Use `""` if manifest is not needed.
  14. def manifest(obj: AnyRef): String =
  15. obj match {
  16. case _: Customer => CustomerManifest
  17. case _: User => UserManifest
  18. }
  19.  
  20. // "toBinary" serializes the given object to an Array of Bytes
  21. def toBinary(obj: AnyRef): Array[Byte] = {
  22. // Put the real code that serializes the object here
  23. obj match {
  24. case Customer(name) => name.getBytes(UTF_8)
  25. case User(name) => name.getBytes(UTF_8)
  26. }
  27. }
  28.  
  29. // "fromBinary" deserializes the given array,
  30. // using the type hint
  31. def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
  32. // Put the real code that deserializes here
  33. manifest match {
  34. case CustomerManifest =>
  35. Customer(new String(bytes, UTF_8))
  36. case UserManifest =>
  37. User(new String(bytes, UTF_8))
  38. }
  39. }
  40. }

You must also bind it to a name in your Configuration and then list which classes should be serialized using it.
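For example, such a binding could look like the following, assuming (hypothetically) that Customer and User live in the docs.serialization package:

    akka.actor {
      serializers {
        myown2 = "docs.serialization.MyOwnSerializer2"
      }
      serialization-bindings {
        "docs.serialization.Customer" = myown2
        "docs.serialization.User" = myown2
      }
    }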

Serializing ActorRefs

All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer, you might want to know how to serialize and deserialize them properly. In the general case, the local address to be used depends on the type of remote address which shall be the recipient of the serialized information. Use Serialization.serializedActorPath(actorRef) like this:

  1. import akka.actor.{ ActorRef, ActorSystem }
  2. import akka.serialization._
  3. import com.typesafe.config.ConfigFactory
  4.  
  5. // Serialize
  6. // (beneath toBinary)
  7. val identifier: String = Serialization.serializedActorPath(theActorRef)
  8.  
  9. // Then just serialize the identifier however you like
  10.  
  11. // Deserialize
  12. // (beneath fromBinary)
  13. val deserializedActorRef = extendedSystem.provider.resolveActorRef(identifier)
  14. // Then just use the ActorRef

This assumes that serialization happens in the context of sending a message through the remote transport. There are other uses of serialization, though, e.g. storing actor references outside of an actor application (database, etc.). In this case, it is important to keep in mind that the address part of an actor’s path determines how that actor is communicated with. Storing a local actor path might be the right choice if the retrieval happens in the same logical context, but it is not enough when deserializing it on a different network host: for that it would need to include the system’s remote transport address. An actor system is not limited to having just one remote transport per se, which makes this question a bit more interesting. To find out the appropriate address to use when sending to remoteAddr you can use ActorRefProvider.getExternalAddressFor(remoteAddr) like this:

  1. object ExternalAddress extends ExtensionKey[ExternalAddressExt]
  2.  
  3. class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
  4. def addressFor(remoteAddr: Address): Address =
  5. system.provider.getExternalAddressFor(remoteAddr) getOrElse
  6. (throw new UnsupportedOperationException("cannot send to " + remoteAddr))
  7. }
  8.  
  9. def serializeTo(ref: ActorRef, remote: Address): String =
  10. ref.path.toSerializationFormatWithAddress(ExternalAddress(extendedSystem).
  11. addressFor(remote))

Note

ActorPath.toSerializationFormatWithAddress differs from toString if the address does not already have host and port components, i.e. it only inserts address information for local addresses.

toSerializationFormatWithAddress also adds the unique id of the actor, which will change when the actor is stopped and then created again with the same name. Messages sent to a reference pointing at the old actor will not be delivered to the new actor. If you don't want this behavior, e.g. in case of long term storage of the reference, you can use toStringWithAddress, which doesn't include the unique id.

This requires that you know at least which type of address will be supported by the system which will deserialize the resulting actor reference; if you have no concrete address handy you can create a dummy one for the right protocol using Address(protocol, "", "", 0) (assuming that the actual transport used is as lenient as Akka’s RemoteActorRefProvider).

There is also a default remote address which is the one used by cluster support (and typical systems have just this one); you can get it like this:

  1. object ExternalAddress extends ExtensionKey[ExternalAddressExt]
  2.  
  3. class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
  4. def addressForAkka: Address = system.provider.getDefaultAddress
  5. }
  6.  
  7. def serializeAkkaDefault(ref: ActorRef): String =
  8. ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).
  9. addressForAkka)

Deep serialization of Actors

The recommended approach to do deep serialization of internal actor state is to use Akka Persistence.

A Word About Java Serialization

When using Java serialization without employing the JavaSerializer for the task, you must make sure to supply a valid ExtendedActorSystem in the dynamic variable JavaSerializer.currentSystem. This is used when reading in the representation of an ActorRef for turning the string representation into a real reference. DynamicVariable is a thread-local variable, so be sure to have it set while deserializing anything which might contain actor references.
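A minimal sketch of how the dynamic variable can be set around the deserialization call (the helper name is hypothetical), so that any ActorRef inside the stream is resolved against the given system:

    import java.io.{ ByteArrayInputStream, ObjectInputStream }
    import akka.actor.ExtendedActorSystem
    import akka.serialization.JavaSerializer

    // The DynamicVariable is only set for the duration of the thunk, on the
    // current thread, which is exactly where readObject needs it.
    def deserializeWithActorRefs(bytes: Array[Byte], system: ExtendedActorSystem): AnyRef =
      JavaSerializer.currentSystem.withValue(system) {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try in.readObject() finally in.close()
      }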

External Akka Serializers

Akka-protostuff by Roman Levenstein

Akka-quickser by Roman Levenstein

Akka-kryo by Roman Levenstein

Twitter Chill Scala extensions for Kryo (based on Akka Version 2.3.x but due to backwards compatibility of the Serializer Interface this extension also works with 2.4.x)

I/O

Introduction

The akka.io package has been developed in collaboration between the Akka and spray.io teams. Its design combines experiences from the spray-io module with improvements that were jointly developed for more general consumption as an actor-based service.

The guiding design goal for this I/O implementation was to reach extreme scalability, make no compromises in providing an API correctly matching the underlying transport mechanism and to be fully event-driven, non-blocking and asynchronous. The API is meant to be a solid foundation for the implementation of network protocols and building higher abstractions; it is not meant to be a full-service high-level NIO wrapper for end users.

Terminology, Concepts

The I/O API is completely actor based, meaning that all operations are implemented with message passing instead of direct method calls. Every I/O driver (TCP, UDP) has a special actor, called a manager that serves as an entry point for the API. I/O is broken into several drivers. The manager for a particular driver is accessible through the IO entry point. For example the following code looks up the TCP manager and returns its ActorRef:

  1. import akka.io.{ IO, Tcp }
  2. import context.system // implicitly used by IO(Tcp)
  3.  
  4. val manager = IO(Tcp)

The manager receives I/O command messages and instantiates worker actors in response. The worker actors present themselves to the API user in the reply to the command that was sent. For example after a Connect command sent to the TCP manager, the manager creates an actor representing the TCP connection. All operations related to the given TCP connection can be invoked by sending messages to the connection actor, which announces itself by sending a Connected message.

DeathWatch and Resource Management

I/O worker actors receive commands and also send out events. They usually need a user-side counterpart actor listening for these events (such events could be inbound connections, incoming bytes or acknowledgements for writes). These worker actors watch their listener counterparts. If the listener stops then the worker will automatically release any resources that it holds. This design makes the API more robust against resource leaks.

Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor responsible for handling a connection can watch the connection actor to be notified if it unexpectedly terminates.

Write models (Ack, Nack)

I/O devices have a maximum throughput which limits the frequency and size of writes. When an application tries to push more data than a device can handle, the driver has to buffer bytes until the device is able to write them. With buffering it is possible to handle short bursts of intensive writes --- but no buffer is infinite. "Flow control" is needed to avoid overwhelming device buffers.

Akka supports two types of flow control: Ack-based, where new data is sent only when the previous chunk has been acknowledged, and Nack-based, where new data is sent immediately and the writer is only notified if a write fails.

Each of these models is available in both the TCP and the UDP implementations of Akka I/O.

Individual writes can be acknowledged by providing an ack object in the write message (Write in the case of TCP andSend for UDP). When the write is complete the worker will send the ack object to the writing actor. This can be used to implement ack-based flow control; sending new data only when old data has been acknowledged.

If a write (or any other command) fails, the driver notifies the actor that sent the command with a special message (CommandFailed in the case of UDP and TCP). This message will also notify the writer of a failed write, serving as a nack for that write. Please note that in a nack-based flow-control setting the writer has to be prepared for the fact that the failed write might not be the most recent write it sent. For example, the failure notification for a write W1 might arrive after additional write commands W2 and W3 have been sent. If the writer wants to resend any nacked messages it may need to keep a buffer of pending messages.

Warning

An acknowledged write does not mean acknowledged delivery or storage; receiving an ack for a write simply signals that the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control not error handling. In other words, data may still be lost, even if every write is acknowledged.

ByteString

To maintain isolation, actors should communicate with immutable objects only. ByteString is an immutable container for bytes. It is used by Akka's I/O system as an efficient, immutable alternative to the traditional byte containers used for I/O on the JVM, such as Array[Byte] and ByteBuffer.

ByteString is a rope-like data structure that is immutable and provides fast concatenation and slicing operations (perfect for I/O). When two ByteStrings are concatenated together they are both stored within the resulting ByteString instead of copying both to a new Array. Operations such as drop and take return ByteStrings that still reference the original Array, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal Array cannot be modified. Whenever a potentially unsafe Array is used to create a new ByteString a defensive copy is created. If you require a ByteString that only holds as much memory as necessary for its content, use the compact method to get a CompactByteString instance. If the ByteString represents only a slice of the original array, this will result in copying all bytes in that slice.
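A quick sketch of these non-copying operations (the string values are arbitrary):

    import akka.util.ByteString

    // ++ stores references to both underlying arrays instead of copying them
    val a = ByteString("Hello, ")
    val b = ByteString("world!")
    val joined = a ++ b

    // drop and take return views onto the same data, adjusting offset and length
    val world = joined.drop(7)
    val hello = joined.take(5)

    // compact copies the visible slice into a fresh array, yielding a
    // CompactByteString that holds only the bytes of that slice
    val compactWorld = world.compact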

ByteString inherits all methods from IndexedSeq, and it also has some new ones. For more information, look up the akka.util.ByteString class and its companion object in the ScalaDoc.

ByteString also comes with its own optimized builder and iterator classes ByteStringBuilder and ByteIterator which provide extra features in addition to those of normal builders and iterators.

Compatibility with java.io

A ByteStringBuilder can be wrapped in a java.io.OutputStream via the asOutputStream method. Likewise, ByteIterator can be wrapped in a java.io.InputStream via asInputStream. Using these, akka.io applications can integrate legacy code based on java.io streams.
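A minimal sketch of this bridging, writing an object through a ByteStringBuilder and reading it back through a ByteIterator:

    import java.io.{ ObjectInputStream, ObjectOutputStream }
    import akka.util.ByteString

    // Write an object through a ByteStringBuilder exposed as an OutputStream ...
    val builder = ByteString.newBuilder
    val out = new ObjectOutputStream(builder.asOutputStream)
    out.writeObject("hello")
    out.close()
    val bytes: ByteString = builder.result()

    // ... and read it back through a ByteIterator exposed as an InputStream.
    val in = new ObjectInputStream(bytes.iterator.asInputStream)
    val back = in.readObject() // "hello"
    in.close()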

Architecture in-depth

For further details on the design and internal architecture see I/O Layer Design.

Using TCP

The code snippets throughout this section assume the following imports:

  1. import akka.actor.{ Actor, ActorRef, Props }
  2. import akka.io.{ IO, Tcp }
  3. import akka.util.ByteString
  4. import java.net.InetSocketAddress

All of the Akka I/O APIs are accessed through manager objects. When using an I/O API, the first step is to acquire a reference to the appropriate manager. The code below shows how to acquire a reference to the Tcp manager.

  1. import akka.io.{ IO, Tcp }
  2. import context.system // implicitly used by IO(Tcp)
  3.  
  4. val manager = IO(Tcp)

The manager is an actor that handles the underlying low level I/O resources (selectors, channels) and instantiates workers for specific tasks, such as listening to incoming connections.

Connecting

  1. object Client {
  2. def props(remote: InetSocketAddress, replies: ActorRef) =
  3. Props(classOf[Client], remote, replies)
  4. }
  5.  
  6. class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
  7.  
  8. import Tcp._
  9. import context.system
  10.  
  11. IO(Tcp) ! Connect(remote)
  12.  
  13. def receive = {
  14. case CommandFailed(_: Connect) =>
  15. listener ! "connect failed"
  16. context stop self
  17.  
  18. case c @ Connected(remote, local) =>
  19. listener ! c
  20. val connection = sender()
  21. connection ! Register(self)
  22. context become {
  23. case data: ByteString =>
  24. connection ! Write(data)
  25. case CommandFailed(w: Write) =>
  26. // O/S buffer was full
  27. listener ! "write failed"
  28. case Received(data) =>
  29. listener ! data
  30. case "close" =>
  31. connection ! Close
  32. case _: ConnectionClosed =>
  33. listener ! "connection closed"
  34. context stop self
  35. }
  36. }
  37. }

The first step of connecting to a remote address is sending a Connect message to the TCP manager; in addition to the simplest form shown above there is also the possibility to specify a local InetSocketAddress to bind to and a list of socket options to apply, as sketched after the note below.

Note

The SO_NODELAY (TCP_NODELAY on Windows) socket option defaults to true in Akka, independently of the OS default settings. This setting disables Nagle's algorithm, considerably improving latency for most applications. This setting could be overridden by passing SO.TcpNoDelay(false) in the list of socket options of the Connect message.
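A sketch of this fuller Connect form (the addresses are placeholders):

    import java.net.InetSocketAddress
    import akka.io.Tcp

    val connectCmd = Tcp.Connect(
      remoteAddress = new InetSocketAddress("10.0.0.1", 8080),
      // bind the local side explicitly (port 0 picks a free port)
      localAddress = Some(new InetSocketAddress("10.0.0.5", 0)),
      // socket options, including overriding the SO_NODELAY default
      options = List(Tcp.SO.KeepAlive(true), Tcp.SO.TcpNoDelay(false)))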

The TCP manager will then reply either with a CommandFailed or it will spawn an internal actor representing the new connection. This new actor will then send a Connected message to the original sender of the Connect message.

In order to activate the new connection a Register message must be sent to the connection actor, informing it who shall receive data from the socket. Before this step is done the connection cannot be used, and there is an internal timeout after which the connection actor will shut itself down if no Register message is received.

The connection actor watches the registered handler and closes the connection when that one terminates, thereby cleaning up all internal resources associated with that connection.

The actor in the example above uses become to switch from unconnected to connected operation, demonstrating the commands and events which are observed in that state. For a discussion on CommandFailed see Throttling Reads and Writes below. ConnectionClosed is a trait, which marks the different connection close events. The last line handles all connection close events in the same way. It is possible to listen for more fine-grained connection close events, see Closing Connections below.

Accepting connections

  1. class Server extends Actor {
  2.  
  3. import Tcp._
  4. import context.system
  5.  
  6. IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
  7.  
  8. def receive = {
  9. case b @ Bound(localAddress) =>
  10. // do some logging or setup ...
  11.  
  12. case CommandFailed(_: Bind) => context stop self
  13.  
  14. case c @ Connected(remote, local) =>
  15. val handler = context.actorOf(Props[SimplisticHandler])
  16. val connection = sender()
  17. connection ! Register(handler)
  18. }
  19.  
  20. }

To create a TCP server and listen for inbound connections, a Bind command has to be sent to the TCP manager. This will instruct the TCP manager to listen for TCP connections on a particular InetSocketAddress; the port may be specified as 0 in order to bind to a random port.

The actor sending the Bind message will receive a Bound message signaling that the server is ready to accept incoming connections; this message also contains the InetSocketAddress to which the socket was actually bound (i.e. resolved IP address and correct port number).

From this point forward the process of handling connections is the same as for outgoing connections. The example demonstrates that handling the reads from a certain connection can be delegated to another actor by naming it as the handler when sending the Register message. Writes can be sent from any actor in the system to the connection actor (i.e. the actor which sent the Connected message). The simplistic handler is defined as:

  1. class SimplisticHandler extends Actor {
  2. import Tcp._
  3. def receive = {
  4. case Received(data) => sender() ! Write(data)
  5. case PeerClosed => context stop self
  6. }
  7. }

For a more complete sample which also takes into account the possibility of failures when sending please see Throttling Reads and Writes below.

The only difference to outgoing connections is that the internal actor managing the listen port (the sender of the Bound message) watches the actor which was named as the recipient for Connected messages in the Bind message. When that actor terminates the listen port will be closed and all resources associated with it will be released; existing connections will not be terminated at this point.

Closing connections

A connection can be closed by sending one of the commands Close, ConfirmedClose or Abort to the connection actor.

Close will close the connection by sending a FIN message, but without waiting for confirmation from the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with Closed.

ConfirmedClose will close the sending direction of the connection by sending a FIN message, but data will continue to be received until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is successful, the listener will be notified with ConfirmedClosed.

Abort will immediately terminate the connection by sending a RST message to the remote endpoint. Pending writes will be not flushed. If the close is successful, the listener will be notified with Aborted.

PeerClosed will be sent to the listener if the connection has been closed by the remote endpoint. Per default, the connection will then automatically be closed from this endpoint as well. To support half-closed connections set the keepOpenOnPeerClosed member of the Register message to true, in which case the connection stays open until it receives one of the above close commands.

ErrorClosed will be sent to the listener whenever an error happened that forced the connection to be closed.

All close notifications are sub-types of ConnectionClosed so listeners who do not need fine-grained close events may handle all close events in the same way.

Writing to a connection

Once a connection has been established data can be sent to it from any actor in the form of a Tcp.WriteCommand. Tcp.WriteCommand is an abstract class with three concrete implementations:

Tcp.Write
The simplest WriteCommand implementation which wraps a ByteString instance and an "ack" event. A ByteString (as explained in this section) models one or more chunks of immutable in-memory data with a maximum (total) size of 2 GB (2^31 bytes).
Tcp.WriteFile
If you want to send "raw" data from a file you can do so efficiently with the Tcp.WriteFile command. This allows you to designate a (contiguous) chunk of on-disk bytes for sending across the connection without the need to first load them into the JVM memory. As such Tcp.WriteFile can "hold" more than 2 GB of data, plus an "ack" event if required.
Tcp.CompoundWrite

Sometimes you might want to group (or interleave) several Tcp.Write and/or Tcp.WriteFile commands into one atomic write command which gets written to the connection in one go. The Tcp.CompoundWrite allows you to do just that and offers three benefits:

  1. As explained in the following section the TCP connection actor can only handle one single write command at a time. By combining several writes into one CompoundWrite you can have them be sent across the connection with minimum overhead and without the need to spoon feed them to the connection actor via an ACK-based message protocol.
  2. Because a WriteCommand is atomic you can be sure that no other actor can "inject" other writes into your series of writes if you combine them into one single CompoundWrite. In scenarios where several actors write to the same connection this can be an important feature which can be somewhat hard to achieve otherwise.
  3. The "sub writes" of a CompoundWrite are regular Write or WriteFile commands that themselves can request "ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to attach more than one ACK to a Write or WriteFile (by combining it with an empty write that itself requests an ACK) or to have the connection actor acknowledge the progress of transmitting the CompoundWrite by sending out intermediate ACKs at arbitrary points.

Throttling Reads and Writes

The basic model of the TCP connection actor is that it has no internal buffering (i.e. it can only process one write at a time, meaning it can buffer one write until it has been passed on to the O/S kernel in full). Congestion needs to be handled at the user level, for both writes and reads.

For back-pressuring writes there are three modes of operation: ACK-based, where a new write is sent only after the previous one has been acknowledged; NACK-based, where writes are sent optimistically and a CommandFailed serves as rejection; and NACK-based with write suspending, which additionally uses ResumeWriting to find out when writing may continue.

These write back-pressure models (with the exception of the second which is rather specialised) are demonstrated in complete examples below. The full and contiguous source is available on GitHub.

For back-pressuring reads there are two modes of operation: push-reading (the default), where incoming data is delivered as fast as it arrives and the reader can throttle it with SuspendReading/ResumeReading, and pull-reading, where the connection actor reads more data only after the reader explicitly requests it with ResumeReading.

Note

It should be obvious that all these flow control schemes only work between one writer/reader and one connection actor; as soon as multiple actors send write commands to a single connection no consistent result can be achieved.

ACK-Based Write Back-Pressure

For the following example to function properly it is important to configure the connection to remain half-open when the remote side has closed its writing end: this allows the example EchoHandler to write all outstanding data back to the client before fully closing the connection. This is enabled using a flag upon connection activation (observe the Register message):

  1. case Connected(remote, local) =>
  2. log.info("received connection from {}", remote)
  3. val handler = context.actorOf(Props(handlerClass, sender(), remote))
  4. sender() ! Register(handler, keepOpenOnPeerClosed = true)

With this preparation let us dive into the handler itself:

  1. // storage omitted ...
  2. class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress)
  3. extends Actor with ActorLogging {
  4.  
  5. import Tcp._
  6.  
  7. // sign death pact: this actor terminates when connection breaks
  8. context watch connection
  9.  
  10. case object Ack extends Event
  11.  
  12. def receive = {
  13. case Received(data) =>
  14. buffer(data)
  15. connection ! Write(data, Ack)
  16.  
  17. context.become({
  18. case Received(data) => buffer(data)
  19. case Ack => acknowledge()
  20. case PeerClosed => closing = true
  21. }, discardOld = false)
  22.  
  23. case PeerClosed => context stop self
  24. }
  25.  
  26. // storage omitted ...
  27. }

The principle is simple: when having written a chunk always wait for the Ack to come back before sending the next chunk. While waiting we switch behavior such that new incoming data are buffered. The helper functions used are a bit lengthy but not complicated:

  1. private def buffer(data: ByteString): Unit = {
  2. storage :+= data
  3. stored += data.size
  4.  
  5. if (stored > maxStored) {
  6. log.warning(s"drop connection to [$remote] (buffer overrun)")
  7. context stop self
  8.  
  9. } else if (stored > highWatermark) {
  10. log.debug(s"suspending reading")
  11. connection ! SuspendReading
  12. suspended = true
  13. }
  14. }
  15.  
  16. private def acknowledge(): Unit = {
  17. require(storage.nonEmpty, "storage was empty")
  18.  
  19. val size = storage(0).size
  20. stored -= size
  21. transferred += size
  22.  
  23. storage = storage drop 1
  24.  
  25. if (suspended && stored < lowWatermark) {
  26. log.debug("resuming reading")
  27. connection ! ResumeReading
  28. suspended = false
  29. }
  30.  
  31. if (storage.isEmpty) {
  32. if (closing) context stop self
  33. else context.unbecome()
  34. } else connection ! Write(storage(0), Ack)
  35. }

The most interesting part is probably the last: an Ack removes the oldest data chunk from the buffer, and if that was the last chunk then we either close the connection (if the peer closed its half already) or return to the idle behavior; otherwise we just send the next buffered chunk and stay waiting for the next Ack.

Back-pressure can also be propagated across the reading side, back to the writer on the other end of the connection, by sending the SuspendReading command to the connection actor. This will stop data being read from the socket (although only after a delay, because it takes some time until the connection actor processes this command, hence appropriate head-room in the buffer should be present). In turn the O/S kernel buffer fills up on our end, the TCP window mechanism stops the remote side from writing, its write buffer fills up, until finally the writer on the other side cannot push any data into the socket anymore. This is how end-to-end back-pressure is realized across a TCP connection.

NACK-Based Write Back-Pressure with Suspending

  1. object EchoHandler {
  2. final case class Ack(offset: Int) extends Tcp.Event
  3.  
  4. def props(connection: ActorRef, remote: InetSocketAddress): Props =
  5. Props(classOf[EchoHandler], connection, remote)
  6. }
  7.  
  8. class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
  9. extends Actor with ActorLogging {
  10.  
  11. import Tcp._
  12. import EchoHandler._
  13.  
  14. // sign death pact: this actor terminates when connection breaks
  15. context watch connection
  16.  
  17. // start out in optimistic write-through mode
  18. def receive = writing
  19.  
  20. def writing: Receive = {
  21. case Received(data) =>
  22. connection ! Write(data, Ack(currentOffset))
  23. buffer(data)
  24.  
  25. case Ack(ack) =>
  26. acknowledge(ack)
  27.  
  28. case CommandFailed(Write(_, Ack(ack))) =>
  29. connection ! ResumeWriting
  30. context become buffering(ack)
  31.  
  32. case PeerClosed =>
  33. if (storage.isEmpty) context stop self
  34. else context become closing
  35. }
  36.  
  37. // buffering ...
  38.  
  39. // closing ...
  40.  
  41. override def postStop(): Unit = {
  42. log.info(s"transferred $transferred bytes from/to [$remote]")
  43. }
  44.  
  45. // storage omitted ...
  46. }

The principle here is to keep writing until a CommandFailed is received, using acknowledgements only to prune the resend buffer. When such a failure is received, the handler transitions into a different state and takes care of resending all queued data:

  1. def buffering(nack: Int): Receive = {
  2. var toAck = 10
  3. var peerClosed = false
  4.  
  5. {
  6. case Received(data) => buffer(data)
  7. case WritingResumed => writeFirst()
  8. case PeerClosed => peerClosed = true
  9. case Ack(ack) if ack < nack => acknowledge(ack)
  10. case Ack(ack) =>
  11. acknowledge(ack)
  12. if (storage.nonEmpty) {
  13. if (toAck > 0) {
  14. // stay in ACK-based mode for a while
  15. writeFirst()
  16. toAck -= 1
  17. } else {
  18. // then return to NACK-based again
  19. writeAll()
  20. context become (if (peerClosed) closing else writing)
  21. }
  22. } else if (peerClosed) context stop self
  23. else context become writing
  24. }
  25. }

It should be noted that all writes which are currently buffered have also been sent to the connection actor upon entering this state, which means that the ResumeWriting message is enqueued after those writes, leading to the reception of all outstanding CommandFailed messages (which are ignored in this state) before receiving the WritingResumed signal. That latter message is sent by the connection actor only once the internally queued write has been fully completed, meaning that a subsequent write will not fail. This is exploited by the EchoHandler to switch to an ACK-based approach for the first ten writes after a failure before resuming the optimistic write-through behavior.

  1. def closing: Receive = {
  2. case CommandFailed(_: Write) =>
  3. connection ! ResumeWriting
  4. context.become({
  5.  
  6. case WritingResumed =>
  7. writeAll()
  8. context.unbecome()
  9.  
  10. case Ack(ack) => acknowledge(ack)
  11.  
  12. }, discardOld = false)
  13.  
  14. case Ack(ack) =>
  15. acknowledge(ack)
  16. if (storage.isEmpty) context stop self
  17. }

Closing the connection while still sending all data is a bit more involved than in the ACK-based approach: the idea is to always send all outstanding messages and acknowledge all successful writes, and if a failure happens then switch behavior to await the WritingResumed event and start over.

The helper functions are very similar to the ACK-based case:

  1. private def buffer(data: ByteString): Unit = {
  2. storage :+= data
  3. stored += data.size
  4.  
  5. if (stored > maxStored) {
  6. log.warning(s"drop connection to [$remote] (buffer overrun)")
  7. context stop self
  8.  
  9. } else if (stored > highWatermark) {
  10. log.debug(s"suspending reading at $currentOffset")
  11. connection ! SuspendReading
  12. suspended = true
  13. }
  14. }
  15.  
  16. private def acknowledge(ack: Int): Unit = {
  17. require(ack == storageOffset, s"received ack $ack at $storageOffset")
  18. require(storage.nonEmpty, s"storage was empty at ack $ack")
  19.  
  20. val size = storage(0).size
  21. stored -= size
  22. transferred += size
  23.  
  24. storageOffset += 1
  25. storage = storage drop 1
  26.  
  27. if (suspended && stored < lowWatermark) {
  28. log.debug("resuming reading")
  29. connection ! ResumeReading
  30. suspended = false
  31. }
  32. }
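
As in the ACK-based example, the storage bookkeeping is omitted from the listings. Besides the fields sketched earlier, the NACK-based EchoHandler refers to storageOffset, currentOffset, writeFirst and writeAll; one possible shape inside the handler, matching those names (again only a sketch):

  1. // illustrative storage bookkeeping for the NACK-based handler
  2. var storageOffset = 0
  3. var storage = Vector.empty[ByteString]
  4. var stored = 0L
  5. var transferred = 0L
  6. var suspended = false
  7.  
  8. def currentOffset = storageOffset + storage.size
  9.  
  10. def writeFirst(): Unit =
  11. connection ! Write(storage(0), Ack(storageOffset))
  12.  
  13. def writeAll(): Unit =
  14. for ((data, i) <- storage.zipWithIndex)
  15. connection ! Write(data, Ack(storageOffset + i))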

Read Back-Pressure with Pull Mode

When using push based reading, data coming from the socket is sent to the actor as soon as it is available. In the case of the previous Echo server example this meant that we needed to buffer the incoming data, since the rate of writing might be slower than the rate at which new data arrives.

With the Pull mode this buffer can be completely eliminated as the following snippet demonstrates:

  1. override def preStart: Unit = connection ! ResumeReading
  2.  
  3. def receive = {
  4. case Received(data) => connection ! Write(data, Ack)
  5. case Ack => connection ! ResumeReading
  6. }

The idea here is that reading is not resumed until the previous write has been completely acknowledged by the connection actor. Every pull mode connection actor starts from the suspended state. To start the flow of data we send a ResumeReading in the preStart method to tell the connection actor that we are ready to receive the first chunk of data. Since we only resume reading when the previous data chunk has been completely written there is no need for maintaining a buffer.

To enable pull reading on an outbound connection the pullMode parameter of the Connect should be set to true:

  1. IO(Tcp) ! Connect(listenAddress, pullMode = true)

Pull Mode Reading for Inbound Connections

The previous section demonstrated how to enable pull reading mode for outbound connections, but it is also possible to create a listener actor with this mode of reading by setting the pullMode parameter of the Bind command to true:

  1. IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0), pullMode = true)

One of the effects of this setting is that all connections accepted by this listener actor will use pull mode reading.

Another effect of this setting is that, in addition to putting all inbound connections into pull mode, accepting connections becomes pull based, too. This means that after handling one (or more) Connected events the listener actor has to be resumed by sending it a ResumeAccepting message.

Listener actors in pull mode start out suspended, so to start accepting connections a ResumeAccepting command has to be sent to the listener actor after binding has succeeded:

  1. case Bound(localAddress) =>
  2. // Accept connections one by one
  3. sender() ! ResumeAccepting(batchSize = 1)
  4. context.become(listening(sender()))

After handling an incoming connection we need to resume accepting again:

  1. def listening(listener: ActorRef): Receive = {
  2. case Connected(remote, local) =>
  3. val handler = context.actorOf(Props(classOf[PullEcho], sender()))
  4. sender() ! Register(handler, keepOpenOnPeerClosed = true)
  5. listener ! ResumeAccepting(batchSize = 1)
  6. }

ResumeAccepting accepts a batchSize parameter that specifies how many new connections are accepted before the next ResumeAccepting message is needed to resume handling of new connections.
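
The PullEcho handler class registered for the connection above is not shown in the snippet. Assuming it simply echoes data in pull mode, exactly like the earlier read back-pressure example, it could look like this (the class itself is illustrative):

  1. import akka.actor.{ Actor, ActorRef }
  2. import akka.io.Tcp
  3.  
  4. class PullEcho(connection: ActorRef) extends Actor {
  5. import Tcp._
  6. case object Ack extends Event
  7.  
  8. override def preStart(): Unit = connection ! ResumeReading
  9.  
  10. def receive = {
  11. case Received(data) => connection ! Write(data, Ack)
  12. case Ack => connection ! ResumeReading
  13. }
  14. }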

Using UDP

UDP is a connectionless datagram protocol which offers two different ways of communication on the JDK level: sockets that are free to send datagrams to any destination and receive datagrams from any origin, and sockets that are restricted to communication with one specific remote socket address.

In the low-level API the distinction is made—confusingly—by whether or not connect has been called on the socket (even when connect has been called the protocol is still connectionless). These two forms of UDP usage are offered using distinct IO extensions described below.

Unconnected UDP

Simple Send

  1. class SimpleSender(remote: InetSocketAddress) extends Actor {
  2. import context.system
  3. IO(Udp) ! Udp.SimpleSender
  4.  
  5. def receive = {
  6. case Udp.SimpleSenderReady =>
  7. context.become(ready(sender()))
  8. }
  9.  
  10. def ready(send: ActorRef): Receive = {
  11. case msg: String =>
  12. send ! Udp.Send(ByteString(msg), remote)
  13. }
  14. }

The simplest form of UDP usage is to just send datagrams without needing a reply. To this end a “simple sender” facility is provided as demonstrated above. The UDP extension is queried using the SimpleSender message, which is answered by a SimpleSenderReady notification. The sender of that notification is the newly created sender actor, which from this point onward can be used to send datagrams to arbitrary destinations; in this example it will just send any UTF-8 encoded String it receives to a predefined remote address.

Note

The simple sender will not shut itself down because it cannot know when you are done with it. You will need to send it a PoisonPill when you want to close the ephemeral port the sender is bound to.

Bind (and Send)

  1. class Listener(nextActor: ActorRef) extends Actor {
  2. import context.system
  3. IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0))
  4.  
  5. def receive = {
  6. case Udp.Bound(local) =>
  7. context.become(ready(sender()))
  8. }
  9.  
  10. def ready(socket: ActorRef): Receive = {
  11. case Udp.Received(data, remote) =>
  12. val processed = data // placeholder: parse data etc., e.g. using PipelineStage
  13. socket ! Udp.Send(data, remote) // example server echoes back
  14. nextActor ! processed
  15. case Udp.Unbind => socket ! Udp.Unbind
  16. case Udp.Unbound => context.stop(self)
  17. }
  18. }

If you want to implement a UDP server which listens on a socket for incoming datagrams then you need to use the Bind command as shown above. The local address specified may have a zero port, in which case the operating system will automatically choose a free port and assign it to the new socket. Which port was actually bound can be found out by inspecting the Bound message.

The sender of the Bound message is the actor which manages the new socket. Sending datagrams is achieved by using the Send message type and the socket can be closed by sending an Unbind command, in which case the socket actor will reply with an Unbound notification.

Received datagrams are sent to the actor designated in the Bind message, whereas the Bound message will be sent to the sender of the Bind.

Connected UDP

The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will receive datagrams only from that address.

  1. class Connected(remote: InetSocketAddress) extends Actor {
  2. import context.system
  3. IO(UdpConnected) ! UdpConnected.Connect(self, remote)
  4.  
  5. def receive = {
  6. case UdpConnected.Connected =>
  7. context.become(ready(sender()))
  8. }
  9.  
  10. def ready(connection: ActorRef): Receive = {
  11. case UdpConnected.Received(data) =>
  12. // process data, send it on, etc.
  13. case msg: String =>
  14. connection ! UdpConnected.Send(ByteString(msg))
  15. case UdpConnected.Disconnect =>
  16. connection ! UdpConnected.Disconnect
  17. case UdpConnected.Disconnected => context.stop(self)
  18. }
  19. }

Consequently the example shown here looks quite similar to the previous one; the biggest difference is the absence of remote address information in Send and Received messages.

Note

There is a small performance benefit in using the connection-based UDP API over the connectionless one. If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security check, while in the case of connection-based UDP the security check is cached after connect, so writes do not suffer an additional performance penalty.

UDP Multicast

If you want to use UDP multicast you will need to use Java 7. Akka provides a way to control various options of DatagramChannel through the akka.io.Inet.SocketOption interface. The example below shows how to set up a receiver of multicast messages using the IPv6 protocol.

To select a Protocol Family you must extend the akka.io.Inet.DatagramChannelCreator class, which extends akka.io.Inet.SocketOption. Provide custom logic for opening a datagram channel by overriding the create method.

  1. import java.net.StandardProtocolFamily
  2. import java.nio.channels.DatagramChannel
  3. import akka.io.Inet.DatagramChannelCreator
  4.  
  5. final case class Inet6ProtocolFamily() extends DatagramChannelCreator {
  6. override def create() =
  7. DatagramChannel.open(StandardProtocolFamily.INET6)
  8. }

Another socket option will be needed to join a multicast group.

  1. import java.net.{ DatagramSocket, InetAddress, NetworkInterface }
  2. import akka.io.Inet.SocketOptionV2
  3.  
  4. final case class MulticastGroup(address: String, interface: String) extends SocketOptionV2 {
  5. override def afterBind(s: DatagramSocket) {
  6. val group = InetAddress.getByName(address)
  7. val networkInterface = NetworkInterface.getByName(interface)
  8. s.getChannel.join(group, networkInterface)
  9. }
  10. }

Socket options must be provided with the Udp.Bind message.

  1. import context.system
  2. val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface))
  3. IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts)
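
The Bind fragment above is meant to run inside an actor. A minimal wrapper could look like the following sketch; the MulticastListener class and its group, iface and port parameters are illustrative and simply match the names used in the fragment:

  1. import java.net.InetSocketAddress
  2. import akka.actor.Actor
  3. import akka.io.{ IO, Udp }
  4.  
  5. class MulticastListener(group: String, iface: String, port: Int) extends Actor {
  6. import context.system
  7. val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface))
  8. IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts)
  9.  
  10. def receive = {
  11. case Udp.Bound(local) => // bound and joined to the multicast group
  12. case Udp.Received(data, remote) => // process the multicast datagram
  13. }
  14. }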

Camel

Introduction

The akka-camel module allows Untyped Actors to receive and send messages over a great variety of protocols and APIs. In addition to the native Scala and Java actor API, actors can now exchange messages with other systems over a large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported.

Apache Camel

The akka-camel module is based on Apache Camel, a powerful and light-weight integration framework for the JVM. For an introduction to Apache Camel you may want to read this Apache Camel article. Camel comes with a large number of components that provide bindings to different protocols and APIs. The camel-extra project provides further components.

Consumer

Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example.

  1. import akka.camel.{ CamelMessage, Consumer }
  2.  
  3. class MyEndpoint extends Consumer {
  4. def endpointUri = "mina2:tcp://localhost:6200?textline=true"
  5.  
  6. def receive = {
  7. case msg: CamelMessage => { /* ... */ }
  8. case _ => { /* ... */ }
  9. }
  10. }
  11.  
  12. // start and expose actor via tcp
  13. import akka.actor.{ ActorSystem, Props }
  14.  
  15. val system = ActorSystem("some-system")
  16. val mina = system.actorOf(Props[MyEndpoint])

The above example exposes an actor over a TCP endpoint via Apache Camel's Mina component. The actor implements the endpointUri method to define an endpoint from which it can receive messages. After starting the actor, TCP clients can immediately send messages to and receive responses from that actor. If the message exchange should go over HTTP (via Camel's Jetty component), only the actor's endpointUri method must be changed.

  1. import akka.camel.{ CamelMessage, Consumer }
  2.  
  3. class MyEndpoint extends Consumer {
  4. def endpointUri = "jetty:http://localhost:8877/example"
  5.  
  6. def receive = {
  7. case msg: CamelMessage => { /* ... */ }
  8. case _ => { /* ... */ }
  9. }
  10. }

Producer

Actors can also trigger message exchanges with external systems, i.e. produce messages to Camel endpoints.

  1. import akka.actor.Actor
  2. import akka.camel.{ Producer, Oneway }
  3. import akka.actor.{ ActorSystem, Props }
  4.  
  5. class Orders extends Actor with Producer with Oneway {
  6. def endpointUri = "jms:queue:Orders"
  7. }
  8.  
  9. val sys = ActorSystem("some-system")
  10. val orders = sys.actorOf(Props[Orders])
  11.  
  12. orders ! <order amount="100" currency="PLN" itemId="12345"/>

In the above example, any message sent to this actor will be sent to the JMS queue Orders. Producer actors may choose from the same set of Camel components as Consumer actors do.

CamelMessage

The number of Camel components is constantly increasing. The akka-camel module can support these in a plug-and-play manner. Just add them to your application's classpath, define a component-specific endpoint URI and use it to exchange messages over the component-specific protocols or APIs. This is possible because Camel components bind protocol-specific message formats to a Camel-specific normalized message format. The normalized message format hides protocol-specific details from Akka and therefore makes it very easy to support a large number of protocols through a uniform Camel component interface. The akka-camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage. In the above example of the Orders Producer, the XML message is put in the body of a newly created Camel Message with an empty set of headers. You can also create a CamelMessage yourself with the appropriate body and headers as you see fit.
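
For example, a CamelMessage with an explicit body and headers can be constructed directly; the body and header values below are purely illustrative:

  1. import akka.camel.CamelMessage
  2.  
  3. val message = CamelMessage("some body", Map("MyHeader" -> "some header value"))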

CamelExtension

The akka-camel module is implemented as an Akka Extension, the CamelExtension object. Extensions are loaded only once per ActorSystem and are managed by Akka. The CamelExtension object provides access to the Camel trait. The Camel trait in turn provides access to two important Apache Camel objects, the CamelContext and the ProducerTemplate. Below you can see how you can get access to these Apache Camel objects.

  1. val system = ActorSystem("some-system")
  2. val camel = CamelExtension(system)
  3. val camelContext = camel.context
  4. val producerTemplate = camel.template

The CamelExtension is loaded only once for every ActorSystem, which makes it safe to call CamelExtension at any point in your code to get to the Apache Camel objects associated with it. There is one CamelContext and one ProducerTemplate for every ActorSystem that uses a CamelExtension. By default, a new CamelContext is created when the CamelExtension starts. If you want to inject your own context instead, you can extend the ContextProvider trait and add the FQCN of your implementation to the configuration as the value of "akka.camel.context-provider". This interface defines a single method, getContext, used to load the CamelContext.
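
A custom provider could look roughly like the following sketch. The getContext signature shown here (taking an ExtendedActorSystem and returning a DefaultCamelContext) and the placeholder FQCN com.example.MyContextProvider are assumptions; check the ContextProvider scaladoc of the Akka version in use:

  1. import akka.actor.ExtendedActorSystem
  2. import akka.camel.ContextProvider
  3. import org.apache.camel.impl.DefaultCamelContext
  4.  
  5. class MyContextProvider extends ContextProvider {
  6. // assumed signature; see the ContextProvider scaladoc
  7. def getContext(system: ExtendedActorSystem): DefaultCamelContext = {
  8. val context = new DefaultCamelContext()
  9. context.setStreamCaching(true) // example customization
  10. context
  11. }
  12. }
  13.  
  14. // application.conf
  15. // akka.camel.context-provider = "com.example.MyContextProvider"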

Below is an example of how to add the ActiveMQ component to the CamelContext, which is required when you would like to use the ActiveMQ component.

  1. // import org.apache.activemq.camel.component.ActiveMQComponent
  2. val system = ActorSystem("some-system")
  3. val camel = CamelExtension(system)
  4. val camelContext = camel.context
  5. // camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(
  6. // "vm://localhost?broker.persistent=false"))

The CamelContext joins the lifecycle of the ActorSystem and CamelExtension it is associated with; the CamelContext is started when the CamelExtension is created, and it is shut down when the associated ActorSystem is shut down. The same is true for the ProducerTemplate.

The CamelExtension is used by both Producer and Consumer actors to interact with Apache Camel internally. You can access the CamelExtension inside a Producer or a Consumer using the camel definition, or get straight at the CamelContext using the camelContext definition. Actors are created and started asynchronously. When a Consumer actor is created, the Consumer is published at its Camel endpoint (more precisely, the route is added to the CamelContext from the Endpoint to the actor). When a Producer actor is created, a SendProcessor and Endpoint are created so that the Producer can send messages to it. Publication is done asynchronously; setting up an endpoint may still be in progress after you have requested the actor to be created. Some Camel components can take a while to start up, and in some cases you might want to know when the endpoints are activated and ready to be used. The Camel trait allows you to find out when the endpoint is activated or deactivated.

  1. import akka.camel.{ CamelMessage, Consumer }
  2. import scala.concurrent.duration._
  3.  
  4. class MyEndpoint extends Consumer {
  5. def endpointUri = "mina2:tcp://localhost:6200?textline=true"
  6.  
  7. def receive = {
  8. case msg: CamelMessage => { /* ... */ }
  9. case _ => { /* ... */ }
  10. }
  11. }
  12. val system = ActorSystem("some-system")
  13. val camel = CamelExtension(system)
  14. val actorRef = system.actorOf(Props[MyEndpoint])
  15. // get a future reference to the activation of the endpoint of the Consumer Actor
  16. val activationFuture = camel.activationFutureFor(actorRef)(
  17. timeout = 10 seconds,
  18. executor = system.dispatcher)

The above code shows that you can get a Future to the activation of the route from the endpoint to the actor, or you can wait in a blocking fashion on the activation of the route. An ActivationTimeoutException is thrown if the endpoint could not be activated within the specified timeout. Deactivation works in a similar fashion:

  1. system.stop(actorRef)
  2. // get a future reference to the deactivation of the endpoint of the Consumer Actor
  3. val deactivationFuture = camel.deactivationFutureFor(actorRef)(
  4. timeout = 10 seconds,
  5. executor = system.dispatcher)

Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the SendProcessor is stopped. A DeActivationTimeoutException is thrown if the associated camel objects could not be deactivated within the specified timeout.
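
To wait in a blocking fashion, as mentioned above, the returned futures can simply be awaited; a small sketch reusing the futures from the previous snippets:

  1. import scala.concurrent.Await
  2. import scala.concurrent.duration._
  3.  
  4. Await.result(activationFuture, 10 seconds)
  5. // ... and, after system.stop(actorRef) ...
  6. Await.result(deactivationFuture, 10 seconds)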

Consumer Actors

For objects to receive messages, they must mix in the Consumer trait. For example, the following actor class (Consumer1) implements the endpointUri method, which is declared in the Consumer trait, in order to receive messages from the file:data/input/actor Camel endpoint.

  1. import akka.camel.{ CamelMessage, Consumer }
  2.  
  3. class Consumer1 extends Consumer {
  4. def endpointUri = "file:data/input/actor"
  5.  
  6. def receive = {
  7. case msg: CamelMessage => println("received %s" format msg.bodyAs[String])
  8. }
  9. }

Whenever a file is put into the data/input/actor directory, its content is picked up by the Camel file component and sent as a message to the actor. Messages consumed by actors from Camel endpoints are of type CamelMessage. These are immutable representations of Camel messages.

Here's another example that sets the endpointUri to jetty:http://localhost:8877/camel/default. It causes Camel's Jetty component to start an embedded Jetty server, accepting HTTP connections from localhost on port 8877.

  1. import akka.camel.{ CamelMessage, Consumer }
  2.  
  3. class Consumer2 extends Consumer {
  4. def endpointUri = "jetty:http://localhost:8877/camel/default"
  5.  
  6. def receive = {
  7. case msg: CamelMessage => sender() ! ("Hello %s" format msg.bodyAs[String])
  8. }
  9. }

After starting the actor, clients can send messages to that actor by POSTing to http://localhost:8877/camel/default. The actor sends a response by replying to sender(). For returning a message body and headers to the HTTP client the response type should be CamelMessage. For any other response type, a new CamelMessage object is created by akka-camel with the actor response as message body.

Delivery acknowledgements

With in-out message exchanges, clients usually know that a message exchange is done when they receive a reply from a consumer actor. The reply message can be a CamelMessage (or any object which is then internally converted to a CamelMessage) on success, and a Failure message on failure.

With in-only message exchanges, by default, an exchange is done when a message is added to the consumer actor's mailbox. Any failure or exception that occurs during processing of that message by the consumer actor cannot be reported back to the endpoint in this case. To allow consumer actors to positively or negatively acknowledge the receipt of a message from an in-only message exchange, they need to override the autoAck method to return false. In this case, consumer actors must reply either with a special akka.camel.Ack message (positive acknowledgement) or an akka.actor.Status.Failure (negative acknowledgement).

  1. import akka.camel.{ CamelMessage, Consumer }
  2. import akka.camel.Ack
  3. import akka.actor.Status.Failure
  4.  
  5. class Consumer3 extends Consumer {
  6. override def autoAck = false
  7.  
  8. def endpointUri = "jms:queue:test"
  9.  
  10. def receive = {
  11. case msg: CamelMessage =>
  12. sender() ! Ack
  13. // on success
  14. // ..
  15. val someException = new Exception("e1")
  16. // on failure
  17. sender() ! Failure(someException)
  18. }
  19. }

Consumer timeout

Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from an actor before returning it to the initiating client. For some endpoint types, timeout values can be defined in an endpoint-specific way which is described in the documentation of the individual Camel components. Another option is to configure timeouts on the level of consumer actors.

Two-way communications between a Camel endpoint and an actor are initiated by sending the request message to the actor with the ask pattern, and the actor replies to the endpoint when the response is ready. The ask request to the actor can time out, which will result in the Exchange failing with a TimeoutException set on the failure of the Exchange. The timeout on the consumer actor can be overridden with replyTimeout, as shown below.

  1. import akka.camel.{ CamelMessage, Consumer }
  2. import scala.concurrent.duration._
  3.  
  4. class Consumer4 extends Consumer {
  5. def endpointUri = "jetty:http://localhost:8877/camel/default"
  6. override def replyTimeout = 500 millis
  7. def receive = {
  8. case msg: CamelMessage => sender() ! ("Hello %s" format msg.bodyAs[String])
  9. }
  10. }

Producer Actors

For sending messages to Camel endpoints, actors need to mix in the Producer trait and implement the endpointUri method.

  1. import akka.actor.Actor
  2. import akka.actor.{ Props, ActorSystem }
  3. import akka.camel.{ Producer, CamelMessage }
  4. import akka.util.Timeout
  5.  
  6. class Producer1 extends Actor with Producer {
  7. def endpointUri = "http://localhost:8080/news"
  8. }

Producer1 inherits a default implementation of the receive method from the Producer trait. To customize a producer actor's default behavior you must override the Producer.transformResponse and Producer.transformOutgoingMessage methods. This is explained later in more detail. Producer Actors cannot override the default Producer.receive method.

Any message sent to a Producer actor will be sent to the associated Camel endpoint, in the above example to http://localhost:8080/news. The Producer always sends messages asynchronously. Response messages (if supported by the configured endpoint) will, by default, be returned to the original sender. The following example uses the ask pattern to send a message to a Producer actor and waits for a response.

  1. import akka.pattern.ask
  2. import scala.concurrent.duration._
  3. implicit val timeout = Timeout(10 seconds)
  4.  
  5. val system = ActorSystem("some-system")
  6. val producer = system.actorOf(Props[Producer1])
  7. val future = producer.ask("some request").mapTo[CamelMessage]

The future contains the response CamelMessage, or an AkkaCamelException when an error occurred; the exception contains the headers of the response.
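
One possible way to handle that future is shown below; the println calls are illustrative, and the sketch assumes the system and future values from the previous snippet:

  1. import scala.util.{ Success, Failure }
  2. import akka.camel.AkkaCamelException
  3. implicit val ec = system.dispatcher
  4.  
  5. future.onComplete {
  6. case Success(response) => println("response body: " + response.body)
  7. case Failure(e: AkkaCamelException) =>
  8. println("error: " + e.getMessage + ", headers: " + e.headers)
  9. case Failure(e) => println("unexpected failure: " + e.getMessage)
  10. }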

Custom Processing

Instead of replying to the initial sender, producer actors can implement custom response processing by overriding the routeResponse method. In the following example, the response message is forwarded to a target actor instead of being replied to the original sender.

  1. import akka.actor.{ Actor, ActorRef }
  2. import akka.camel.{ Producer, CamelMessage }
  3. import akka.actor.{ Props, ActorSystem }
  4.  
  5. class ResponseReceiver extends Actor {
  6. def receive = {
  7. case msg: CamelMessage =>
  8. // do something with the forwarded response
  9. }
  10. }
  11.  
  12. class Forwarder(uri: String, target: ActorRef) extends Actor with Producer {
  13. def endpointUri = uri
  14.  
  15. override def routeResponse(msg: Any) { target forward msg }
  16. }
  17. val system = ActorSystem("some-system")
  18. val receiver = system.actorOf(Props[ResponseReceiver])
  19. val forwardResponse = system.actorOf(
  20. Props(classOf[Forwarder], "http://localhost:8080/news/akka", receiver))
  21. // the Forwarder sends out a request to the web page and forwards the response to
  22. // the ResponseReceiver
  23. forwardResponse ! "some request"

Before producing messages to endpoints, producer actors can pre-process them by overriding the Producer.transformOutgoingMessage method.

  1. import akka.actor.Actor
  2. import akka.camel.{ Producer, CamelMessage }
  3.  
  4. class Transformer(uri: String) extends Actor with Producer {
  5. def endpointUri = uri
  6.  
  7. def upperCase(msg: CamelMessage) = msg.mapBody {
  8. body: String => body.toUpperCase
  9. }
  10.  
  11. override def transformOutgoingMessage(msg: Any) = msg match {
  12. case msg: CamelMessage => upperCase(msg)
  13. }
  14. }
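
Responses can be post-processed in a similar way by overriding the Producer.transformResponse method before they are routed back to the sender. The sketch below mirrors the transformation above and assumes transformResponse takes and returns Any, as transformOutgoingMessage does:

  1. import akka.actor.Actor
  2. import akka.camel.{ Producer, CamelMessage }
  3.  
  4. class ResponseTransformer(uri: String) extends Actor with Producer {
  5. def endpointUri = uri
  6.  
  7. // assumed signature; see the Producer scaladoc
  8. override def transformResponse(msg: Any) = msg match {
  9. case m: CamelMessage => m.mapBody { body: String => body.toLowerCase }
  10. case other => other
  11. }
  12. }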

Producer configuration options

The interaction of producer actors with Camel endpoints can be configured to be one-way or two-way (by initiating in-only or in-out message exchanges, respectively). By default, the producer initiates an in-out message exchange with the endpoint. For initiating an in-only exchange, producer actors have to override the oneway method to return true.

  1. import akka.actor.{ Actor, Props, ActorSystem }
  2. import akka.camel.Producer
  3.  
  4. class OnewaySender(uri: String) extends Actor with Producer {
  5. def endpointUri = uri
  6. override def oneway: Boolean = true
  7. }
  8.  
  9. val system = ActorSystem("some-system")
  10. val producer = system.actorOf(Props(classOf[OnewaySender], "activemq:FOO.BAR"))
  11. producer ! "Some message"

Message correlation

To correlate request with response messages, applications can set the CamelMessage.MessageExchangeId message header.

  1. import akka.camel.{ Producer, CamelMessage }
  2. import akka.actor.Actor
  3. import akka.actor.{ Props, ActorSystem }
  4.  
  5. class Producer2 extends Actor with Producer {
  6. def endpointUri = "activemq:FOO.BAR"
  7. }
  8. val system = ActorSystem("some-system")
  9. val producer = system.actorOf(Props[Producer2])
  10.  
  11. producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123"))

ProducerTemplate

The Producer trait is a very convenient way for actors to produce messages to Camel endpoints. Actors may also use a Camel ProducerTemplate for producing messages to endpoints.

  1. import akka.actor.Actor
  2. class MyActor extends Actor {
  3. def receive = {
  4. case msg =>
  5. val template = CamelExtension(context.system).template
  6. template.sendBody("direct:news", msg)
  7. }
  8. }

For initiating a two-way message exchange, one of the ProducerTemplate.request* methods must be used.

  1. import akka.actor.Actor
  2. class MyActor extends Actor {
  3. def receive = {
  4. case msg =>
  5. val template = CamelExtension(context.system).template
  6. sender() ! template.requestBody("direct:news", msg)
  7. }
  8. }

Asynchronous routing

In-out message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both consumer and producer actors.

However, asynchronous two-way message exchanges, without allocating a thread for the full duration of the exchange, cannot be generically supported by Camel's asynchronous routing engine alone. This must be supported by the individual Camel components (from which endpoints are created) as well. They must be able to suspend any work started for request processing (thereby freeing threads to do other work) and resume processing when the response is ready. This is currently the case for a subset of components such as the Jetty component. All other Camel components can still be used, of course, but they will cause allocation of a thread for the duration of an in-out message exchange. The Examples section contains a sample that implements both an asynchronous consumer and an asynchronous producer with the Jetty component.

If the Camel component being used is blocking it might be necessary to use a separate dispatcher for the producer. The Camel processor is invoked by a child actor of the producer and the dispatcher can be defined in the deployment section of the configuration. For example, if your producer actor has path /user/integration/output the dispatcher of the child actor can be defined with:

  1. akka.actor.deployment {
  2. /integration/output/* {
  3. dispatcher = my-dispatcher
  4. }
  5. }
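
The my-dispatcher referenced above must also be defined somewhere in the configuration; a minimal, purely illustrative definition could be:

  1. my-dispatcher {
  2. type = Dispatcher
  3. executor = "fork-join-executor"
  4. fork-join-executor {
  5. parallelism-min = 2
  6. parallelism-factor = 2.0
  7. parallelism-max = 10
  8. }
  9. }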

Custom Camel routes

In all the examples so far, routes to consumer actors have been automatically constructed by akka-camel when the actor was started. Although the default route construction templates used by akka-camel internally are sufficient for most use cases, some applications may require more specialized routes to actors. The akka-camel module provides two mechanisms for customizing routes to actors, which will be explained in this section: accessing actors from custom Camel routes through the actor Camel component, and intercepting the construction of the routes that akka-camel sets up for consumer actors.

Akka Camel components

Akka actors can be accessed from Camel routes using the actor Camel component. This component can be used to access any Akka actor (not only consumer actors) from Camel routes, as described in the following sections.

Access to actors

To access actors from custom Camel routes, the actor Camel component should be used. It fully supports Camel's asynchronous routing engine.

This component accepts endpoint URIs of the format <actor-path>?<options>, where <actor-path> is the ActorPath of the actor and <options> are name-value pairs separated by & (i.e. name1=value1&name2=value2&...).

URI options

The following URI options are supported:

replyTimeout (Duration, default: false)

The reply timeout, specified in the same way as durations elsewhere in Akka, for instance 10 seconds, except that in the URL it is handier to put a + between the amount and the unit, for example 200+millis.

See also Consumer timeout.

autoAck (Boolean, default: true)

If set to true, in-only message exchanges are auto-acknowledged when the message is added to the actor's mailbox. If set to false, actors must acknowledge the receipt of the message.

See also Delivery acknowledgements.

Here's an actor endpoint URI example containing an actor path:

  1. akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis

In the following example, a custom route to an actor is created using the actor's path. The akka.camel package contains an implicit toActorRouteDefinition that allows a route to reference an ActorRef directly, as shown in the example below. The route starts from a Jetty endpoint and ends at the target actor.

  1. import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
  2. import akka.camel.{ CamelMessage, CamelExtension }
  3. import org.apache.camel.builder.RouteBuilder
  4. import akka.camel._
  5. class Responder extends Actor {
  6. def receive = {
  7. case msg: CamelMessage =>
  8. sender() ! (msg.mapBody {
  9. body: String => "received %s" format body
  10. })
  11. }
  12. }
  13.  
  14. class CustomRouteBuilder(system: ActorSystem, responder: ActorRef)
  15. extends RouteBuilder {
  16. def configure {
  17. from("jetty:http://localhost:8877/camel/custom").to(responder)
  18. }
  19. }
  20. val system = ActorSystem("some-system")
  21. val camel = CamelExtension(system)
  22. val responder = system.actorOf(Props[Responder], name = "TestResponder")
  23. camel.context.addRoutes(new CustomRouteBuilder(system, responder))

When a message is received on the Jetty endpoint, it is routed to the Responder actor, which in turn replies back to the client of the HTTP request.

Intercepting route construction

The previous section, Akka Camel components, explained how to set up a route to an actor manually. It was the application's responsibility to define the route and add it to the current CamelContext. This section explains a more convenient way to define custom routes: akka-camel still sets up the routes to consumer actors (and adds these routes to the current CamelContext), but applications can define extensions to these routes. Extensions can be defined with Camel's Java DSL or Scala DSL. For example, an extension could be a custom error handler that redelivers messages from an endpoint to an actor's bounded mailbox when the mailbox was full.

The following examples demonstrate how to extend a route to a consumer actor for handling exceptions thrown by that actor.

  1. import akka.camel.{ CamelMessage, Consumer }
  2. import akka.actor.Status.Failure
  3. import org.apache.camel.builder.Builder
  4. import org.apache.camel.model.RouteDefinition
  5.  
  6. class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
  7. def receive = {
  8. case msg: CamelMessage => throw new Exception("error: %s" format msg.body)
  9. }
  10. override def onRouteDefinition = (rd) => rd.onException(classOf[Exception]).
  11. handled(true).transform(Builder.exceptionMessage).end
  12.  
  13. final override def preRestart(reason: Throwable, message: Option[Any]) {
  14. sender() ! Failure(reason)
  15. }
  16. }

The above ErrorThrowingConsumer sends the Failure back to the sender in preRestart because the Exception thrown in the actor would otherwise just crash the actor; by default the actor would be restarted and the response would never reach the client of the Consumer.

The akka-camel module creates a RouteDefinition instance by calling from(endpointUri) on a Camel RouteBuilder (where endpointUri is the endpoint URI of the consumer actor) and passes that instance as argument to the route definition handler *). The route definition handler then extends the route and returns a ProcessorDefinition (in the above example, the ProcessorDefinition returned by the end method; see the org.apache.camel.model package for details). After executing the route definition handler, akka-camel finally calls to(targetActorUri) on the returned ProcessorDefinition to complete the route to the consumer actor (where targetActorUri is the actor component URI as described in Access to actors). If the actor cannot be found, an ActorNotRegisteredException is thrown.

*) Before passing the RouteDefinition instance to the route definition handler, akka-camel may make some further modifications to it.

Examples

The Lightbend Activator tutorial named Akka Camel Samples with Scala contains 3 samples:

  • Asynchronous routing and transformation - This example demonstrates how to implement consumer and producer actors that support Asynchronous routing with their Camel endpoints.
  • Custom Camel route - Demonstrates the combined usage of a Producer and a Consumer actor as well as the inclusion of a custom Camel route.
  • Quartz Scheduler Example - Shows how simple it is to implement a cron-style scheduler by using the Camel Quartz component

Configuration

There are several configuration properties for the Camel module; please refer to the reference configuration.

Additional Resources

For an introduction to akka-camel 2, see also Peter Gabryanczyk's talk Migrating akka-camel module to Akka 2.x.

For an introduction to akka-camel 1, see also Appendix E - Akka and Camel (pdf) of the book Camel in Action.

Other, more advanced external articles (for version 1) are: