大家好像對分析源碼厭倦了,說實在我也會厭倦,不過不看是無法分析其后面的東西,從易到難是一個必要的過程。

今天說下EventBus,前幾天園里的大神已經(jīng)把其解刨,我今天就借著大神的肩膀,分析下在eShop項目中EventBus的實現(xiàn)。

最近發(fā)覺轉(zhuǎn)發(fā)文章不寫出處的,特此加上鏈接:http://inday.cnblogs.com

解析源碼

我們知道使用EventBus是為了解除Publisher和Subscriber之間的依賴性,這樣我們的Publisher就不需要知道有多少Subscribers,只需要通過EventBus進行注冊管理就好了,在eShop項目中,有一個這樣的接口IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

public interface IEventBus
{    void Subscribe<T, TH>(Func<TH> handler)        where T : IntegrationEvent        where TH : IIntegrationEventHandler<T>;    void Unsubscribe<T, TH>()        where TH : IIntegrationEventHandler<T>        where T : IntegrationEvent;    void Publish(IntegrationEvent @event);
}

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

我們可以看到這個接口定義了EventBus所需的一些操作, 對比大神的EventBus,相關(guān)功能都是一致的,我們看下它的實現(xiàn)類:EventBusRabbitMQ,從名字上可以看出,這是一個通過RabbitMQ來進行管理的EventBus,我們可以看到它使用了IEventBusSubscriptionsManager進行訂閱存儲,也就是大神文中的:

private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;

微軟在Demo中把其提取出了接口,把一些常用方法給提煉了出來,但是核心還是Dictionary<string, List<Delegate>>, 使用Dictionary進行Map映射。通過Subscribe和UnSubscribe進行訂閱和取消,使用Publish方法進行發(fā)布操作。

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

public void Subscribe<T, TH>(Func<TH> handler)    where T : IntegrationEvent    where TH : IIntegrationEventHandler<T>{    var eventName = typeof(T).Name;    var containsKey = _subsManager.HasSubscriptionsForEvent<T>();    if (!containsKey)
    {        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueBind(queue: _queueName,
                                exchange: BROKER_NAME,
                                routingKey: eventName);
        }
    }

    _subsManager.AddSubscription<T, TH>(handler);

}

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

我們看到在訂閱的時候,EventBus會檢查下在Map中是否有相應(yīng)的注冊,如果沒有的話首先回去RabbitMQ中創(chuàng)建一個新的channel進行綁定,隨后在Map中進行注冊映射。

UnSubscribe則直接從Map中取消映射,通過OnEventRemoved事件判斷Map下此映射的subscriber是否為空,為空則從RabbitMQ中關(guān)閉channel。

在RabbitMQ的構(gòu)造方法中,我們看到這樣一個創(chuàng)建:CreateConsumerChannel(),這里創(chuàng)建了一個EventingBasicConsumer,當Queue中有新的消息時會通過ProcessEvent執(zhí)行Map中注冊的handler(subscribers),看圖可能更清晰些:

 

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

在ProcessEvent方法中,回去Map中找尋subscribers,然后通過動態(tài)反射進行執(zhí)行:

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

private async Task ProcessEvent(string eventName, string message)
{    if (_subsManager.HasSubscriptionsForEvent(eventName))
    { 
        var eventType = _subsManager.GetEventTypeByName(eventName);        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);        var handlers = _subsManager.GetHandlersForEvent(eventName);        foreach (var handlerfactory in handlers)
        {            var handler = handlerfactory.DynamicInvoke();            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
        }
    }
}

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

微軟通過簡單的代碼解耦了Publisher和Subscribers之間的依賴關(guān)系,我們引用大神的總結(jié):

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

應(yīng)用

在catalog.api中,微軟出現(xiàn)了EventBus,我在上一篇中也提到了,這是我的一個疑惑,因為在catalog中并沒有訂閱操作,直接執(zhí)行了Publish操作,原先以為是一個空操作,后來看了Basket.Api我才知道為何微軟要用RabbitMQ。

使用RabbitMQ,我們不僅是從類之間的解耦,更可以跨項目,跨語言,跨平臺的解耦,publisher僅僅需要把消息體(IntegrationEvent)傳送到RabbitMQ,Consumer從Queue中獲取消息體,然后推送到Subscribers執(zhí)行相應(yīng)的操作。我們看下Basket.Api.Startup.cs:

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

protected virtual void ConfigureEventBus(IApplicationBuilder app)
{    var catalogPriceHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>();    var orderStartedHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

    eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());

    eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
}

電腦培訓,計算機培訓,平面設(shè)計培訓,網(wǎng)頁設(shè)計培訓,美工培訓,Web培訓,Web前端開發(fā)培訓

在這個方法里,我們看到了Subscribe操作,想想之前的提問有點搞笑,不過研究明白了也不錯,對吧!

總結(jié)

今天我們看了EventBus在Demo中的應(yīng)用,總結(jié)一下。

1、EventBus可以很好的解耦訂閱者和發(fā)布者之間的依賴

2、使用RabbitMQ能夠跨項目、跨平臺、跨語言的解耦訂閱者和發(fā)布者

雖然在Demo中我們看到對訂閱者的管理是通過Dictionary內(nèi)存的方式,所以我們的Subscribe僅僅只在Basket.Api中看到,但微軟是通過IEventBusSubscriptionsManager接口定義的,我們可以通過自己的需求來進行定制,可以做成分布式的,比如使用memcached。

寫在最后

每個月到下旬就會比較忙,所以文章發(fā)布會比較慢,但我也會堅持學習完eShop的,為了學習,我建了個群,大家可以進來一起學習,有什么建議和問題都可以進來哦。

eShop雖好,但不建議大家放到生產(chǎn)環(huán)境,畢竟是一個Demo,而且目前還是ALPHA版本,用來學習是一個很好的教材,這就是一個大雜燴,學習中你會學到很多新的東西,大家如果看好core的發(fā)展,可以一起研究下。

QQ群:376248054

技術(shù)改變生活,技術(shù)改變?nèi)松?!用技術(shù)來創(chuàng)造價值,擁有技術(shù),不僅僅是開發(fā),您將獲得更多!如果您覺得我能幫到您,您可以通過掃描下面二維碼來【捐助】我!

http://www.cnblogs.com/inday/p/eventbus-in-eshopcontainers.html