在很多應(yīng)用場(chǎng)景中都會(huì)出現(xiàn)在系統(tǒng)中需要某類(lèi)Actor的唯一實(shí)例(only instance)。這個(gè)實(shí)例在集群環(huán)境中可能在任何一個(gè)節(jié)點(diǎn)上,但保證它是唯一的。Akka的Cluster-Singleton提供對(duì)這種Singleton Actor模式的支持,能做到當(dāng)這個(gè)實(shí)例所在節(jié)點(diǎn)出現(xiàn)問(wèn)題需要脫離集群時(shí)自動(dòng)在另一個(gè)節(jié)點(diǎn)上構(gòu)建一個(gè)同樣的Actor,并重新轉(zhuǎn)交控制。當(dāng)然,由于涉及了一個(gè)新構(gòu)建的Actor,內(nèi)部狀態(tài)會(huì)在這個(gè)過(guò)程中丟失。Single-Actor的主要應(yīng)用包括某種對(duì)外部只能支持一個(gè)接入的程序接口,或者一種帶有由多個(gè)其它Actor運(yùn)算結(jié)果產(chǎn)生的內(nèi)部狀態(tài)的累積型Actor(aggregator)。當(dāng)然,如果使用一種帶有內(nèi)部狀態(tài)的Singleton-Actor,可以考慮使用PersistenceActor來(lái)實(shí)現(xiàn)內(nèi)部狀態(tài)的自動(dòng)恢復(fù)。如此Cluster-Singleton變成了一種非常實(shí)用的模式,可以在許多場(chǎng)合下應(yīng)用。
Cluster-Singleton模式也恰恰因?yàn)樗奈ㄒ恍蕴攸c(diǎn)存在著一些隱憂(yōu),需要特別關(guān)注。唯一性容易造成的隱憂(yōu)包括:容易造成超負(fù)荷、無(wú)法保證穩(wěn)定在線、無(wú)法保證消息投遞。這些需要用戶(hù)在編程時(shí)增加特別處理。
好了,我們?cè)O(shè)計(jì)個(gè)例子來(lái)了解Cluster-Singleton,先看看Singleton-Actor的功能:
class SingletonActor extends PersistentActor with ActorLogging { import SingletonActor._ val cluster = Cluster(context.system) var freeHoles = 0 var freeTrees = 0 var ttlMatches = 0 override def persistenceId = self.path.parent.name + "-" + self.path.name def updateState(evt: Event): Unit = evt match { case AddHole => if (freeTrees > 0) { ttlMatches += 1 freeTrees -= 1 } else freeHoles += 1 case AddTree => if (freeHoles > 0) { ttlMatches += 1 freeHoles -= 1 } else freeTrees += 1 } override def receiveRecover: Receive = { case evt: Event => updateState(evt) case SnapshotOffer(_,ss: State) => freeHoles = ss.nHoles freeTrees = ss.nTrees ttlMatches = ss.nMatches } override def receiveCommand: Receive = { case Dig => persist(AddHole){evt => updateState(evt) } sender() ! AckDig //notify sender message received log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches") case Plant => persist(AddTree) {evt => updateState(evt) } sender() ! AckPlant //notify sender message received log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches") case Disconnect => //this node exits cluster. expect switch to another node log.info(s"${cluster.selfAddress} is leaving cluster ...") cluster.leave(cluster.selfAddress) case CleanUp => //clean up ... self ! PoisonPill } }
這個(gè)SingletonActor就是一種特殊的Actor,它繼承了PersistentActor,所以需要實(shí)現(xiàn)PersistentActor的抽象函數(shù)。SingletonActor維護(hù)了幾個(gè)內(nèi)部狀態(tài),分別是各類(lèi)運(yùn)算的當(dāng)前累積結(jié)果freeHoles,freeTrees,ttlMatches。SingletonActor模擬的是一個(gè)種樹(shù)場(chǎng)景:當(dāng)收到Dig指令后產(chǎn)生登記樹(shù)坑AddHole事件,在這個(gè)事件中更新當(dāng)前狀態(tài)值;當(dāng)收到Plant指令后產(chǎn)生AddTree事件并更新?tīng)顟B(tài)。因?yàn)镃luster-Singleton模式無(wú)法保證消息安全投遞所以應(yīng)該加個(gè)回復(fù)機(jī)制AckDig,AckPlant讓消息發(fā)送者可用根據(jù)情況補(bǔ)發(fā)消息。我們是用Cluster.selfAddress來(lái)確認(rèn)當(dāng)前集群節(jié)點(diǎn)的轉(zhuǎn)換。
我們需要在所有承載SingletonActor的集群節(jié)點(diǎn)上構(gòu)建部署ClusterSingletonManager,如下:
def create(port: Int) = { val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port") .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]")) .withFallback(ConfigFactory.load()) val singletonSystem = ActorSystem("SingletonClusterSystem",config) startupSharedJournal(singletonSystem, (port == 2551), path = ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store")) val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props( singletonProps = Props[SingletonActor], terminationMessage = CleanUp, settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton")) ), name = "singletonManager") }
可以看的出來(lái),ClusterSingletonManager也是一種Actor,通過(guò)ClusterSingletonManager.props配置其所管理的SingletonActor。我們的目的主要是去求證當(dāng)前集群節(jié)點(diǎn)出現(xiàn)故障需要退出集群時(shí),這個(gè)SingletonActor是否能夠自動(dòng)轉(zhuǎn)移到其它在線的節(jié)點(diǎn)上。ClusterSingletonManager的工作原理是首先在所有選定的集群節(jié)點(diǎn)上構(gòu)建和部署,然后在最先部署的節(jié)點(diǎn)上啟動(dòng)SingletonActor,當(dāng)這個(gè)節(jié)點(diǎn)不可使用時(shí)(unreachable)自動(dòng)在次先部署的節(jié)點(diǎn)上重新構(gòu)建部署SingletonActor。
同樣作為一種Actor,ClusterSingletonProxy是通過(guò)與ClusterSingletonManager消息溝通來(lái)調(diào)用SingletonActor的。ClusterSingletonProxy動(dòng)態(tài)跟蹤在線的SingletonActor,為用戶(hù)提供它的ActorRef。我們可以通過(guò)下面的代碼來(lái)具體調(diào)用SingletonActor:
object SingletonUser { def create = { val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]") .withFallback(ConfigFactory.load()) val suSystem = ActorSystem("SingletonClusterSystem",config) val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props( singletonManagerPath = "/user/singletonManager", settings = ClusterSingletonProxySettings(suSystem).withRole(None) ), name= "singletonUser") import suSystem.dispatcher //send Dig messages every 2 seconds to SingletonActor through prox suSystem.scheduler.schedule(0.seconds,3.second,singletonProxy,SingletonActor.Dig) //send Plant messages every 3 seconds to SingletonActor through prox suSystem.scheduler.schedule(1.seconds,2.second,singletonProxy,SingletonActor.Plant) //send kill message to hosting node every 30 seconds suSystem.scheduler.schedule(10.seconds,15.seconds,singletonProxy,SingletonActor.Disconnect) } }
我們分不同的時(shí)間段通過(guò)ClusterSingletonProxy向SingletonActor發(fā)送Dig和Plant消息。然后每隔30秒向SingletonActor發(fā)送一個(gè)Disconnect消息通知它所在節(jié)點(diǎn)開(kāi)始脫離集群。然后我們用下面的代碼來(lái)試著運(yùn)行:
package clustersingleton.demo import clustersingleton.sa.SingletonActor import clustersingleton.frontend.SingletonUserobject ClusterSingletonDemo extends App { SingletonActor.create(2551) //seed-node SingletonActor.create(0) //ClusterSingletonManager node SingletonActor.create(0) SingletonActor.create(0) SingletonActor.create(0) SingletonUser.create //ClusterSingletonProxy node}
運(yùn)算結(jié)果如下:
[INFO] [07/09/2017 20:17:28.210] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/09/2017 20:17:28.334] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:2551][INFO] [07/09/2017 20:17:28.489] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/09/2017 20:17:28.493] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55839][INFO] [07/09/2017 20:17:28.514] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/09/2017 20:17:28.528] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55840][INFO] [07/09/2017 20:17:28.566] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/09/2017 20:17:28.571] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55841][INFO] [07/09/2017 20:17:28.595] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/09/2017 20:17:28.600] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55842][INFO] [07/09/2017 20:17:28.620] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/09/2017 20:17:28.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55843][INFO] [07/09/2017 20:17:28.794] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton][INFO] [07/09/2017 20:17:28.817] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=0,ttlMatches=0[INFO] [07/09/2017 20:17:29.679] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=1,freeTrees=0,ttlMatches=0... [INFO] [07/09/2017 20:17:38.676] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:2551 is leaving cluster ...[INFO] [07/09/2017 20:17:39.664] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=4[INFO] [07/09/2017 20:17:40.654] [SingletonClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=2,ttlMatches=4[INFO] [07/09/2017 20:17:41.664] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=5[INFO] [07/09/2017 20:17:42.518] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton][INFO] [07/09/2017 20:17:43.653] [SingletonClusterSystem-akka.actor.default-dispatcher-19] http://www.cnblogs.com/tiger-xc/p/7144879.html