通過(guò)上篇關(guān)于Cluster-Singleton的介紹,我們了解了Akka為分布式程序提供的編程支持:基于消息驅(qū)動(dòng)的運(yùn)算模式特別適合分布式程序編程,我們不需要特別的努力,只需要按照普通的Actor編程方式就可以實(shí)現(xiàn)集群分布式程序了。Cluster-Singleton可以保證無(wú)論集群節(jié)點(diǎn)出了任何問(wèn)題,只要集群中還有節(jié)點(diǎn)在線(xiàn),都可以持續(xù)的安全運(yùn)算。Cluster-Singleton這種模式保證了某種Actor的唯一實(shí)例可以安全穩(wěn)定地在集群環(huán)境下運(yùn)行。還有一種情況就是如果有許多特別占用資源的Actor需要同時(shí)運(yùn)行,而這些Actor同時(shí)占用的資源遠(yuǎn)遠(yuǎn)超過(guò)一臺(tái)服務(wù)器的容量,如此我們必須把這些Actor分布到多臺(tái)服務(wù)器上,或者是一個(gè)由多臺(tái)服務(wù)器組成的集群環(huán)境,這時(shí)就需要Cluster-Sharding模式來(lái)幫助解決這樣的問(wèn)題了。

我把通過(guò)使用Cluster-Sharding后達(dá)到的一些目的和大家分享一下,大家一起來(lái)分析分析到底這些達(dá)成的目標(biāo)里是否包括了Actor在集群節(jié)點(diǎn)間的分布:

首先我有個(gè)Actor,它的名稱(chēng)是一個(gè)自編碼,由Cluster-Sharding在集群中某個(gè)節(jié)點(diǎn)上構(gòu)建。由于在一個(gè)集群環(huán)境里所以這個(gè)Actor到底在哪個(gè)節(jié)點(diǎn)上,具體地址是什么我都不知道,我只需要用這個(gè)自編碼就可以和它溝通。如果我有許多自編碼的消耗資源的Actor,我可以通過(guò)自編碼中的分片(shard)編號(hào)來(lái)指定在其它的分片(shard)里構(gòu)建這些Actor。Akka-Cluster還可以根據(jù)整個(gè)集群中節(jié)點(diǎn)的增減按當(dāng)前集群節(jié)點(diǎn)情況進(jìn)行分片在集群節(jié)點(diǎn)調(diào)動(dòng)來(lái)重新配載(rebalance),包括在某些節(jié)點(diǎn)因故脫離集群時(shí)把節(jié)點(diǎn)上的所有Actor在其它在線(xiàn)節(jié)點(diǎn)上重新構(gòu)建。這樣看來(lái),這個(gè)Actor的自編碼應(yīng)該是Cluster-Sharding的應(yīng)用核心元素了。按慣例我們還是用例子來(lái)示范Cluster-Sharding的使用。我們需要分片(sharding)的Actor就是前幾篇討論里提到的Calculator:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._object Calculator {  sealed trait Command  case class Num(d: Double) extends Command  case class Add(d: Double) extends Command  case class Sub(d: Double) extends Command  case class Mul(d: Double) extends Command  case class Div(d: Double) extends Command  case object ShowResult extends Command  sealed trait Event  case class SetResult(d: Any) extends Event

  def getResult(res: Double, cmd: Command) = cmd match {    case Num(x) => x    case Add(x) => res + x    case Sub(x) => res - x    case Mul(x) => res * x    case Div(x) => {
      val _ = res.toInt / x.toInt //yield ArithmeticException when /0.00
      res / x
    }    case _ => new ArithmeticException("Invalid Operation!")
  }  case class State(result: Double) {

    def updateState(evt: Event): State = evt match {      case SetResult(n) => copy(result = n.asInstanceOf[Double])
    }
  }  case object Disconnect extends Command    //exit cluster
  def props = Props(new Calcultor)

}class Calcultor extends PersistentActor with ActorLogging {
  import Calculator._
  val cluster = Cluster(context.system)  var state: State = State(0)  override def persistenceId: String = self.path.parent.name+"-"+self.path.name  override def receiveRecover: Receive = {    case evt: Event => state = state.updateState(evt)    case SnapshotOffer(_,st: State) => state = state.copy(result =  st.result)
  }  override def receiveCommand: Receive = {    case Num(n) => persist(SetResult(getResult(state.result,Num(n))))(evt => state = state.updateState(evt))    case Add(n) => persist(SetResult(getResult(state.result,Add(n))))(evt => state = state.updateState(evt))    case Sub(n) => persist(SetResult(getResult(state.result,Sub(n))))(evt => state = state.updateState(evt))    case Mul(n) => persist(SetResult(getResult(state.result,Mul(n))))(evt => state = state.updateState(evt))    case Div(n) => persist(SetResult(getResult(state.result,Div(n))))(evt => state = state.updateState(evt))    case ShowResult => log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave (cluster.selfAddress)

  }  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}class CalcSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {    case _: ArithmeticException => SupervisorStrategy.Resume
  }  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }
  val calcActor = context.actorOf(Calculator.props,"calculator")  override def receive: Receive = {    case msg@ _ => calcActor.forward(msg)
  }

}

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

我們看到:Calculator是一個(gè)普通的PersisitentActor,內(nèi)部狀態(tài)可以實(shí)現(xiàn)持久化,Actor重啟時(shí)可以恢復(fù)狀態(tài)。CalcSupervisor是Calculator的監(jiān)管,這樣做是為了實(shí)現(xiàn)新的監(jiān)管策略SupervisorStrategy。

