*
   * INTERNAL API.   */
  private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =    if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this

  /**
   * INTERNAL API.   */ http://www.cnblogs.com/tiger-xc/p/7215898.htmlLong,http://www.cnblogs.com/tiger-xc/p/7215898.html
  
  
  
  
  
  http://www.cnblogs.com/tiger-xc/p/7215898.html
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
             minSequenceNr: Long, minTimestamp: Long) =
    SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)  /**
   * Java API.   */
  def latest() = Latest  /**
   * Java API.   */
  def none() = None
}/**
 * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot. */final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)object SelectedSnapshot {  /**
   * Java API, Plugin API.   */
  def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
    SelectedSnapshot(metadata, snapshot)
}/**
 * INTERNAL API.
 *
 * Defines messages exchanged between persistent actors and a snapshot store. */private[persistence] object SnapshotProtocol {  /** Marker trait shared by internal snapshot messages. */
  sealed trait Message extends Protocol.Message  /** Internal snapshot command. */
  sealed trait Request extends Message  /** Internal snapshot acknowledgement. */
  sealed trait Response extends Message  /**
   * Instructs a snapshot store to load a snapshot.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting a snapshot from which recovery should start.
   * @param toSequenceNr upper sequence number bound (inclusive) for recovery.   */
  final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
    extends Request  /**
   * Response message to a [[LoadSnapshot]] message.
   *
   * @param snapshot loaded snapshot, if any.   */
  final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
    extends Response  /**
   * Reply message to a failed [[LoadSnapshot]] request.
   * @param cause failure cause.   */
  final case class LoadSnapshotFailed(cause: Throwable) extends Response  /**
   * Instructs snapshot store to save a snapshot.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot.   */
  final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
    extends Request  /**
   * Instructs snapshot store to delete a snapshot.
   *
   * @param metadata snapshot metadata.   */
  final case class DeleteSnapshot(metadata: SnapshotMetadata)
    extends Request  /**
   * Instructs snapshot store to delete all snapshots that match `criteria`.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting snapshots to be deleted.   */
  final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
    extends Request
}

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

好了,下面我們再分析一下這個事件來源模式(event-sourcing):首先對指令消息進(jìn)行驗(yàn)證,然后把指令轉(zhuǎn)換成事件,把事件寫入日志然后再更新內(nèi)部狀態(tài)。其中驗(yàn)證是為了保證該事件在更新內(nèi)部狀態(tài)時不會出現(xiàn)異常。這個驗(yàn)證必須在事件寫入日志前,否則造成異常的事件就會被寫入日志,就會在恢復(fù)時的事件重演中重復(fù)發(fā)生異常。我們用個例子來示范如何來設(shè)計指令消息到事件的轉(zhuǎn)換的:

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

import akka.actor._
import akka.persistence._object Calculator {  sealed trait Command  case class Operand(x: Int) extends Command  case class Add(x: Int) extends Command  case class Sub(x: Int) extends Command  case class Mul(x: Int) extends Command  case class Div(x: Int) extends Command  case class ShowResult(x: Double) extends Command  sealed trait Event  case class SetNum(x: Int) extends Event  case class Added(x: Int) extends Event  case class Subtracted(x: Int) extends Event  case class Multiplied(x: Int) extends Event  case class Divided(x: Int) extends Event  case class State(result: Int) {
    def updateState(evt: Event): State = evt match {      case SetNum(x) => copy(result = x)      case Added(x) => copy(result = this.result + x)      case Subtracted(x) => copy(result = this.result - x)      case Multiplied(x) => copy(result = this.result * x)      case Divided(x) => copy(result = this.result / x)
    }
  }
}class Calculator extends PersistentActor with ActorLogging {
import Calculator._  var state: State = State(0)  override def persistenceId: String = "persistence-actor"
  override def receiveCommand: Receive = {    case Operand(x) =>
      persist(SetNum(x)){evt => state = state.updateState(evt)}    case Add(x) =>
      persist(Added(x)){evt => state = state.updateState(evt)}    case Sub(x) =>
      persist(Subtracted(x)){evt => state = state.updateState(evt)}    case Mul(x) =>
      persist(Multiplied(x)){evt => state = state.updateState(evt)}    case Divided(x) if (x != 0) =>
      persist(Added(x)){evt => state = state.updateState(evt)}
  }  override def receiveRecover: Receive = {    case evt: Event => state = state.updateState(evt)    case SnapshotOffer(_, sts: State) => state = sts.copy(sts.result)
  }
}

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

以上代碼基本上進(jìn)行了Command和Event的直接對應(yīng)。這是一種比較直觀的關(guān)系對應(yīng)方式。我們注意到只有在收到Div(x)指令時才進(jìn)行了指令驗(yàn)證(x == 0)。因?yàn)檫@個例子比較簡單,所以我們可以肯定只有指令Div對狀態(tài)進(jìn)行施用時才有可能造成異常。這樣才能使我們比較直觀地進(jìn)行Command與Event關(guān)系對應(yīng)。假如對內(nèi)部狀態(tài)的更新涉及到一些非常復(fù)雜的算法,我們無法肯定哪個指令會產(chǎn)生異常,那我們只有先進(jìn)行運(yùn)算指令得出一個結(jié)果,然后直接替換狀態(tài),這個動作肯定是安全的了。按這個邏輯,我們把上面的例子調(diào)整一下: 

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

