(本實例都是使用的Net的客戶端,使用C#編寫)

  在第二個教程中,我們學習了如何使用工作隊列在多個工作實例之間分配耗時的任務(wù)。

  但是,如果我們需要在遠程計算機上運行功能并等待結(jié)果怎么辦? 那是一個不同的故事。 此模式通常稱為遠程過程調(diào)用或RPC。

  在本教程中,我們將使用RabbitMQ構(gòu)建一個RPC系統(tǒng):一個客戶機和一個可擴展的RPC服務(wù)器。 由于我們沒有任何值得分發(fā)的耗時任務(wù),我們將創(chuàng)建一個返回斐波納契數(shù)字的虛擬RPC服務(wù)。

1、客戶端接口【Client Interface】

  為了說明如何使用RPC服務(wù),我們將創(chuàng)建一個簡單的客戶端類。 它將公開一個名為call的方法,該方法發(fā)送RPC請求并阻塞,直到接收到答案:

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓


   關(guān)于RPC的注釋

   雖然RPC是一個很常見的計算模式,但它經(jīng)常被批評。 當系統(tǒng)出現(xiàn)問題的時候,程序員不知道函數(shù)調(diào)用是本地函數(shù)還是緩慢的RPC調(diào)用,這樣的混亂導致了系統(tǒng)的不可預測性,并增加了調(diào)試的復雜性。 濫用RPC可能導致代碼的可維護性很差,這樣的設(shè)計不但沒有簡化軟件,而且只會是系統(tǒng)更糟。

   銘記這一點,請考慮以下建議:

     確保顯而易見哪個函數(shù)調(diào)用是本地的,哪個是遠程的。
     記錄您的系統(tǒng)。 使組件之間的依賴關(guān)系清除。
     處理錯誤情況。 當RPC服務(wù)器停機很長時間后,客戶端應(yīng)該如何反應(yīng)?

    當有疑問避免RPC。 如果可以的話,您應(yīng)該使用異步管道 - 而不是類似RPC的阻塞,將異步推送到下一個計算階段。

2、回調(diào)隊列【Callback queue】
  
   一般來說RPC對RabbitMQ來說很容易。 客戶端發(fā)送請求消息,服務(wù)器回復一條響應(yīng)消息。 為了收到一個響應(yīng),我們需要發(fā)送一個請求向'回調(diào)'的隊列地址:

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓

var corrId = Guid.NewGuid().ToString();var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);// ... then code to read a response message from the callback_queue ...

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓


消息屬性

  AMQP 0-9-1協(xié)議預先定義了一組14個隨附消息的屬性。 大多數(shù)屬性很少使用,除了以下內(nèi)容:

  deliveryMode:將消息標記為persistent(值為2)或transient(任何其他值)。 您可能會從第二個教程中記住此屬性。
  contentType:用于描述mime類型的編碼。 例如對于經(jīng)常使用的JSON編碼,將此屬性設(shè)置為:application / json是一個很好的做法。
  replyTo:通常用來命名一個回調(diào)隊列。
  correlationId:用于將RPC響應(yīng)與請求相關(guān)聯(lián)。

3、相關(guān)標識【Correlation Id】

  在上面所提出的方法中,我們建議為每個RPC請求創(chuàng)建一個回調(diào)隊列。這是非常低效的,但幸運的是有一個更好的方法 - 讓我們?yōu)槊總€客戶端創(chuàng)建一個回調(diào)隊列。

  這將引發(fā)了一個新問題,在該隊列中收到響應(yīng),響應(yīng)所屬的請求是不知道的。此時正是使用correlationId屬性的時候。我們將為每個請求設(shè)置一個唯一的值。稍后,當我們在回調(diào)隊列中收到一條消息時,我們將查看此屬性,并且基于此,我們將能夠?qū)㈨憫?yīng)與請求相匹配。如果我們看到一個未知的correlationId值,我們可以安全地丟棄該消息 - 它不屬于我們的請求。

  您可能會問,為什么我們應(yīng)該忽略回調(diào)隊列中的未知消息,而不是出現(xiàn)錯誤?這是由于服務(wù)器端可能出現(xiàn)競爭情況。雖然不太可能,RPC服務(wù)器可能會在發(fā)送答復之后,但在發(fā)送請求的確認消息之前死亡。如果發(fā)生這種情況,重新啟動的RPC服務(wù)器將再次處理該請求。這就是為什么在客戶端上,我們必須優(yōu)雅地處理這些重復的響應(yīng),并且RPC應(yīng)該理想地是冪等的。

