* * INTERNAL API. */ private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria = if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this /** * INTERNAL API. */ http://www.cnblogs.com/tiger-xc/p/7215898.htmlLong,http://www.cnblogs.com/tiger-xc/p/7215898.html http://www.cnblogs.com/tiger-xc/p/7215898.html minSequenceNr: Long, minTimestamp: Long) = SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp) /** * Java API. */ def latest() = Latest /** * Java API. */ def none() = None }/** * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]]. * * @param metadata snapshot metadata. * @param snapshot snapshot. */final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)object SelectedSnapshot { /** * Java API, Plugin API. */ def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot = SelectedSnapshot(metadata, snapshot) }/** * INTERNAL API. * * Defines messages exchanged between persistent actors and a snapshot store. */private[persistence] object SnapshotProtocol { /** Marker trait shared by internal snapshot messages. */ sealed trait Message extends Protocol.Message /** Internal snapshot command. */ sealed trait Request extends Message /** Internal snapshot acknowledgement. */ sealed trait Response extends Message /** * Instructs a snapshot store to load a snapshot. * * @param persistenceId persistent actor id. * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) extends Request /** * Response message to a [[LoadSnapshot]] message. * * @param snapshot loaded snapshot, if any. 
*/ final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long) extends Response /** * Reply message to a failed [[LoadSnapshot]] request. * @param cause failure cause. */ final case class LoadSnapshotFailed(cause: Throwable) extends Response /** * Instructs snapshot store to save a snapshot. * * @param metadata snapshot metadata. * @param snapshot snapshot. */ final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any) extends Request /** * Instructs snapshot store to delete a snapshot. * * @param metadata snapshot metadata. */ final case class DeleteSnapshot(metadata: SnapshotMetadata) extends Request /** * Instructs snapshot store to delete all snapshots that match `criteria`. * * @param persistenceId persistent actor id. * @param criteria criteria for selecting snapshots to be deleted. */ final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria) extends Request }
好了,下面我們再分析一下這個事件來源模式(event-sourcing):首先對指令消息進行驗證,然后把指令轉換成事件,把事件寫入日志然后再更新內部狀態。其中驗證是為了保證該事件在更新內部狀態時不會出現異常。這個驗證必須在事件寫入日志前,否則造成異常的事件就會被寫入日志,就會在恢復時的事件重演中重復發生異常。我們用個例子來示范如何設計指令消息到事件的轉換:
import akka.actor._
import akka.persistence._

/** Commands, events and state of the event-sourced calculator (one event type per command). */
object Calculator {

  /** Instructions a client sends to the calculator actor. */
  sealed trait Command
  case class Operand(x: Int) extends Command
  case class Add(x: Int) extends Command
  case class Sub(x: Int) extends Command
  case class Mul(x: Int) extends Command
  case class Div(x: Int) extends Command
  case class ShowResult(x: Double) extends Command

  /** Facts written to the journal and replayed on recovery. */
  sealed trait Event
  case class SetNum(x: Int) extends Event
  case class Added(x: Int) extends Event
  case class Subtracted(x: Int) extends Event
  case class Multiplied(x: Int) extends Event
  case class Divided(x: Int) extends Event

  /**
   * Immutable calculator state.
   *
   * @param result the current accumulated value.
   */
  case class State(result: Int) {

    /**
     * Applies one event and returns the resulting state.
     *
     * @param evt the event to apply.
     * @return a new state; `this` is left unchanged.
     */
    def updateState(evt: Event): State = evt match {
      case SetNum(x)     => copy(result = x)
      case Added(x)      => copy(result = result + x)
      case Subtracted(x) => copy(result = result - x)
      case Multiplied(x) => copy(result = result * x)
      // Integer division throws ArithmeticException for x == 0, so the command
      // handler must never persist Divided(0).
      case Divided(x)    => copy(result = result / x)
    }
  }
}