import akka.actor._
import akka.persistence._object Calculator {  sealed trait Command  case class Operand(x: Int) extends Command  case class Add(x: Int) extends Command  case class Sub(x: Int) extends Command  case class Mul(x: Int) extends Command  case class Div(x: Int) extends Command  case class ShowResult(x: Int) extends Command  sealed trait Event  case class SetResult(x: Int) extends Event

  def getResult(res: Int, cmd: Command): Int = cmd match {    case Operand(x) => x    case Add(x) => res + x    case Sub(x) => res - x    case Mul(x) => res * x    case Div(x) => res / x    case _ => 0
  }  case class State(result: Int) {
    def updateState(evt: Event): State = evt match {      case SetResult(x) => copy(result = x)
    }
  }
}class Calculator extends PersistentActor with ActorLogging {
import Calculator._  var state: State = State(0)  override def persistenceId: String = "persistence-actor"
  override def receiveCommand: Receive = {    case opr: Operand =>
      persist(SetResult(getResult(state.result,opr)))(evt => state = state.updateState(evt))    case add: Add =>
      persist(SetResult(getResult(state.result,add)))(evt => state = state.updateState(evt))    case sub: Sub =>
      persist(SetResult(getResult(state.result,sub)))(evt => state = state.updateState(evt))    case mul: Mul =>
      persist(SetResult(getResult(state.result,mul)))(evt => state = state.updateState(evt))    case div: Div =>
      persist(SetResult(getResult(state.result,div)))(evt => state = state.updateState(evt))

  }  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Error: ${cause.getMessage}")
  }  override def receiveRecover: Receive = {    case evt: Event => state = state.updateState(evt)    case SnapshotOffer(_, sts: State) => state = sts.copy(sts.result)
  }
}

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

現(xiàn)在在寫入日志前進(jìn)行了驗(yàn)證。注意如果驗(yàn)證結(jié)果異??梢栽趏nPersistFailure中處理。

下面我們就在Calculator里增加一些消息發(fā)布機(jī)制把恢復(fù)過程、消息處理過程和當(dāng)前狀態(tài)都發(fā)布出去:

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._  var state: State = State(0)  override def persistenceId: String = "persistence-actor"
  val snapShotInterval = 5
  override def receiveCommand: Receive = {    case Operand(x) => persist(SetNum(x))(handleEvent)    case Add(x) => persist(Added(x))(handleEvent)    case Sub(x) => persist(Subtracted(x))(handleEvent)    case Mul(x) => persist(Multiplied(x))(handleEvent)    case Div(x) if (x != 0) => persist(Divided(x))(handleEvent)    case ShowResult =>
      context.system.eventStream.publish(LogMessage(s"Current state: $state"))    case BackupResult =>
      saveSnapshot(state)
      context.system.eventStream.publish(LogMessage(s"Manual saving snapshot: $state"))    case SaveSnapshotSuccess(metadata) =>
      context.system.eventStream.publish(LogMessage(s"Successfully saved state: $state"))    case SaveSnapshotFailure(metadata, reason) =>
      context.system.eventStream.publish(LogMessage(s"Saving state: $state failed!"))

  }
  def handleEvent(evt: Event) = {   //update state and publish progress
    state = state.updateState(evt)
    context.system.eventStream.publish(LogMessage(s"Logged event: $evt"))    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {
      saveSnapshot(state)
      context.system.eventStream.publish(LogMessage(s"Saving snapshot: $state after $snapShotInterval events"))
    }
  }  override def receiveRecover: Receive = {    case evt: Event => {
      state = state.updateState(evt)
      context.system.eventStream.publish(LogMessage(s"Restoring event: $evt"))
    }    case SnapshotOffer(mdata, sts: State) => {
      state = sts.copy(sts.result)
      context.system.eventStream.publish(LogMessage(s"Restoring snapshot: $mdata"))
    }    case RecoveryCompleted => log.info(s"Recovery completed with starting state: $state")
  }  override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Rejected: ${cause.getMessage}")
  }  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Error: ${cause.getMessage}")
  }
}

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

發(fā)布消息內(nèi)容已經(jīng)可以說明每段代碼的作用。

下面是測試運(yùn)算Calculator的代碼:

大學(xué)生就業(yè)培訓(xùn),高中生培訓(xùn),在職人員轉(zhuǎn)行培訓(xùn),企業(yè)團(tuán)訓(xùn)

package persistence.demo
import akka.actor._
import persistence.calculator.Calculator
import persistence.tracker.EventTrackerobject persistenceDemo extends App {
  val persistenceSystem = ActorSystem("persistenceSystem")

http://www.cnblogs.com/tiger-xc/p/7215898.html