4、概要【Summary】

 photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓
  我們的RPC將像這樣工作:

     當客戶端啟動時,它創(chuàng)建一個匿名獨占回調(diào)隊列。
     對于RPC請求,客戶端發(fā)送一個具有兩個屬性的消息:replyTo,它被設(shè)置為回調(diào)隊列和correlationId,它被設(shè)置為每個請求的唯一值。
     請求被發(fā)送到rpc_queue隊列。
     RPC worker(aka:server)正在等待隊列上的請求。 當請求出現(xiàn)時,它將執(zhí)行該作業(yè),并使用replyTo字段中的隊列將結(jié)果發(fā)送回客戶端。
     客戶端等待回呼隊列中的數(shù)據(jù)。 當信息出現(xiàn)時,它檢查correlationId屬性。 如果它與請求中的值相匹配,則返回對應(yīng)用程序的響應(yīng)。

5、整合

  斐波納契【Fibonacci】任務(wù):

private static int fib(int n)
  {    if (n == 0 || n == 1) return n;    return fib(n - 1) + fib(n - 2);
  }


  我們聲明斐波那契函數(shù)。 它只假設(shè)有效的正整數(shù)輸入。 (不要指望這一個能為大數(shù)字工作,而且這可能是最慢的遞歸實現(xiàn))

   我們的RPC服務(wù)器RPCServer.cs的代碼如下所示:

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓

 1  using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5  6 class RPCServer 7 { 8     public static void Main() 9     {10         var factory = new ConnectionFactory() { HostName = "localhost" };11         using (var connection = factory.CreateConnection())12         using (var channel = connection.CreateModel())13         {14             channel.QueueDeclare(queue: "rpc_queue", durable: false,15               exclusive: false, autoDelete: false, arguments: null);16             channel.BasicQos(0, 1, false);17             var consumer = new EventingBasicConsumer(channel);18             channel.BasicConsume(queue: "rpc_queue",19               noAck: false, consumer: consumer);20             Console.WriteLine(" [x] Awaiting RPC requests");21 22             consumer.Received += (model, ea) =>23             {24                 string response = null;25 26                 var body = ea.Body;27                 var props = ea.BasicProperties;28                 var replyProps = channel.CreateBasicProperties();29                 replyProps.CorrelationId = props.CorrelationId;30 31                 try32                 {33                     var message = Encoding.UTF8.GetString(body);34                     int n = int.Parse(message);35                     Console.WriteLine(" [.] fib({0})", message);36                     response = fib(n).ToString();37                 }38                 catch (Exception e)39                 {40                     Console.WriteLine(" [.] " + e.Message);41                     response = "";42                 }43                 finally44                 {45                     var responseBytes = Encoding.UTF8.GetBytes(response);46                     channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,47                       basicProperties: replyProps, body: responseBytes);48                     channel.BasicAck(deliveryTag: ea.DeliveryTag,49                       multiple: false);50                 }51             };52 53             Console.WriteLine(" Press [enter] to exit.");54             Console.ReadLine();55         }56     }57 58     ///59 60     /// Assumes only valid positive integer input.61     /// Don't expect this one to work for big numbers, and it's62     /// probably the slowest recursive implementation possible.63     ///64 65     private static int fib(int n)66     {67         if (n == 0 || n == 1)68         {69             return n;70         }71 72         return fib(n - 1) + fib(n - 2);73     }74 }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓


 服務(wù)器代碼相當簡單:

     像往常一樣,我們開始建立連接,通道并聲明隊列。
     我們可能想要運行多個服務(wù)器進程。 為了在多個服務(wù)器上平均分配負載,我們需要在channel.basicQos中設(shè)置prefetchCount設(shè)置。
     我們使用basicConsume訪問隊列。 然后我們注冊一個交付處理程序,我們在其中進行工作并發(fā)回響應(yīng)。