/** Persistent actor that turns each accepted command into exactly one journaled event. */
class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  var state: State = State(0)

  override def persistenceId: String = "persistence-actor"

  /** Folds a persisted or replayed event into the in-memory state. */
  private def applyEvent(evt: Event): Unit =
    state = state.updateState(evt)

  override def receiveCommand: Receive = {
    case Operand(x) => persist(SetNum(x))(applyEvent)
    case Add(x)     => persist(Added(x))(applyEvent)
    case Sub(x)     => persist(Subtracted(x))(applyEvent)
    case Mul(x)     => persist(Multiplied(x))(applyEvent)
    // Validate before persisting: a zero divisor is not matched here, so no
    // Divided(0) event can reach the journal and break recovery.
    // (Previously this case matched the Divided *event* and persisted Added.)
    case Div(x) if x != 0 => persist(Divided(x))(applyEvent)
  }

  override def receiveRecover: Receive = {
    case evt: Event                   => applyEvent(evt)
    // State is immutable, so the offered snapshot can be adopted directly.
    case SnapshotOffer(_, sts: State) => state = sts
  }
}
以上代碼基本上進行了Command和Event的直接對應。這是一種比較直觀的關系對應方式。我們注意到只有在收到Div(x)指令時才進行了指令驗證(要求 x != 0)。因為這個例子比較簡單,所以我們可以肯定只有指令Div對狀態進行施用時才有可能造成異常。這樣才能使我們比較直觀地進行Command與Event關系對應。假如對內部狀態的更新涉及到一些非常復雜的算法,我們無法肯定哪個指令會產生異常,那我們只有先進行運算指令得出一個結果,然后直接替換狀態,這個動作肯定是安全的了。按這個邏輯,我們把上面的例子調整一下:
import akka.actor._
import akka.persistence._
import scala.util.{Failure, Success, Try}

/** Commands, the single result-carrying event, and state of the calculator. */
object Calculator {

  /** Instructions a client sends to the calculator actor. */
  sealed trait Command
  case class Operand(x: Int) extends Command
  case class Add(x: Int) extends Command
  case class Sub(x: Int) extends Command
  case class Mul(x: Int) extends Command
  case class Div(x: Int) extends Command
  case class ShowResult(x: Int) extends Command

  /** Journaled fact: the already-computed result that replaces the state. */
  sealed trait Event
  case class SetResult(x: Int) extends Event

  /**
   * Computes the value that `cmd` produces when applied to `res`.
   *
   * @param res the current result.
   * @param cmd the command to evaluate.
   * @return the new result; 0 for commands that carry no arithmetic (e.g. `ShowResult`).
   * @throws ArithmeticException for `Div(0)`.
   */
  def getResult(res: Int, cmd: Command): Int = cmd match {
    case Operand(x) => x
    case Add(x)     => res + x
    case Sub(x)     => res - x
    case Mul(x)     => res * x
    case Div(x)     => res / x
    case _          => 0
  }

  /**
   * Immutable calculator state.
   *
   * @param result the current accumulated value.
   */
  case class State(result: Int) {

    /** Returns the state after `evt`; replacing the result cannot fail. */
    def updateState(evt: Event): State = evt match {
      case SetResult(x) => copy(result = x)
    }
  }
}

