Akka – Cluster sharding: a ShardRegion proxy reports "Trying to register to coordinator" with no acknowledgement
1.1, complete error report
An error is reported at the proxy node:
[WARN] [06/23/2022 11:59:55.449] [shardingSystem-akka.actor.internal-dispatcher-29] [akka://[email protected]:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://[email protected]:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(akka://[email protected]:2551, Up)] is reachable.]
If you just want the fix, skip straight to the conclusion of this chapter.
1.2, Reproducing the problem
Cluster and node relationship:
1.2.1, Configuration of the two ShardRegion nodes
File name: testApplication.conf
```hocon
akka {
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

akka.actor.allow-java-serialization = on

akka.cluster.sharding {
  # This setting affects the actor paths, e.g.
  # akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111
}
```
1.2.2, Worker actor and startup code for the two ShardRegion nodes
Create node1 and node2 nodes with port numbers 2551 and 2552 respectively
```kotlin
package com.test

import akka.actor.AbstractActor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.ConfigFactory

class Counter : AbstractActor(), java.io.Serializable {
    class Get(var counterId: Long) : java.io.Serializable

    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(Get::class.java) {
                println("Can go here normally~~~~~~~~")
            }
            .build()
    }
}

// Sharding rules, written as simply as possible to reproduce the error
class MyMessageExtractor : ShardRegion.MessageExtractor {
    // Extract the entity ID (identifies the entity actor)
    override fun entityId(message: Any?): String? = "2222"

    // Unwrap the message if necessary
    override fun entityMessage(message: Any?): Any? = message

    // Compute the shard ID (note: this should be based on the entity ID)
    override fun shardId(message: Any?): String? = "9999"
}

fun main() {
    // Use port 2552 for the second node
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2551")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    // Create the ClusterSharding settings
    val settings = ClusterShardingSettings.create(system)

    // Start the ShardRegion
    val shardingRegion = ClusterSharding.get(system).start(
        // region (type) name
        "Counter",
        // entity actor managed by this region
        Props.create(Counter::class.java),
        settings,
        // sharding rules
        MyMessageExtractor()
    )
}
```
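The extractor above pins every message to entity "2222" in shard "9999", which is enough to reproduce the error but not useful in practice. A real extractor derives the shard id by hashing the entity id, which is the idea behind Akka's `HashCodeMessageExtractor`. A minimal stand-alone sketch of that idea (plain Java, no Akka dependency; the `shardIdFor` helper is hypothetical, not part of the article's code):

```java
// Hypothetical helper, not part of the article's code: a stable,
// hash-based shard id in the spirit of Akka's HashCodeMessageExtractor,
// instead of the hard-coded "9999" used above to reproduce the error.
public class ShardIdSketch {
    static String shardIdFor(String entityId, int numberOfShards) {
        // same entity id -> same shard; result is always in [0, numberOfShards)
        return String.valueOf(Math.abs(entityId.hashCode() % numberOfShards));
    }

    public static void main(String[] args) {
        System.out.println(shardIdFor("2222", 100));
        System.out.println(shardIdFor("counter-42", 100));
    }
}
```

The point is determinism: every node must map a given entity id to the same shard id, otherwise messages for one entity would be scattered across shards.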
1.2.3, proxy code
Node port number: 2553
```kotlin
package com.test

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import java.util.Optional

fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    // Start a proxy-only ShardRegion, without specifying a role
    ClusterSharding.get(system).startProxy(
        "Counter",
        Optional.empty(),
        MyMessageExtractor()
    ).let {
        println("shard proxy $it started.")
    }

    Thread.sleep(3000)

    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")
    shardRegion.tell(Counter.Get(123), ActorRef.noSender())
}
```
1.2.4, Run it and observe the error
- Start node1 and node2 first
- Then start the proxy node
The proxy node reports an error:
[WARN] [06/23/2022 12:15:33.901] [shardingSystem-akka.actor.internal-dispatcher-34] [akka://[email protected]:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://[email protected]:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(akka://[email protected]:2551, Up)] is reachable.]
1.3, Analysis
We know that a sharded cluster is structured like this:
Blue box: a cluster node
Red dot: the ShardRegion running on each cluster node
Yellow dot: the ShardCoordinator, a singleton for the whole cluster, through which entities are located and allocated
- Judging from the error, the proxy node cannot register itself with the ShardCoordinator, so its buffered messages cannot be routed through the coordinator to the corresponding entities.
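One detail behind this registration step: the ShardCoordinator is a cluster singleton, and Akka places a cluster singleton on the oldest cluster member (restricted to a role, when one is configured). The following is a toy model of that placement rule in plain Java, not Akka internals; all names (`Member`, `coordinatorHost`, `upNumber`) are hypothetical:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Toy model, not Akka internals: pick the coordinator host the way a
// cluster singleton is placed - on the oldest member, optionally filtered
// by role. "upNumber" stands in for member age (join order).
public class CoordinatorPlacementSketch {
    record Member(String address, int upNumber, Set<String> roles) {}

    static Optional<Member> coordinatorHost(List<Member> members, Optional<String> role) {
        return members.stream()
                .filter(m -> role.map(m.roles()::contains).orElse(true))
                .min(Comparator.comparingInt(Member::upNumber));
    }

    public static void main(String[] args) {
        List<Member> cluster = List.of(
                new Member("127.0.0.1:2551", 1, Set.of("counterRole")),
                new Member("127.0.0.1:2552", 2, Set.of("counterRole")),
                new Member("127.0.0.1:2553", 3, Set.of("countProxy")));
        // With a role filter, the singleton lands on the oldest "counterRole" node
        System.out.println(coordinatorHost(cluster, Optional.of("counterRole"))
                .map(Member::address).orElse("none")); // prints 127.0.0.1:2551
    }
}
```

This also explains why the warning points at port 2551: that is the oldest member, so it hosts the coordinator singleton the proxy is trying to reach.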
This had me stuck for a long time. After finally asking my team lead, I found the solution in this thread:
Proxy try to register to itselves, so ClusterSharding does not work – Akka / Akka Cluster – Discussion Forum for Akka Serverless and Akka Platform technologies (lightbend.com)
Among them is a sentence:
Note that if you want to only run actual shards on a subset of the nodes you will need to use roles for those nodes and tell sharding that it should be limited to those nodes. If not using roles sharding will have to be started on all nodes to work. Look for “roles” in this docs page for details https://doc.akka.io/docs/akka/current/cluster-sharding.html
Translated, this gives two options (the proxy works as long as either one is satisfied):
- If you only want to run actual shards on a subset of the nodes, assign roles to those nodes and tell sharding to restrict itself to those nodes.
- Without roles, sharding must be started on every node for it to work.
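The rule above can be captured in a toy predicate (plain Java, a model of the quoted rule rather than Akka code; `hostsShards` and its parameters are hypothetical names): a node participates in hosting shards only if sharding is started on it and, when a role is configured, the node carries that role.

```java
import java.util.Optional;
import java.util.Set;

// Toy model of the rule quoted above, not Akka code. "role" stands for
// the configured sharding role (empty = no role configured).
public class ShardHostingSketch {
    static boolean hostsShards(Set<String> nodeRoles, boolean shardingStarted,
                               Optional<String> role) {
        if (!shardingStarted) {
            return false; // a pure proxy node never hosts shards
        }
        // with no role configured, every node that started sharding hosts shards
        return role.map(nodeRoles::contains).orElse(true);
    }

    public static void main(String[] args) {
        Optional<String> role = Optional.of("counterRole");
        // node1 / node2: carry the role and start sharding -> they host shards
        System.out.println(hostsShards(Set.of("counterRole"), true, role));  // true
        // proxy node: different role, sharding not started -> correctly excluded
        System.out.println(hostsShards(Set.of("countProxy"), false, role)); // false
    }
}
```

Under this model the broken setup is easy to see: no role was configured, so the proxy node was expected to start sharding like every other node, and it did not.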
So although the proxy node starts the proxy service, it neither starts sharding nor specifies a role. Neither condition is met, hence the error.
1.4, Conclusion
The proxy works as long as either of the following holds:
- If you only want to run actual shards on a subset of the nodes, assign roles to those nodes and tell sharding to restrict itself to those nodes.
- Without roles, sharding must be started on every node for it to work.
Modified node relationship:
1.4.1, Add the role configuration
```hocon
akka {
  cluster {
    roles = ["counterRole"]
    sharding {
      role = "counterRole"
    }
  }
}
```
Below is the full testApplication.conf configuration file:
```hocon
akka {
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      # change the port on the second node
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    roles = ["counterRole"]
    sharding {
      role = "counterRole"
    }
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

akka.actor.allow-java-serialization = on

akka.cluster.sharding {
  # This setting affects the actor paths, e.g.
  # akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111
}
```
1.4.2, Add a configuration file for the proxy node
File name: proxyApplication.conf
```hocon
akka {
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
  cluster {
    # add roles
    roles = ["countProxy"]
    sharding {
      role = "counterRole"
    }
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

akka.actor.allow-java-serialization = on

akka.cluster.sharding {
  # This setting affects the actor paths, e.g.
  # akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111
}
```
1.4.3, Specify the role when starting the ShardRegion
```kotlin
// Create the ClusterSharding settings
val settings = ClusterShardingSettings.create(system)

val shardingRegion = ClusterSharding.get(system).start(
    "Counter",
    Props.create(Counter::class.java),
    // add the settings and specify the role
    settings.withRole("counterRole"),
    MyMessageExtractor()
)
```
1.4.4, Switch the configuration file on the proxy node and specify the role of the ShardRegion being proxied
```kotlin
// Switch the configuration file:
val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553")
    .withFallback(ConfigFactory.load("proxyApplication.conf"))
// .....

// Start the proxy service and specify the role of the nodes hosting the shards
ClusterSharding.get(system).startProxy(
    "Counter",
    Optional.of("counterRole"),
    MyMessageExtractor()
)
```
Full code:
Note: this node is not a ShardRegion. If you want to use a ShardRegion proxy from a node (or a child actor on a node) that does not host a ShardRegion, you must configure roles on both the ShardRegion nodes and the proxy node, in code and in the configuration files.
Otherwise, the ShardRegion service must be started on every node.
```kotlin
package com.test

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import java.util.Optional

fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553")
        .withFallback(ConfigFactory.load("proxyApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    ClusterSharding.get(system).startProxy(
        "Counter",
        // specifying the role lets the proxy register successfully
        Optional.of("counterRole"),
        MyMessageExtractor()
    ).let {
        println("shard proxy $it started.")
    }

    Thread.sleep(3000)

    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")
    shardRegion.tell(Counter.Get(123), ActorRef.noSender())
}
```