我們的RPC客戶端的代碼RPCClient.cs

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓

 1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using RabbitMQ.Client; 7 using RabbitMQ.Client.Events; 8  9 class RPCClient10 {11     private IConnection connection;12     private IModel channel;13     private string replyQueueName;14     private QueueingBasicConsumer consumer;15 16     public RPCClient()17     {18         var factory = new ConnectionFactory() { HostName = "localhost" };19         connection = factory.CreateConnection();20         channel = connection.CreateModel();21         replyQueueName = channel.QueueDeclare().QueueName;22         consumer = new QueueingBasicConsumer(channel);23         channel.BasicConsume(queue: replyQueueName,24                              noAck: true,25                              consumer: consumer);26     }27 28     public string Call(string message)29     {30         var corrId = Guid.NewGuid().ToString();31         var props = channel.CreateBasicProperties();32         props.ReplyTo = replyQueueName;33         props.CorrelationId = corrId;34 35         var messageBytes = Encoding.UTF8.GetBytes(message);36         channel.BasicPublish(exchange: "",37                              routingKey: "rpc_queue",38                              basicProperties: props,39                              body: messageBytes);40 41         while(true)42         {43             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();44             if(ea.BasicProperties.CorrelationId == corrId)45             {46                 return Encoding.UTF8.GetString(ea.Body);47             }48         }49     }50 51     public void Close()52     {53         connection.Close();54     }55 }56 57 class RPC58 {59     public static void Main()60     {61         var rpcClient = new RPCClient();62 63         Console.WriteLine(" [x] Requesting fib(30)");64         var response = rpcClient.Call("30");65         Console.WriteLine(" [.] Got '{0}'", response);66 67         rpcClient.Close();68     }69 }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓


客戶端代碼稍微復雜一些:

     我們建立一個連接和通道,并為回覆聲明一個獨占的'回調(diào)'隊列。
     我們訂閱'回調(diào)'隊列,這樣我們可以收到RPC響應(yīng)。
     我們的調(diào)用方法使得實際的RPC請求。
     在這里,我們首先生成一個唯一的correlationId數(shù)字并保存它 - while循環(huán)將使用此值來捕獲適當?shù)捻憫?yīng)。
     接下來,我們發(fā)布請求消息,此請求消息具有兩個屬性:replyTo和correlationId。
     在這一點上,我們可以坐下來等待適當?shù)捻憫?yīng)到達。
     while循環(huán)正在做一個非常簡單的工作,對于每個響應(yīng)消息,它檢查correlationId是否是我們正在尋找的。 如果是這樣,它會保存響應(yīng)。
     最后,我們將響應(yīng)返回給用戶。

讓客戶端發(fā)送請求:

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設(shè)計培訓,網(wǎng)站建設(shè)培訓


現(xiàn)在是看看我們的RPCClient.csRPCServer.cs的完整示例源代碼(包括基本異常處理)的好時機。

照常設(shè)置(參見教程一):

我們的RPC服務(wù)現(xiàn)在已經(jīng)準備好了。 我們可以啟動服務(wù)器:

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

要請求運行客戶端的fibonacci號碼:

cd RPCClient
dotnet run
# => [x] Requesting fib(30)


這里提出的設(shè)計不是RPC服務(wù)的唯一可能的實現(xiàn),而是具有一些重要的優(yōu)點:

     如果RPC服務(wù)器太慢,可以通過運行另一個RPC服務(wù)器進行擴展。 嘗試在新的控制臺中運行第二個RPCServer。
     在客戶端,RPC需要發(fā)送和接收一條消息。 不需要像queueDeclare這樣的同步調(diào)用。 因此,RPC客戶端只需要一個網(wǎng)絡(luò)往返單個RPC請求。

我們的代碼仍然非常簡單,沒有嘗試解決更復雜(但重要的)問題,例如:

     如果沒有服務(wù)器運行,客戶端應(yīng)該如何反應(yīng)?
     客戶端是否需要RPC的某種超時時間?
     如果服務(wù)器發(fā)生故障并引發(fā)異常,應(yīng)該將其轉(zhuǎn)發(fā)給客戶端?
     在處理之前防止無效的傳入消息(例如檢查邊界,類型)。
好了,這個系列也快結(jié)束了。

在把原地址貼出來,讓大家了解更多。地址如下:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

天下國家,可均也;爵祿,可辭也;白刃,可蹈也;中庸不可能也

http://www.cnblogs.com/PatrickLiu/p/7154550.html