Calculator就是我們準(zhǔn)備集群分片(sharding)的目標(biāo)enitity。一種Actor的分片是通過(guò)Akka的Cluster-Sharding的ClusterSharding.start方法在集群中構(gòu)建的。我們需要在所有將承載分片的節(jié)點(diǎn)上運(yùn)行這個(gè)方法來(lái)部署分片:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

/**
   * Register a named entity type by defining the [[akka.actor.Props]] of the entity actor and
   * functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
   * for this type can later be retrieved with the [[#shardRegion]] method.
   *
   * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
   * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
   *
   * Some settings can be configured as described in the `akka.cluster.sharding` section
   * of the `reference.conf`.
   *
   * @param typeName the name of the entity type
   * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
   * @param settings configuration settings, see [[ClusterShardingSettings]]
   * @param extractEntityId partial function to extract the entity id and the message to send to the
   *   entity from the incoming message, if the partial function does not match the message will
   *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
   * @param extractShardId function to determine the shard id for an incoming message, only messages
   *   that passed the `extractEntityId` will be used
   * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard   */
  def start(
    typeName:        String,
    entityProps:     Props,
    settings:        ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId:  ShardRegion.ExtractShardId): ActorRef = {

    val allocationStrategy = new LeastShardAllocationStrategy(
      settings.tuningParameters.leastShardAllocationRebalanceThreshold,
      settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)

    start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
  }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

start返回了ShardRegion,是個(gè)ActorRef類(lèi)型。ShardRegion是一個(gè)特殊的Actor,負(fù)責(zé)管理可能多個(gè)分片(shard)內(nèi)稱(chēng)為Entity的Actor實(shí)例。這些分片可能是分布在不同的集群節(jié)點(diǎn)上的,外界通過(guò)ShardRegion與其轄下Entities溝通。從start函數(shù)參數(shù)entityProps我們看到:每個(gè)分片中只容許一個(gè)種類(lèi)的Actor;具體的Entity實(shí)例是由另一個(gè)內(nèi)部Actor即shard構(gòu)建的,shard可以在一個(gè)分片中構(gòu)建多個(gè)Entity實(shí)例。多shard多entity的特性可以從extractShardId,extractEntityId這兩個(gè)方法中得到一些信息。我們說(shuō)過(guò)Actor自編碼即entity-id是Cluster-Sharding的核心元素。在entity-id這個(gè)自編碼中還包含了shard-id,所以用戶(hù)可以通過(guò)entity-id的編碼規(guī)則來(lái)設(shè)計(jì)整個(gè)分片系統(tǒng)包括每個(gè)ShardRegion下shard和entity的數(shù)量。當(dāng)ShardRegion得到一個(gè)entity-id后,首先從中抽取shard-id,如果shard-id在集群中不存在的話(huà)就按集群各節(jié)點(diǎn)負(fù)載情況在其中一個(gè)節(jié)點(diǎn)上構(gòu)建新的shard;然后再用entity-id在shard-id分片中查找entity,如果不存在就構(gòu)建一個(gè)新的entity實(shí)例。整個(gè)shard和entity的構(gòu)建過(guò)程都是通過(guò)用戶(hù)提供的函數(shù)extractShardId和extractEntityId實(shí)現(xiàn)的,Cluster-Sharding就是通過(guò)這兩個(gè)函數(shù)按用戶(hù)的要求來(lái)構(gòu)建和使用shard和entity的。這個(gè)自編碼無(wú)需按一定的順序,只需要保證唯一性。下面是一個(gè)編碼例子:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

object CalculatorShard {
  import Calculator._  case class CalcCommands(eid: String, msg: Command)  //user should use it to talk to shardregion
  val shardName = "calcShard"
  val getEntityId: ShardRegion.ExtractEntityId = {    case CalcCommands(id,msg) => (id,msg)
  }
  val getShardId: ShardRegion.ExtractShardId = {    case CalcCommands(id,_) => id.head.toString
  }
  def entityProps = Props(new CalcSupervisor)
}

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

用戶(hù)是用CalcCommands與ShardRegion溝通的。這是一個(gè)專(zhuān)門(mén)為與分片系統(tǒng)溝通而設(shè)的包嵌消息類(lèi)型,包嵌的信息里除了Calculator正常支持的Command消息外,還包括了目標(biāo)Entity實(shí)例的編號(hào)eid。這個(gè)eid的第一個(gè)字節(jié)代表shard-id,這樣我們可以直接指定目標(biāo)entity所在分片或者隨意任選一個(gè)shard-id如:Random.NextInt(9).toString。由于每個(gè)分片只含一種類(lèi)型的Actor,不同的entity-id代表多個(gè)同類(lèi)Actor實(shí)例的同時(shí)存在,就像前面討論的Router一樣:所有實(shí)例針對(duì)不同的輸入進(jìn)行相同功能的運(yùn)算處理。一般來(lái)說(shuō)用戶(hù)會(huì)通過(guò)某種算法任意產(chǎn)生entity-id,希望能做到各分片中entity的均衡部署,Cluster-Sharding可以根據(jù)具體的集群負(fù)載情況自動(dòng)調(diào)整分片在集群節(jié)點(diǎn)層面上的部署。

下面的代碼示范了如何在一個(gè)集群節(jié)點(diǎn)上部署分片:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

package clustersharding.shard
import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShardobject CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)

    startupSharding(port,system)

  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {    // Start the shared journal one one node (don't crash this SPOF)    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")    // register the shared journal    import system.dispatcher    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {

    startupSharedJournal(system, startStore = (port == 2551), path =
      ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )

  }

}

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

具體的部署代碼在startupSharding方法里。下面這段代碼示范了如何使用分片里的entity:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開(kāi)發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)http://www.cnblogs.com/tiger-xc/p/7170127.html