/** Persistent actor that evaluates a command first and journals only the outcome. */
class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  var state: State = State(0)

  override def persistenceId: String = "persistence-actor"

  /**
   * Evaluates `cmd` against the current state and persists the outcome.
   *
   * The evaluation happens before `persist`, so a failing computation
   * (e.g. division by zero) is rejected here: nothing is journaled and the
   * actor keeps running instead of failing inside `receiveCommand`.
   */
  private def evaluateAndPersist(cmd: Command): Unit =
    Try(getResult(state.result, cmd)) match {
      case Success(res) =>
        persist(SetResult(res))(evt => state = state.updateState(evt))
      case Failure(cause) =>
        log.warning(s"Command rejected: $cmd (${cause.getMessage})")
    }

  override def receiveCommand: Receive = {
    case opr: Operand => evaluateAndPersist(opr)
    case add: Add     => evaluateAndPersist(add)
    case sub: Sub     => evaluateAndPersist(sub)
    case mul: Mul     => evaluateAndPersist(mul)
    case div: Div     => evaluateAndPersist(div)
  }

  // Invoked when writing the event to the journal fails; it is not invoked
  // for exceptions thrown while computing the result.
  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Error: ${cause.getMessage}")
  }

  override def receiveRecover: Receive = {
    case evt: Event                   => state = state.updateState(evt)
    // State is immutable, so the offered snapshot can be adopted directly.
    case SnapshotOffer(_, sts: State) => state = sts
  }
}
現在在寫入日志前進行了驗證(先運算得出結果)。注意:運算本身拋出的異常(例如除以零)發生在調用persist之前,需要在receiveCommand中處理;onPersistFailure只會在事件寫入日志失敗時被調用。
下面我們就在Calculator里增加一些消息發布機制,把恢復過程、消息處理過程和當前狀態都發布出去:
/**
 * Persistent calculator that also publishes its progress (recovery, event
 * handling, snapshots, current state) to the actor system's event stream.
 *
 * NOTE(review): `LogMessage`, `BackupResult`, the event types and `State` are
 * expected to come from the `Calculator` companion object, which is not part
 * of this snippet. `ShowResult` and `BackupResult` are matched as stable
 * identifiers here, so they presumably are `case object`s in that companion;
 * confirm against its definition.
 */
class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  var state: State = State(0)

  override def persistenceId: String = "persistence-actor"

  /** A snapshot is saved whenever the sequence number is a multiple of this value. */
  val snapShotInterval = 5

  override def receiveCommand: Receive = {
    case Operand(x) => persist(SetNum(x))(handleEvent)
    case Add(x)     => persist(Added(x))(handleEvent)
    case Sub(x)     => persist(Subtracted(x))(handleEvent)
    case Mul(x)     => persist(Multiplied(x))(handleEvent)
    // A zero divisor is not matched, so Divided(0) never reaches the journal.
    case Div(x) if x != 0 => persist(Divided(x))(handleEvent)

    case ShowResult =>
      context.system.eventStream.publish(LogMessage(s"Current state: $state"))

    case BackupResult =>
      saveSnapshot(state)
      context.system.eventStream.publish(LogMessage(s"Manual saving snapshot: $state"))

    case SaveSnapshotSuccess(metadata) =>
      context.system.eventStream.publish(LogMessage(s"Successfully saved state: $state"))

    case SaveSnapshotFailure(metadata, reason) =>
      context.system.eventStream.publish(LogMessage(s"Saving state: $state failed!"))
  }

  /**
   * Applies a freshly persisted event, publishes it, and saves a snapshot
   * every `snapShotInterval` events.
   *
   * @param evt the event that was just written to the journal.
   */
  def handleEvent(evt: Event): Unit = {
    state = state.updateState(evt)
    context.system.eventStream.publish(LogMessage(s"Logged event: $evt"))
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {
      saveSnapshot(state)
      context.system.eventStream.publish(
        LogMessage(s"Saving snapshot: $state after $snapShotInterval events"))
    }
  }

  override def receiveRecover: Receive = {
    case evt: Event =>
      state = state.updateState(evt)
      context.system.eventStream.publish(LogMessage(s"Restoring event: $evt"))

    case SnapshotOffer(mdata, sts: State) =>
      // State is immutable, so the offered snapshot can be adopted directly.
      state = sts
      context.system.eventStream.publish(LogMessage(s"Restoring snapshot: $mdata"))

    case RecoveryCompleted =>
      log.info(s"Recovery completed with starting state: $state")
  }

  override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Rejected: ${cause.getMessage}")
  }

  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    // The literal was split across two physical lines, which does not compile; rejoined.
    log.info(s"Persistence Error: ${cause.getMessage}")
  }
}
發布消息內容已經可以說明每段代碼的作用。
下面是測試運算Calculator的代碼:
package persistence.demo import akka.actor._ import persistence.calculator.Calculator import persistence.tracker.EventTrackerobject persistenceDemo extends App { val persistenceSystem = ActorSystem("persistenceSystem")
http://www.cnblogs.com/tiger-xc/p/7215898.html