[Solved] Akka – Cluster sharding with a shardRegionProxy reports "Trying to register to coordinator" warnings


1.1 Complete error report

The following warning is logged on the proxy node:

[WARN] [06/23/2022 11:59:55.449] [shardingSystem-akka.actor.internal-dispatcher-29] [akka://[email protected]:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://[email protected]:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement.Total [1 ] buffered messages. [Coordinator [Member(akka://[email protected]:2551, Up)] is reachable.]

If you just want the fix, skip straight to the conclusion in section 1.4.

1.2 Reproducing the problem

Cluster and node topology (diagram omitted): node1 (port 2551) and node2 (port 2552) each host a ShardRegion, while the proxy node (port 2553) only starts a ShardRegion proxy.

1.2.1 Configuration of the two ShardRegion nodes

File name: testApplication.conf

akka {
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]

    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

akka.actor.allow-java-serialization=on

akka.cluster.sharding {
  # This setting changes the sharding guardian name, which shows up in entity actor paths such as
  # akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111
}

1.2.2 Worker actor and startup code for the two ShardRegion nodes

Create nodes node1 and node2 with ports 2551 and 2552 respectively (the listing below shows node1; node2 is sketched after it).

package com.test

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.client.ClusterClientReceptionist
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.ConfigFactory
import java.time.Duration

class Counter: AbstractActor(), java.io.Serializable {

    class Get(var counterId: Long): java.io.Serializable

    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(Get::class.java) {
                println("Can go here normally~~~~~~~~")
            }
            .build()
    }
}

// Sharding rules; kept deliberately trivial in order to reproduce the error
class MyMessageExtractor: ShardRegion.MessageExtractor {

    // Extract the entity ID, which identifies the entity actor
    override fun entityId(message: Any?): String? {
        return "2222"
    }

    // Optionally unwrap the message before it is delivered to the entity
    override fun entityMessage(message: Any?): Any? {
        return message
    }

    // Compute the shard ID (normally derived from the entity ID; hard-coded here)
    override fun shardId(message: Any?): String? {
        return "9999"
    }

}
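
// For reference only (not needed to reproduce the error): a more typical extractor derives both IDs
// from the message instead of hard-coding them. This is just a sketch; it assumes the messages are
// Counter.Get and simply hashes counterId onto a fixed number of shards.
class HashingMessageExtractor(private val numberOfShards: Int = 10): ShardRegion.MessageExtractor {

    // The counter ID identifies the entity actor
    override fun entityId(message: Any?): String? =
        (message as? Counter.Get)?.counterId?.toString()

    // Deliver the message to the entity unchanged
    override fun entityMessage(message: Any?): Any? = message

    // Derive the shard ID from the entity ID so the entity-to-shard mapping stays stable
    override fun shardId(message: Any?): String? =
        entityId(message)?.let { (Math.abs(it.hashCode()) % numberOfShards).toString() }
}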

fun main() {
    val config =
        // node1 uses port 2551; for node2, override the port with 2552
        ConfigFactory.parseString("akka.remote.artery.canonical.port=2551").withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    // Create the ClusterSharding settings
    val settings = ClusterShardingSettings.create(system)
    // Start the ShardRegion service
    val shardingRegion = ClusterSharding.get(system).start(
        // entity type name
        "Counter",
        // Props of the entity actors managed by this region
        Props.create(Counter::class.java),
        settings,
        // sharding rules
        MyMessageExtractor())
}
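
node2 is not shown in the original listing; a minimal sketch, assuming it reuses the same classes (in its own main file) and only overrides the Artery port:

// node2: identical to node1 except for the port override
// (same imports as the node1 listing above)
fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2552")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    // Start the ShardRegion on this node as well, with the same entity type and extractor
    ClusterSharding.get(system).start(
        "Counter",
        Props.create(Counter::class.java),
        ClusterShardingSettings.create(system),
        MyMessageExtractor())
}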

1.2.3 Proxy node code

Node port number: 2553

package com.test

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory
import scala.Option
import java.util.*

fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)
    ClusterSharding.get(system).startProxy(
        "Counter",
        Optional.empty(),
        MyMessageExtractor()
    ).let {
        println(" shard proxy $it started.")
    }

    Thread.sleep(3000)
    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")

    shardRegion.tell(Counter.Get(123L), ActorRef.noSender())
}

1.2.4 Run and observe the error

  • Start node1 and node2 first
  • Then start the proxy node

The proxy node logs the warning:

[WARN] [06/23/2022 12:15:33.901] [shardingSystem-akka.actor.internal-dispatcher-34] [akka://[email protected]:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://[email protected]:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement.Total [1 ] buffered messages. [Coordinator [Member(akka://[email protected]:2551, Up)] is reachable.]

1.3 Analysis

  • We know that a sharded cluster is structured as follows (diagram omitted):

    Blue box: a cluster node

    Red dot: the ShardRegion on each cluster node

    Yellow dot: the singleton coordinator for the entire sharded cluster (entities are located and allocated through it)


  • Judging from the warning, the proxy node cannot register with the shard coordinator, so its messages cannot be routed through the coordinator to the corresponding entities.

This puzzled me for a long time. After finally asking someone more experienced, I found the solution in this thread:

Proxy try to register to itselves, so ClusterSharding does not work – Akka / Akka Cluster – Discussion Forum for Akka Serverless and Akka Platform technologies (lightbend.com)

The key passage is:

Note that if you want to only run actual shards on a subset of the nodes you will need to use roles for those nodes and tell sharding that it should be limited to those nodes. If not using roles sharding will have to be started on all nodes to work.

Look for “roles” in this docs page for details https://doc.akka.io/docs/akka/current/cluster-sharding.html

This boils down to two options (the proxy works as long as either one is satisfied):

  • If you only want to run actual shards on a subset of the nodes, you need to use roles for those nodes and tell sharding that it is limited to those nodes.
  • Without roles, sharding must be started on all nodes for it to work.

In our code the proxy node only starts the proxy service: it neither starts sharding itself nor specifies a role, so neither condition is met and the warning above is reported. (Option 2 is sketched below for reference; the rest of this article takes option 1.)
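
For reference, option 2 (no roles, sharding started on every node) would mean replacing startProxy on the 2553 node with a full start call. A minimal sketch, assuming the same imports as section 1.2.3; this is not the approach taken below:

// Option 2 (alternative): start a full ShardRegion on the 2553 node instead of a proxy,
// so sharding runs on all nodes and no roles are required
fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    val shardRegion = ClusterSharding.get(system).start(
        "Counter",
        Props.create(Counter::class.java),
        ClusterShardingSettings.create(system),
        MyMessageExtractor())

    shardRegion.tell(Counter.Get(123L), ActorRef.noSender())
}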

1.4 Conclusion

(The proxy works as long as either one of the following is satisfied:)

  • If you only want to run actual shards on a subset of the nodes, you need to use roles for those nodes and tell sharding that it is limited to those nodes.
  • Without roles, sharding must be started on all nodes for it to work.

Modified node relationship (diagram omitted): the ShardRegion nodes carry the "counterRole" role, and the proxy node starts its proxy against that role.

1.4.1 Add role configuration

akka {
  cluster {
    roles = ["counterRole"]
    sharding {
      role = "counterRole"
    }
  }
}

Below is the full testApplication.conf with the role configuration added:

akka {
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      # Change the port on the second node
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    roles = ["counterRole"]
    sharding {
      role = "counterRole"
    }
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]

    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

akka.actor.allow-java-serialization=on

akka.cluster.sharding {
  # This setting changes the sharding guardian name, which shows up in entity actor paths such as
  # akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111
}

1.4.2 Add a configuration file for the proxy node

File name: proxyApplication.conf

akka {
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2553
    }
  }

  # Add roles
  cluster {
    roles = ["countProxy"]
    sharding {
      role = "counterRole"
    }
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"
    ]

    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

akka.actor.allow-java-serialization=on

akka.cluster.sharding {
  # This setting changes the sharding guardian name, which shows up in entity actor paths such as
  # akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111
}

1.4.3 Specify the role when starting the ShardRegion

// Create the ClusterSharding settings
val settings = ClusterShardingSettings.create(system)
val shardingRegion = ClusterSharding.get(system).start(
    "Counter",
    Props.create(Counter::class.java),
    // Add the settings and specify the role
    settings.withRole("counterRole"),
    MyMessageExtractor())

1.4.4 On the proxy node, switch to the new configuration file and specify the role of the hosting ShardRegions

// Switch to the proxy configuration file:
val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("proxyApplication.conf"))
// .....
// Start the proxy service and specify the role that the hosting ShardRegions belong to
ClusterSharding.get(system).startProxy(
    "Counter",
    Optional.of("counterRole"),
    MyMessageExtractor()
)

Full code of the proxy node:

Note: the proxy node is not itself a ShardRegion. If you want to use a ShardRegion proxy from a node (or one of its child actors) that does not host shards, you must configure roles on both the ShardRegion nodes and the proxy node, in code and in the configuration files.

Otherwise, the ShardRegion service must be started on all nodes.

package com.test

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory
import scala.Option
import java.util.*

fun main() {
    // Load the proxy configuration file from section 1.4.2
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("proxyApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)
    ClusterSharding.get(system).startProxy(
        "Counter",
        // Specify the role so the proxy can register with the coordinator
        Optional.of("counterRole"),
        MyMessageExtractor()
    ).let {
        println(" shard proxy $it started.")
    }

    Thread.sleep(3000)
    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")

    shardRegion.tell(Counter.Get(123L), ActorRef.noSender())
}
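
For completeness, the shard-hosting node (node1, port 2551) combines the role-enabled testApplication.conf from 1.4.1 with the withRole call from 1.4.3. A minimal sketch of what that node's code would look like:

package com.test

import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory

fun main() {
    // node1; node2 is identical except that it overrides the port to 2552
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2551")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)

    // Restrict this ShardRegion to cluster nodes that carry the "counterRole" role
    val settings = ClusterShardingSettings.create(system).withRole("counterRole")
    ClusterSharding.get(system).start(
        "Counter",
        Props.create(Counter::class.java),
        settings,
        MyMessageExtractor())
}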