大家好像對分析源碼厭倦了,說實在我也會厭倦,不過不看是無法分析其后面的東西,從易到難是一個必要的過程。
今天說下EventBus,前幾天園里的大神已經(jīng)把其解刨,我今天就借著大神的肩膀,分析下在eShop項目中EventBus的實現(xiàn)。
最近發(fā)覺轉(zhuǎn)發(fā)文章不寫出處的,特此加上鏈接:http://inday.cnblogs.com
解析源碼
我們知道使用EventBus是為了解除Publisher和Subscriber之間的依賴性,這樣我們的Publisher就不需要知道有多少Subscribers,只需要通過EventBus進行注冊管理就好了,在eShop項目中,有一個這樣的接口IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)
public interface IEventBus { void Subscribe<T, TH>(Func<TH> handler) where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; void Publish(IntegrationEvent @event); }
我們可以看到這個接口定義了EventBus所需的一些操作, 對比大神的EventBus,相關(guān)功能都是一致的,我們看下它的實現(xiàn)類:EventBusRabbitMQ,從名字上可以看出,這是一個通過RabbitMQ來進行管理的EventBus,我們可以看到它使用了IEventBusSubscriptionsManager進行訂閱存儲,也就是大神文中的:
private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;
微軟在Demo中把其提取出了接口,把一些常用方法給提煉了出來,但是核心還是Dictionary<string, List<Delegate>>, 使用Dictionary進行Map映射。通過Subscribe和UnSubscribe進行訂閱和取消,使用Publish方法進行發(fā)布操作。
public void Subscribe<T, TH>(Func<TH> handler) where T : IntegrationEvent where TH : IIntegrationEventHandler<T>{ var eventName = typeof(T).Name; var containsKey = _subsManager.HasSubscriptionsForEvent<T>(); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } _subsManager.AddSubscription<T, TH>(handler); }
我們看到在訂閱的時候,EventBus會檢查下在Map中是否有相應(yīng)的注冊,如果沒有的話首先回去RabbitMQ中創(chuàng)建一個新的channel進行綁定,隨后在Map中進行注冊映射。
UnSubscribe則直接從Map中取消映射,通過OnEventRemoved事件判斷Map下此映射的subscriber是否為空,為空則從RabbitMQ中關(guān)閉channel。
在RabbitMQ的構(gòu)造方法中,我們看到這樣一個創(chuàng)建:CreateConsumerChannel(),這里創(chuàng)建了一個EventingBasicConsumer,當Queue中有新的消息時會通過ProcessEvent執(zhí)行Map中注冊的handler(subscribers),看圖可能更清晰些:
在ProcessEvent方法中,回去Map中找尋subscribers,然后通過動態(tài)反射進行執(zhí)行:
private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handlers = _subsManager.GetHandlersForEvent(eventName); foreach (var handlerfactory in handlers) { var handler = handlerfactory.DynamicInvoke(); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } }
微軟通過簡單的代碼解耦了Publisher和Subscribers之間的依賴關(guān)系,我們引用大神的總結(jié):
應(yīng)用
在catalog.api中,微軟出現(xiàn)了EventBus,我在上一篇中也提到了,這是我的一個疑惑,因為在catalog中并沒有訂閱操作,直接執(zhí)行了Publish操作,原先以為是一個空操作,后來看了Basket.Api我才知道為何微軟要用RabbitMQ。
使用RabbitMQ,我們不僅是從類之間的解耦,更可以跨項目,跨語言,跨平臺的解耦,publisher僅僅需要把消息體(IntegrationEvent)傳送到RabbitMQ,Consumer從Queue中獲取消息體,然后推送到Subscribers執(zhí)行相應(yīng)的操作。我們看下Basket.Api.Startup.cs:
protected virtual void ConfigureEventBus(IApplicationBuilder app) { var catalogPriceHandler = app.ApplicationServices .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>(); var orderStartedHandler = app.ApplicationServices .GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>(); var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler> (() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>()); eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler> (() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>()); }
在這個方法里,我們看到了Subscribe操作,想想之前的提問有點搞笑,不過研究明白了也不錯,對吧!
總結(jié)
今天我們看了EventBus在Demo中的應(yīng)用,總結(jié)一下。
1、EventBus可以很好的解耦訂閱者和發(fā)布者之間的依賴
2、使用RabbitMQ能夠跨項目、跨平臺、跨語言的解耦訂閱者和發(fā)布者
雖然在Demo中我們看到對訂閱者的管理是通過Dictionary內(nèi)存的方式,所以我們的Subscribe僅僅只在Basket.Api中看到,但微軟是通過IEventBusSubscriptionsManager接口定義的,我們可以通過自己的需求來進行定制,可以做成分布式的,比如使用memcached。
寫在最后
每個月到下旬就會比較忙,所以文章發(fā)布會比較慢,但我也會堅持學習完eShop的,為了學習,我建了個群,大家可以進來一起學習,有什么建議和問題都可以進來哦。
eShop雖好,但不建議大家放到生產(chǎn)環(huán)境,畢竟是一個Demo,而且目前還是ALPHA版本,用來學習是一個很好的教材,這就是一個大雜燴,學習中你會學到很多新的東西,大家如果看好core的發(fā)展,可以一起研究下。
QQ群:376248054
技術(shù)改變生活,技術(shù)改變?nèi)松?!用技術(shù)來創(chuàng)造價值,擁有技術(shù),不僅僅是開發(fā),您將獲得更多!如果您覺得我能幫到您,您可以通過掃描下面二維碼來【捐助】我!
http://www.cnblogs.com/inday/p/eventbus-in-eshopcontainers.html