ABP Framework-EventBus源码解析

目录

https://abp.io/docs/6.0/Event-Bus

Version

6.0.3

Package

Volo.Abp.EventBus.Abstractions 
Volo.Abp.EventBus //依赖Volo.Abp.EventBus.Abstractions
Volo.Abp.EventBus.Azure //依赖Volo.Abp.EventBus
Volo.Abp.EventBus.Kafka //依赖Volo.Abp.EventBus
Volo.Abp.EventBus.RabbitMQ //依赖Volo.Abp.EventBus
Volo.Abp.EventBus.Rebus //依赖Volo.Abp.EventBus

EventHandler

当某个事件发生,如果想要订阅事件,则是定义一个Handler来处理,ABP中经常使用的模式简要如下,以本地事件为例:

/// <summary>
/// Sample local event handler: subscribes to <c>StockCountChangedEvent</c> by implementing
/// <c>ILocalEventHandler&lt;StockCountChangedEvent&gt;</c>; it is also marked
/// <c>ITransientDependency</c>.
/// </summary>
public class MyHandler : ILocalEventHandler<StockCountChangedEvent>, ITransientDependency
{
    /// <summary>Handles a <c>StockCountChangedEvent</c>.</summary>
    /// <param name="eventData">The event payload.</param>
    public async Task HandleEventAsync(StockCountChangedEvent eventData)
    {
        //TODO: your code that does something on the event
    }
}

EventHandler从源头上便区分为LocalHandler和DistributedHandler以应付不同的使用场景。 205030313_458ea547-4b76-4cbd-b8bc-478e661826b7 在项目启动阶段,在AbpEventBusModule中,会扫描所有LocalEventHandler和DistributedEventHandler分别注册到AbpLocalEventBusOptions和AbpDistributedEventBusOptions。

/// <summary>
/// Event bus module. During <c>PreConfigureServices</c> it hooks service registration so that
/// event handler types are collected and stored in <c>AbpLocalEventBusOptions</c> and
/// <c>AbpDistributedEventBusOptions</c>.
/// </summary>
[DependsOn(
    typeof(AbpEventBusAbstractionsModule),
    typeof(AbpMultiTenancyModule),
    typeof(AbpJsonModule),
    typeof(AbpGuidsModule),
    typeof(AbpBackgroundWorkersModule),
    typeof(AbpDistributedLockingAbstractionsModule)
    )]
public class AbpEventBusModule : AbpModule
{
    /// <summary>Registers the handler-collecting hook on the service collection.</summary>
    public override void PreConfigureServices(ServiceConfigurationContext context)
    {
        AddEventHandlers(context.Services);
    }


    //...


    /// <summary>
    /// Collects registered implementation types that are assignable to
    /// <c>ILocalEventHandler&lt;&gt;</c> or <c>IDistributedEventHandler&lt;&gt;</c> and adds them
    /// to the corresponding options' <c>Handlers</c> list.
    /// </summary>
    private static void AddEventHandlers(IServiceCollection services)
    {
        var localHandlers = new List<Type>();
        var distributedHandlers = new List<Type>();


        // Scan the handlers in the project as they are registered
        services.OnRegistred(context =>
        {
            if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(ILocalEventHandler<>)))
            {
                localHandlers.Add(context.ImplementationType);
            }
            // Only checked when the type is not a local handler, so a type ends up in at most one list.
            else if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IDistributedEventHandler<>)))
            {
                distributedHandlers.Add(context.ImplementationType);
            }
        });
    
        // Store the local and distributed handlers in their respective options
        services.Configure<AbpLocalEventBusOptions>(options =>
        {
            options.Handlers.AddIfNotContains(localHandlers);
        });
        services.Configure<AbpDistributedEventBusOptions>(options =>
        {
            options.Handlers.AddIfNotContains(distributedHandlers);
        });
    }
}

这两个Options将贯穿后续EventBus。

/// <summary>
/// Options for the local event bus; holds the list of local event handler types.
/// </summary>
public class AbpLocalEventBusOptions
{
    // NOTE(review): no initializer/constructor is shown in this excerpt — presumably the
    // original type initializes this list in its constructor; confirm against the ABP source.
    public ITypeList<IEventHandler> Handlers { get; }
}


/// <summary>
/// Options for the distributed event bus; holds the distributed event handler types plus the
/// outbox and inbox configuration dictionaries.
/// </summary>
public class AbpDistributedEventBusOptions
{
    // NOTE(review): no initializers/constructor are shown in this excerpt — presumably the
    // original type initializes these in its constructor; confirm against the ABP source.
    public ITypeList<IEventHandler> Handlers { get; }
    public OutboxConfigDictionary Outboxes { get; }
    public InboxConfigDictionary Inboxes { get; }
}

EventHandlerFactory

在上层使用中,对于Handler的实例化不是直接操作Handler,借助于EventHandlerFactory再封装一层。

205032720_a9b12297-0e08-4610-acad-f1d24711441c 以IocEventHandlerFactory为例,在构造Factory时则存入EventHandlerType,当需要使用Handler时,则调用Factory.GetHandler来实例化一个Handler并将其包装到EventHandlerDisposeWrapper中。

/// <summary>
/// Handler factory that stores a handler type and resolves an instance of it from a freshly
/// created service scope each time <see cref="GetHandler"/> is called.
/// </summary>
public class IocEventHandlerFactory : IEventHandlerFactory, IDisposable
{
    /// <summary>The handler type this factory resolves.</summary>
    public Type HandlerType { get; }


    /// <summary>Factory used to create the scope the handler is resolved from.</summary>
    protected IServiceScopeFactory ScopeFactory { get; }


    /// <summary>Stores the scope factory and the handler type for later resolution.</summary>
    public IocEventHandlerFactory(IServiceScopeFactory scopeFactory, Type handlerType)
    {
        ScopeFactory = scopeFactory;
        HandlerType = handlerType;
    }


    /// <summary>
    /// Resolves handler object from Ioc container.
    /// </summary>
    /// <returns>Resolved handler object</returns>
    public IEventHandlerDisposeWrapper GetHandler()
    {
        // A new scope is created per call; the callback passed to the wrapper disposes it.
        var scope = ScopeFactory.CreateScope();
        return new EventHandlerDisposeWrapper(
            (IEventHandler)scope.ServiceProvider.GetRequiredService(HandlerType),
            () => scope.Dispose()
        );
    }


    /// <summary>
    /// Returns true when the given list already contains an
    /// <see cref="IocEventHandlerFactory"/> with the same <see cref="HandlerType"/>.
    /// </summary>
    public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
    {
        return handlerFactories
            .OfType<IocEventHandlerFactory>()
            .Any(f => f.HandlerType == HandlerType);
    }


    /// <summary>Intentionally empty: the factory itself holds nothing to release.</summary>
    public void Dispose()
    {


    }
}

EventHandlerInvoker

和Factory的封装类似,对于Handler,不直接调用Handler的执行方法,而是再封装一层。ABP中封装了IEventHandlerInvoker,内部实现缓存来管理Handler的执行。

/// <summary>
/// Invokes event handlers through cached method executors. The cache is keyed by the
/// handler type's full name combined with the event type's full name.
/// </summary>
public class EventHandlerInvoker : IEventHandlerInvoker, ISingletonDependency
{
    private readonly ConcurrentDictionary<string, EventHandlerInvokerCacheItem> _cache;


    /// <summary>Creates the invoker with an empty executor cache.</summary>
    public EventHandlerInvoker()
    {
        _cache = new ConcurrentDictionary<string, EventHandlerInvokerCacheItem>();
    }


    /// <summary>
    /// Invokes <paramref name="eventHandler"/> for <paramref name="eventData"/> via the local
    /// and/or distributed executor, depending on which handler interfaces it implements for
    /// <paramref name="eventType"/>.
    /// </summary>
    public async Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType)
    {
        // Get the EventHandlerMethodExecutor instances and cache them
        var cacheItem = _cache.GetOrAdd($"{eventHandler.GetType().FullName}-{eventType.FullName}", _ =>
        {
            var item = new EventHandlerInvokerCacheItem();


            if (typeof(ILocalEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler))
            {
                item.Local = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(LocalEventHandlerMethodExecutor<>).MakeGenericType(eventType));
            }


            // Not an else-branch: a handler implementing both interfaces gets both executors.
            if (typeof(IDistributedEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler))
            {
                item.Distributed = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(DistributedEventHandlerMethodExecutor<>).MakeGenericType(eventType));
            }


            return item;
        });


        // Run the executors' delegate, which enters the handler
        if (cacheItem.Local != null)
        {
            // Run the local executor's delegate
            await cacheItem.Local.ExecutorAsync(eventHandler, eventData);
        }
        if (cacheItem.Distributed != null)
        {
            await cacheItem.Distributed.ExecutorAsync(eventHandler, eventData);
        }


        //...
    }
}

具体的执行方法也封装成了IEventHandlerMethodExecutor,其实现内部才是真正调用HandleEventAsync方法的地方。

/// <summary>
/// Executor that casts the target to <c>ILocalEventHandler&lt;TEvent&gt;</c> and the parameter
/// to <typeparamref name="TEvent"/>, then calls <c>HandleEventAsync</c>.
/// </summary>
/// <typeparam name="TEvent">The event type handled.</typeparam>
public class LocalEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
    where TEvent : class
{
    // Delegate that performs the casts and forwards to the handler's HandleEventAsync.
    public  EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) => target.As<ILocalEventHandler<TEvent>>().HandleEventAsync(parameter.As<TEvent>());


    /// <summary>Strongly typed convenience wrapper around <see cref="ExecutorAsync"/>.</summary>
    public Task ExecuteAsync(IEventHandler target, TEvent parameters)
    {
        return ExecutorAsync(target, parameters);
    }
}


/// <summary>
/// Executor that casts the target to <c>IDistributedEventHandler&lt;TEvent&gt;</c> and the
/// parameter to <typeparamref name="TEvent"/>, then calls <c>HandleEventAsync</c>.
/// </summary>
/// <typeparam name="TEvent">The event type handled.</typeparam>
public class DistributedEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
    where TEvent : class
{
    // Delegate that performs the casts and forwards to the handler's HandleEventAsync.
    public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) => target.As<IDistributedEventHandler<TEvent>>().HandleEventAsync(parameter.As<TEvent>());


    /// <summary>Strongly typed convenience wrapper around <see cref="ExecutorAsync"/>.</summary>
    public Task ExecuteAsync(IEventHandler target, TEvent parameters)
    {
        return ExecutorAsync(target, parameters);
    }
}

该部分类图简要如下 205034032_a8b48798-8ff6-4dda-95cb-7407c7343179

EventBus

LocalEventBus

205035762_6f7d1280-7f90-458d-ac56-504f9cd41fa4

Subscribe

生命周期为单例,当初始化LocalEventBus时,会从上一节的AbpLocalEventBusOptions中获取所有Handlers,再按照Handlers所处理的类型分组整理成字典集合。

/// <summary>
/// Local event bus. On construction it subscribes every handler type listed in
/// <c>AbpLocalEventBusOptions.Handlers</c>, grouping handler factories by event type in
/// <see cref="HandlerFactories"/>.
/// </summary>
[ExposeServices(typeof(ILocalEventBus), typeof(LocalEventBus))]
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
    protected AbpLocalEventBusOptions Options { get; }
    // Event type -> factories of the handlers subscribed to that event type.
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }


    /// <summary>
    /// Stores the options, creates the empty factory dictionary and subscribes the
    /// handlers configured in the options.
    /// </summary>
    public LocalEventBus(
        IOptions<AbpLocalEventBusOptions> options,
        IServiceScopeFactory serviceScopeFactory,
        ICurrentTenant currentTenant,
        IUnitOfWorkManager unitOfWorkManager,
        IEventHandlerInvoker eventHandlerInvoker)
        : base(serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker)
    {
        Options = options.Value;
        Logger = NullLogger<LocalEventBus>.Instance;


        HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
        SubscribeHandlers(Options.Handlers);
    }


    /// <summary>
    /// For each handler type, inspects its interfaces and subscribes it to the single generic
    /// argument of every interface that is assignable to <c>IEventHandler</c>.
    /// </summary>
    protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
    {
        foreach (var handler in handlers)
        {
            var interfaces = handler.GetInterfaces();
            foreach (var @interface in interfaces)
            {
                // Skip interfaces that are not event handler interfaces.
                if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
                {
                    continue;
                }


                // Get the event type from the interface's generic argument
                var genericArgs = @interface.GetGenericArguments();
                if (genericArgs.Length == 1)
                {
                    Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
                }
            }
        }
    }


    /// <summary>
    /// Adds <paramref name="factory"/> to the factory list of <paramref name="eventType"/>
    /// unless it is already present, and returns an unregistrar for this subscription.
    /// </summary>
    public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
    {
        // Add the factory wrapping the handler to the factory list of this event type
        GetOrCreateHandlerFactories(eventType)
            .Locking(factories =>
                {
                    if (!factory.IsInFactories(factories))
                    {
                        factories.Add(factory);
                    }
                }
            );


        return new EventHandlerFactoryUnregistrar(this, eventType, factory);
    }


    /// <summary>Returns the factory list for the event type, creating an empty one if absent.</summary>
    private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
    {
        return HandlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
    }


    //...
}

简要过程如下

  1. 初始化后从AbpLocalEventBusOptions得到所有LocalHandlers

  2. 解析每一个Handler继承接口所声明的参数,例如,找到ILocalEventHandler的泛型类型StockCountChangedEvent。

/// <summary>
/// Sample handler whose interface <c>ILocalEventHandler&lt;StockCountChangedEvent&gt;</c>
/// carries the event type as its generic argument.
/// </summary>
public class MyHandler : ILocalEventHandler<StockCountChangedEvent>, ITransientDependency
{
    /// <summary>Handles a <c>StockCountChangedEvent</c>.</summary>
    /// <param name="eventData">The event payload.</param>
    public async Task HandleEventAsync(StockCountChangedEvent eventData)
    {
        //TODO: your code that does something on the event
    }
}
  3. 将这个事件参数类型对应一个IEventHandlerFactory集合。所有事件参数类型和对应的Handlers都会保存在HandlerFactories中。
HandlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>())

HandlerFactories在LocalEventBus中定义为并发字典类型。

protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }

例如,事件类型StockCountChangedEvent会存入到字典中,类型作为key,包装了MyHandler的EventHandlerFactory则加入到value对应的集合中。

| Type | List<IEventHandlerFactory> |
| --- | --- |
| StockCountChangedEvent | new IocEventHandlerFactory(ServiceScopeFactory, MyHandler) |

Unsubscribe

对应于Subscribe,还有一个Unsubscribe,来解除事件参数类型关联的EventHandlers

/// <summary>Excerpt of <c>LocalEventBus</c> showing how a handler instance is unsubscribed.</summary>
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
    //...
    /// <summary>
    /// Removes, from the factory list of <paramref name="eventType"/>, every
    /// <c>SingleInstanceHandlerFactory</c> whose <c>HandlerInstance</c> is
    /// <paramref name="handler"/>. Other factory kinds are left untouched.
    /// </summary>
    public override void Unsubscribe(Type eventType, IEventHandler handler)
    {
        GetOrCreateHandlerFactories(eventType)
            .Locking(factories =>
            {
                factories.RemoveAll(
                    factory =>
                        factory is SingleInstanceHandlerFactory &&
                        (factory as SingleInstanceHandlerFactory).HandlerInstance == handler
                );
            });
    }
    //...
}

该方法使用场景较少,可能在特定需求下要删除某些Handler的处理,类似于实现动态订阅。

Publish

当在使用eventbus,发送事件时,常见用法是

// Publish a StockCountChangedEvent through the local event bus.
await _localEventBus.PublishAsync(new StockCountChangedEvent
{
    ProductId = productId,
    NewCount = newCount
});

在其内部,会因onUnitOfWorkComplete参数的差异采用不同的方式。

/// <summary>Excerpt of <c>LocalEventBus</c> showing the publish path.</summary>
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
    //...
    /// <summary>Generic overload; forwards to the non-generic overload with <c>typeof(TEvent)</c>.</summary>
    public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
        where TEvent : class
    {
        return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
    }


    /// <summary>
    /// Publishes an event. When <paramref name="onUnitOfWorkComplete"/> is true and a unit of
    /// work is current, the event is only recorded on that unit of work; otherwise it is
    /// published to the bus immediately.
    /// </summary>
    public virtual async Task PublishAsync(
        Type eventType,
        object eventData,
        bool onUnitOfWorkComplete = true)
    {
        if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
        {
            // Defer: record the event (with its order number) on the current unit of work.
            AddToUnitOfWork(
                UnitOfWorkManager.Current,
                new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext())
            );
            return;
        }


        await PublishToEventBusAsync(eventType, eventData);
    }


    /// <summary>Records the event in the unit of work's local events.</summary>
    protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
    {
        unitOfWork.AddOrReplaceLocalEvent(eventRecord);
    }


    /// <summary>Wraps the event in a <c>LocalEventMessage</c> with a new id and publishes it.</summary>
    protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
    {
        await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType));
    }
    //...
 }

onUnitOfWorkComplete(true)

当参数为true,且当前存在工作单元下,会进入AddToUnitOfWork方法内,其内部则是调用UnitOfWork,将当前的LocalEvent先临时存储在LocalEvents集合中。

/// <summary>Excerpt of <c>UnitOfWork</c> showing how pending local events are recorded.</summary>
public class UnitOfWork : IUnitOfWork, ITransientDependency
{
    // Local events recorded on this unit of work, pending publication.
    protected List<UnitOfWorkEventRecord> LocalEvents { get; } = new List<UnitOfWorkEventRecord>();
    //...
    /// <summary>Adds or replaces a record in <see cref="LocalEvents"/>.</summary>
    public virtual void AddOrReplaceLocalEvent(
      UnitOfWorkEventRecord eventRecord,
      Predicate<UnitOfWorkEventRecord> replacementSelector = null)
    {
        // Note that the LocalEvents list is the one passed in
        AddOrReplaceEvent(LocalEvents, eventRecord, replacementSelector);
    }
  
    /// <summary>
    /// Appends <paramref name="eventRecord"/> to <paramref name="eventRecords"/>. When a
    /// <paramref name="replacementSelector"/> is given and matches an existing record, the
    /// first match is overwritten in place instead.
    /// </summary>
    public virtual void AddOrReplaceEvent(
        List<UnitOfWorkEventRecord> eventRecords,
        UnitOfWorkEventRecord eventRecord,
        Predicate<UnitOfWorkEventRecord> replacementSelector = null)
    {
        if (replacementSelector == null)
        {
            eventRecords.Add(eventRecord);
        }
        else
        {
            var foundIndex = eventRecords.FindIndex(replacementSelector);
            if (foundIndex < 0)
            {
                // No match: fall back to appending.
                eventRecords.Add(eventRecord);
            }
            else
            {
                eventRecords[foundIndex] = eventRecord;
            }
        }
    }
    //...
}

这种方式下,会在工作单元即将完成时才会循环遍历各LocalEvent并发布出去触发其对应的EventHandler。该部分代码在UnitOfWork中完成。

/// <summary>Excerpt of <c>UnitOfWork</c> showing how recorded events are published on completion.</summary>
public class UnitOfWork : IUnitOfWork, ITransientDependency
{
    protected List<UnitOfWorkEventRecord> LocalEvents { get; } = new List<UnitOfWorkEventRecord>();
    protected IUnitOfWorkEventPublisher UnitOfWorkEventPublisher { get; }
    //...
    /// <summary>
    /// Completes the unit of work: saves changes, publishes recorded local and distributed
    /// events (repeating while new ones keep being recorded), then commits transactions.
    /// Does nothing if the unit of work was rolled back. Any exception is stored and rethrown.
    /// </summary>
    public virtual async Task CompleteAsync(CancellationToken cancellationToken = default)
    {
        if (_isRolledback)
        {
            return;
        }
    
        PreventMultipleComplete();
    
        try
        {
            _isCompleting = true;
            await SaveChangesAsync(cancellationToken);
    
            // Loop because publishing may record further events on this unit of work.
            while (LocalEvents.Any() || DistributedEvents.Any())
            {
                if (LocalEvents.Any())
                {
                    // Snapshot in event order, then clear before publishing.
                    var localEventsToBePublished = LocalEvents.OrderBy(e => e.EventOrder).ToArray();
                    LocalEvents.Clear();
                    // UnitOfWorkEventPublisher relies on the event bus internally to publish
                    await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
                        localEventsToBePublished
                    );
                }
    
                if (DistributedEvents.Any())
                {
                    var distributedEventsToBePublished = DistributedEvents.OrderBy(e => e.EventOrder).ToArray();
                    DistributedEvents.Clear();
                    await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
                        distributedEventsToBePublished
                    );
                }
    
                await SaveChangesAsync(cancellationToken);
            }
    
            await CommitTransactionsAsync();
            IsCompleted = true;
            await OnCompletedAsync();
        }
        catch (Exception ex)
        {
            _exception = ex;
            throw;
        }
    }
    //...
}

如上提及的UnitOfWorkEventPublisher,是在EventBus上封装一层。内部还是调用LocalEventBus,此时传递的参数onUnitOfWorkComplete为false。

/// <summary>Excerpt of <c>UnitOfWorkEventPublisher</c> showing local event publication.</summary>
public class UnitOfWorkEventPublisher : IUnitOfWorkEventPublisher, ITransientDependency
{
    private readonly ILocalEventBus _localEventBus;
    //...
    
    /// <summary>
    /// Publishes each record through the local event bus, one at a time, with
    /// <c>onUnitOfWorkComplete: false</c>.
    /// </summary>
    public async Task PublishLocalEventsAsync(IEnumerable<UnitOfWorkEventRecord> localEvents)
    {
        foreach (var localEvent in localEvents)
        {
            await _localEventBus.PublishAsync(
                localEvent.EventType,
                localEvent.EventData,
                onUnitOfWorkComplete: false
            );
        }
    }
    //...
}

205037014_231f328e-eeac-477f-9650-3c8db64c140a

onUnitOfWorkComplete(false)

当传参false时,则会按照发送的事件类型,在初始化阶段注册好的HandlerFactories中搜寻该类型对应的Handler,再执行调用。

/// <summary>Excerpt of <c>LocalEventBus</c> showing how handlers are triggered for a published event.</summary>
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
    //...
    /// <summary>Triggers the handlers for the message's event type and data.</summary>
    public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
    {
        // NOTE(review): this call passes two arguments; the matching two-argument overload
        // of TriggerHandlersAsync is not shown in this excerpt.
        await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData);
    }
    
    /// <summary>
    /// Triggers every handler factory returned by <c>GetHandlerFactories</c> for
    /// <paramref name="eventType"/>. For a single-argument generic event type implementing
    /// <c>IEventDataWithInheritableGenericArgument</c>, it also builds and publishes the
    /// event for the generic argument's base type.
    /// </summary>
    protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
    {
        await new SynchronizationContextRemover();
    
        // Get the handler factories associated with the event type
        foreach (var handlerFactories in GetHandlerFactories(eventType))
        {
            foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
            {
                // Get the handler from the factory and call its handling method with the event data
                await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions, inboxConfig);
            }
        }
    
        // Event handling for handlers of generic event arguments
        if (eventType.GetTypeInfo().IsGenericType &&
            eventType.GetGenericArguments().Length == 1 &&
            typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
        {
            var genericArg = eventType.GetGenericArguments()[0];
            var baseArg = genericArg.GetTypeInfo().BaseType;
            if (baseArg != null)
            {
                // Rebuild the same generic event closed over the base type of the argument.
                var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
                var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
                var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
                // This call eventually ends up in TriggerHandlerAsync as well
                await PublishToEventBusAsync(baseEventType, baseEventData);
            }
        }
    }
    
    // Core method: runs the handler's processing logic
    protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType,
        object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
    {
        using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
        {
            //...
            // Match the InboxConfig by handler type (only used by the distributed event bus; can be ignored here)
            var handlerType = eventHandlerWrapper.EventHandler.GetType();
            if (inboxConfig?.HandlerSelector != null && !inboxConfig.HandlerSelector(handlerType))
            {
                return;
            }


            // Set the tenant of the current execution context
            using (CurrentTenant.Change(GetEventDataTenantId(eventData)))
            {
                // Enter the handler's method through the EventHandlerInvoker described in the previous section
                await EventHandlerInvoker.InvokeAsync(eventHandlerWrapper.EventHandler, eventData, eventType);
            }
            //...
        }
    }


    //...
}

如上对应于泛型事件参数的使用,对应于文档中提及的如下用法,主要是EntityChangedEventData这类泛型类。

/// <summary>
/// Sample handler for a generic event: subscribes to
/// <c>EntityCreatedEventData&lt;IdentityUser&gt;</c> through <c>ILocalEventHandler</c>.
/// </summary>
public class MyHandler : ILocalEventHandler<EntityCreatedEventData<IdentityUser>>, ITransientDependency
{
    /// <summary>Handles the entity-created event for <c>IdentityUser</c>.</summary>
    /// <param name="eventData">The event payload.</param>
    public async Task HandleEventAsync(EntityCreatedEventData<IdentityUser> eventData)
    {
        //...
    }
}

205038111_5e357bce-1836-44d3-95ca-b4cff291b314 在Publish中,通过判别是否继承IEventDataWithInheritableGenericArgument以此来判断是否是泛型事件参数,从而再找到实际事件类型和对应的HandlerFactory再进行发送处理。

简要描述下整体思路:

  1. 项目启动阶段收集所有Handler注册到AbpLocalEventBusOptions中。

  2. 实例化LocalEventBus阶段,从Options中遍历事件类型及事件对应Handler组合到HandlerFactory,分类构建成HandlerFactories字典。

  3. 发送事件时,调用LocalEventBus.PublishAsync方法,根据参数onUnitOfWorkComplete决定是否立即发送(false)还是待工作单元即将完成时发送(true),如果是待工作单元即将完成,则等到工作单元执行完成方法时,其内部再次调用发送事件,此时onUnitOfWorkComplete传参为false进入4步骤;如果是立即发送,则直接进入4步骤。

  4. 实际的发送事件中,其内部根据发送的事件类型,从HandlerFactories中找到事件类型对应的HandlerFactory,再从Factory得到具体的Handler

  5. 借助EventHandlerInvoker来管理Handler的执行,EventHandlerInvoker内部使用EventHandlerMethodExecutor来实际调用Handler的HandleEventAsync方法。

  6. 进入到LocalEventHandler的HandleEventAsync方法内执行逻辑处理。

DistributedEventBus

分布式事件相对于本地事件,解耦的更彻底,通常借助于MQ来实现微服务间或者应用间的解耦。

205039433_023c5b19-82c1-429f-86a0-e0fe6b7f743b随着服务间使用的MQ不同,其实现也有相应的版本,默认ABP实现了如下几个版本

  • LocalDistributedEventBus,当没有选定MQ时,使用这个实现,其内部则使用上一节的LocalEventBus,作为本地事件来处理,场景更限制于单个服务上,事件无法应用到其他微服务/应用中。

  • RabbitMqDistributedEventBus,内部使用RabbitMq的包,将服务作为队列名注册到RabbitMq中,以及利用RabbitMq订阅消息,再转换到EventHandler处理。

  • KafkaDistributedEventBus,内部使用Kafka的包,过程和RabbitMq类似。

  • AzureDistributedEventBus,内部使用AzureServiceBus的包,过程和RabbitMq类似。

  • RebusDistributedEventBus,内部使用Rebus的包,过程和RabbitMq类似。

Subscribe

项目启动时会先调用各DistributedEventBus内初始化方法,执行和LocalEventBus相同的Subscribe方法,从AbpDistributedEventBusOptions中获取所有Handlers,再按照Handlers所处理的类型分组整理成字典集合。

例如RabbitMqDistributedEventBus,在应用启动时执行初始化

/// <summary>
/// RabbitMQ event bus module: binds <c>AbpRabbitMqEventBusOptions</c> to the
/// <c>RabbitMQ:EventBus</c> configuration section and initializes
/// <c>RabbitMqDistributedEventBus</c> during application initialization.
/// </summary>
[DependsOn(
    typeof(AbpEventBusModule),
    typeof(AbpRabbitMqModule))]
public class AbpEventBusRabbitMqModule : AbpModule
{
    /// <summary>Configures the RabbitMQ event bus options from configuration.</summary>
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();
        Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
    }


    /// <summary>Resolves the RabbitMQ distributed event bus and calls its <c>Initialize</c>.</summary>
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        context
            .ServiceProvider
            .GetRequiredService<RabbitMqDistributedEventBus>()
            .Initialize();
    }
}

方法内部,则还是调用到SubscribeHandlers,所有事件类型及对应HandlerFactory保存在HandlerFactories字典中。

/// <summary>Excerpt of <c>RabbitMqDistributedEventBus</c> showing its initialization.</summary>
public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    protected AbpRabbitMqEventBusOptions AbpRabbitMqEventBusOptions { get; }
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
    //...
    /// <summary>
    /// Creates the message consumer from the configured exchange, queue and connection
    /// names, attaches <c>ProcessEventAsync</c> as the receive callback, and subscribes the
    /// handlers listed in <c>AbpDistributedEventBusOptions.Handlers</c>.
    /// </summary>
    public void Initialize()
    {
        // Register the consumer
        Consumer = MessageConsumerFactory.Create(
            new ExchangeDeclareConfiguration(
                AbpRabbitMqEventBusOptions.ExchangeName,
                type: AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
                durable: true
            ),
            new QueueDeclareConfiguration(
                AbpRabbitMqEventBusOptions.ClientName,
                durable: true,
                exclusive: false,
                autoDelete: false
            ),
            AbpRabbitMqEventBusOptions.ConnectionName
        );
        // Bind the follow-up processing for messages received from the MQ
        Consumer.OnMessageReceived(ProcessEventAsync);


        SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
    }
    //...
}

对于AzureEventBus,KafkaEventBus和RebusEventBus都是如此,在应用启动时调用初始化方法。

Unsubscribe

和LocalEventBus中作用相同,解除事件参数类型关联的EventHandlers,使用场景较少,其作用在一些特定场景下解除绑定,比如只允许一次生效等场景。

/// <summary>Excerpt of a distributed event bus showing how an action-based subscription is removed.</summary>
public class DistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
    //...
    /// <summary>
    /// Removes, from the factory list of <typeparamref name="TEvent"/>, every
    /// <c>SingleInstanceHandlerFactory</c> whose handler instance is an
    /// <c>ActionEventHandler&lt;TEvent&gt;</c> wrapping <paramref name="action"/>.
    /// </summary>
    /// <typeparam name="TEvent">The event type the action was subscribed to.</typeparam>
    /// <param name="action">The previously subscribed delegate; must not be null.</param>
    public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
    {
        Check.NotNull(action, nameof(action));


        GetOrCreateHandlerFactories(typeof(TEvent))
            .Locking(factories =>
            {
                factories.RemoveAll(
                    factory =>
                    {
                        // Only single-instance factories can hold an action handler.
                        var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
                        if (singleInstanceFactory == null)
                        {
                            return false;
                        }


                        var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
                        if (actionHandler == null)
                        {
                            return false;
                        }


                        // Remove only the factory wrapping this exact delegate.
                        return actionHandler.Action == action;
                    });
            });
    }
    //...
}

Publish

发布事件时,因需要考虑到分布式事务的处理,在发布参数上有些不同。主要考虑是否要在当前工作单元即将完成后发送事件,事件也往往定义成过去时,比如orderCreated, orderCompleted等完成时。

/// <summary>Excerpt of <c>DistributedEventBusBase</c> showing the three publish paths.</summary>
public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
  //...
  /// <summary>
  /// Publishes a distributed event. In order: (1) records it on the current unit of work when
  /// <paramref name="onUnitOfWorkComplete"/> is true and one exists; (2) otherwise tries the
  /// outbox when <paramref name="useOutbox"/> is true; (3) otherwise, or when the outbox did
  /// not accept it, publishes directly to the event bus.
  /// </summary>
  public async Task PublishAsync(
      Type eventType,
      object eventData,
      bool onUnitOfWorkComplete = true,
      bool useOutbox = true)
  {
      // A unit of work is current and the event is set to be sent when it is about to complete.
      if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
      {
          AddToUnitOfWork(
              UnitOfWorkManager.Current,
              new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox)
          );
          return;
      }
  
      // Outbox pattern: record the event in the local message table
      if (useOutbox)
      {
          if (await AddToOutboxAsync(eventType, eventData))
          {
              return;
          }
      }
  
      // Send the event directly to the MQ or the receiver (LocalDistributedEventBus is used when no MQ is configured)
      await PublishToEventBusAsync(eventType, eventData);
  }
  //...
}

如上有三种处理逻辑,每一个拎出来细化。

onUnitOfWorkComplete(true)

在参数onUnitOfWorkComplete为true且当前开启了工作单元下将分布式事件加入到当前工作单元内。

/// <summary>Excerpt of <c>DistributedEventBusBase</c> showing how events are attached to a unit of work.</summary>
public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
    /// <summary>Records the event in the unit of work's distributed events.</summary>
    protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
    {
        unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
    }
}

在UnitOfWork内,则会将待发布的分布式事件保存在DistributedEvents集合中。

/// <summary>Excerpt of <c>UnitOfWork</c> showing how pending distributed events are recorded.</summary>
public class UnitOfWork : IUnitOfWork, ITransientDependency
{
  //...
  // Distributed events recorded on this unit of work, pending publication.
  protected List<UnitOfWorkEventRecord> DistributedEvents { get; } = new List<UnitOfWorkEventRecord>();
  //...


  /// <summary>Adds or replaces a record in <see cref="DistributedEvents"/>.</summary>
  public virtual void AddOrReplaceDistributedEvent(
      UnitOfWorkEventRecord eventRecord,
      Predicate<UnitOfWorkEventRecord> replacementSelector = null)
  {
      AddOrReplaceEvent(DistributedEvents, eventRecord, replacementSelector);
  }
  
  /// <summary>
  /// Appends <paramref name="eventRecord"/> to <paramref name="eventRecords"/>. When a
  /// <paramref name="replacementSelector"/> is given and matches an existing record, the
  /// first match is overwritten in place instead.
  /// </summary>
  public virtual void AddOrReplaceEvent(
      List<UnitOfWorkEventRecord> eventRecords,
      UnitOfWorkEventRecord eventRecord,
      Predicate<UnitOfWorkEventRecord> replacementSelector = null)
  {
      if (replacementSelector == null)
      {
          eventRecords.Add(eventRecord);
      }
      else
      {
          var foundIndex = eventRecords.FindIndex(replacementSelector);
          if (foundIndex < 0)
          {
              // No match: fall back to appending.
              eventRecords.Add(eventRecord);
          }
          else
          {
              eventRecords[foundIndex] = eventRecord;
          }
      }
  }
}

当整个工作单元即将完成,则再次调用DistributedEventBus,将保存好的分布式事件发送出去。此次发送则标记onUnitOfWorkComplete为false。

/// <summary>Excerpt of <c>UnitOfWork</c> showing how recorded distributed events are published on completion.</summary>
public class UnitOfWork : IUnitOfWork, ITransientDependency
{
  //...
  protected List<UnitOfWorkEventRecord> DistributedEvents { get; } = new List<UnitOfWorkEventRecord>();
  protected IUnitOfWorkEventPublisher UnitOfWorkEventPublisher { get; }
  //...
  /// <summary>
  /// Completes the unit of work: saves changes, publishes recorded local and distributed
  /// events (repeating while new ones keep being recorded), then commits transactions.
  /// Does nothing if the unit of work was rolled back. Any exception is stored and rethrown.
  /// </summary>
  public virtual async Task CompleteAsync(CancellationToken cancellationToken = default)
  {
      if (_isRolledback)
      {
          return;
      }
  
      PreventMultipleComplete();
  
      try
      {
          _isCompleting = true;
          await SaveChangesAsync(cancellationToken);
  
          // Loop because publishing may record further events on this unit of work.
          while (LocalEvents.Any() || DistributedEvents.Any())
          {
              if (LocalEvents.Any())
              {
                  var localEventsToBePublished = LocalEvents.OrderBy(e => e.EventOrder).ToArray();
                  LocalEvents.Clear();
                  await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
                      localEventsToBePublished
                  );
              }
  
              if (DistributedEvents.Any())
              {
                  // Snapshot in event order, then clear before publishing.
                  var distributedEventsToBePublished = DistributedEvents.OrderBy(e => e.EventOrder).ToArray();
                  DistributedEvents.Clear();
                  await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
                      distributedEventsToBePublished
                  );
              }
  
              await SaveChangesAsync(cancellationToken);
          }
  
          await CommitTransactionsAsync();
          IsCompleted = true;
          await OnCompletedAsync();
      }
      catch (Exception ex)
      {
          _exception = ex;
          throw;
      }
  }
}

此处同样会借助UnitOfWorkEventPublisher,其内部封装了DistributedEventBus,此时的onUnitOfWorkComplete参数为false;

/// <summary>Excerpt of <c>UnitOfWorkEventPublisher</c> showing distributed event publication.</summary>
public class UnitOfWorkEventPublisher : IUnitOfWorkEventPublisher, ITransientDependency
{
    //...
    private readonly IDistributedEventBus _distributedEventBus;
    //...
    /// <summary>
    /// Publishes each record through the distributed event bus, one at a time, with
    /// <c>onUnitOfWorkComplete: false</c> and the record's own <c>UseOutbox</c> flag.
    /// </summary>
    public async Task PublishDistributedEventsAsync(IEnumerable<UnitOfWorkEventRecord> distributedEvents)
    {
        foreach (var distributedEvent in distributedEvents)
        {
            await _distributedEventBus.PublishAsync(
                distributedEvent.EventType,
                distributedEvent.EventData,
                onUnitOfWorkComplete: false,
                useOutbox: distributedEvent.UseOutbox
            );
        }
    }
}

205037014_231f328e-eeac-477f-9650-3c8db64c140a 如上过程简要描述下,因存在一个循环,主要是参数差异(onUnitOfWorkComplete)。

  1. 从DistributedEventBus.Publish(onUnitOfWorkComplete=true)调用到UnitOfWork.AddOrReplaceEvent,将待发布分布式事件存储到UnitOfWork.DistributedEvents中。

  2. 当前工作单元即将完成,从UnitOfWork.Complete又调用到DistributedEventBus.Publish(onUnitOfWorkComplete=false),将待发布的分布式事件发送,但不再回到第1步的处理过程,而是取决于useOutbox,是存储到表待发还是直接发送。

onUnitOfWorkComplete(false)&useOutbox(true)

当useOutbox为true时,则保存事件到本地消息表中,保存时共享当前所在的工作单元,如果当前没有工作单元,则改为直接发送到MQ或LocalHandler。Inbox&Outbox模式见下节。

/// <summary>Excerpt of <c>DistributedEventBusBase</c> showing the outbox publish path.</summary>
public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
  //...
  
  /// <summary>
  /// Publishes a distributed event; this excerpt shows only the outbox attempt and the
  /// direct-publish fallback.
  /// </summary>
  public async Task PublishAsync(
      Type eventType,
      object eventData,
      bool onUnitOfWorkComplete = true,
      bool useOutbox = true)
  {
      //... 
  
     // Outbox pattern: record the event in the local message table
      if (useOutbox)
      {
          if (await AddToOutboxAsync(eventType, eventData))
          {
              return;
          }
      }
  
      // Send the event directly to the MQ or the receiver (LocalDistributedEventBus is used when no MQ is configured)
      await PublishToEventBusAsync(eventType, eventData);
  }


  /// <summary>
  /// Enqueues the event into the first outbox whose selector is null or accepts
  /// <paramref name="eventType"/>.
  /// </summary>
  /// <returns>
  /// True when the event was enqueued; false when there is no current unit of work or no
  /// outbox configuration matched.
  /// </returns>
  private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
  {
      // Share the current unit of work
      var unitOfWork = UnitOfWorkManager.Current;
      if (unitOfWork == null)
      {
          return false;
      }
  
      // Ordering by "Selector is null" places configurations with a selector before those without.
      foreach (var outboxConfig in AbpDistributedEventBusOptions.Outboxes.Values.OrderBy(x => x.Selector is null))
      {
          if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
          {
              // The outbox is resolved from the unit of work's service provider.
              var eventOutbox =
                  (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
              var eventName = EventNameAttribute.GetNameOrDefault(eventType);
              await eventOutbox.EnqueueAsync(
                  new OutgoingEventInfo(
                      GuidGenerator.Create(),
                      eventName,
                      Serialize(eventData),
                      Clock.Now
                  )
              );
              return true;
          }
      }
  
      return false;
  }
}

保存在消息表中后,ABP设计了单独的BackgroundWorker,定期扫描表将事件发送到MQ。

/// <summary>
/// Background worker that starts one <c>IOutboxSender</c> for every outbox configuration
/// that has sending enabled.
/// </summary>
public class OutboxSenderManager : IBackgroundWorker
{
    //...
    /// <summary>
    /// Resolves and starts a sender for each outbox whose <c>IsSendingEnabled</c> is true,
    /// and keeps the started senders in <c>Senders</c>.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to each sender's <c>StartAsync</c>.</param>
    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        foreach (var outboxConfig in Options.Outboxes.Values)
        {
            if (outboxConfig.IsSendingEnabled)
            {
                var sender = ServiceProvider.GetRequiredService<IOutboxSender>();
                await sender.StartAsync(outboxConfig, cancellationToken);
                Senders.Add(sender);
            }
        }
    }
    //...
}


/// <summary>Excerpt of <c>OutboxSender</c> showing how waiting outbox events are sent.</summary>
public class OutboxSender : IOutboxSender, ITransientDependency
{
  //...
  /// <summary>
  /// For each waiting event, in order: publishes it from the outbox through the distributed
  /// event bus, then deletes it from the outbox by id.
  /// </summary>
  protected virtual async Task PublishOutgoingMessagesAsync(List<OutgoingEventInfo> waitingEvents)
  {
      foreach (var waitingEvent in waitingEvents)
      {
          // Send the distributed event to the MQ
          await DistributedEventBus
              .AsSupportsEventBoxes()
              .PublishFromOutboxAsync(
                  waitingEvent,
                  OutboxConfig
              );


          // Deleted only after the publish call above has completed.
          await Outbox.DeleteAsync(waitingEvent.Id);
      }
  }
  //...
}

onUnitOfWorkComplete(false)&useOutbox(false)

最后一种模式下,即useOutbox为false时,直接发送到MQ或LocalHandler,以RabbitMq为例,最终使用RabbitMq包中channel发送消息到RabbitMq中。对应于Azure,kafka和Rebus都是使用各自的包发送到自家MQ中。

/// <summary>
/// Distributed event bus that publishes events to RabbitMQ (excerpt: publish path only).
/// </summary>
public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    //...
    /// <summary>
    /// Publishes the event straight to the broker, letting the lower overloads create
    /// the message properties.
    /// </summary>
    /// <param name="eventType">Type used to resolve the event name.</param>
    /// <param name="eventData">Event payload to serialize.</param>
    protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
    {
        await PublishAsync(eventType, eventData, null);
    }

    /// <summary>
    /// Resolves the event name from <paramref name="eventType"/>, serializes
    /// <paramref name="eventData"/> and publishes the resulting bytes.
    /// </summary>
    /// <param name="eventType">Type used to resolve the event name.</param>
    /// <param name="eventData">Event payload to serialize.</param>
    /// <param name="properties">Message properties; <c>null</c> lets the channel overload create them.</param>
    /// <param name="headersArguments">Optional headers passed to <c>SetEventMessageHeaders</c>.</param>
    public Task PublishAsync(
        Type eventType,
        object eventData,
        IBasicProperties properties,
        Dictionary<string, object> headersArguments = null)
    {
        var eventName = EventNameAttribute.GetNameOrDefault(eventType);
        var body = Serializer.Serialize(eventData);

        return PublishAsync(eventName, body, properties, headersArguments);
    }

    /// <summary>
    /// Opens a channel on the configured connection and publishes the serialized event on it.
    /// </summary>
    /// <param name="eventName">Event name, used as the routing key.</param>
    /// <param name="body">Serialized event payload.</param>
    /// <param name="properties">Message properties; <c>null</c> lets the channel overload create them.</param>
    /// <param name="headersArguments">Optional headers passed to <c>SetEventMessageHeaders</c>.</param>
    /// <param name="eventId">Optional id used as the message id when none is set on the properties.</param>
    protected virtual Task PublishAsync(
        string eventName,
        byte[] body,
        IBasicProperties properties,
        Dictionary<string, object> headersArguments = null,
        Guid? eventId = null)
    {
        using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
        {
            // Returning the task from inside the using block is safe here because the channel
            // overload below publishes synchronously and returns an already-completed task.
            return PublishAsync(channel, eventName, body, properties, headersArguments, eventId);
        }
    }

    /// <summary>
    /// Publishes the serialized event on the given channel.
    /// </summary>
    /// <param name="channel">Channel to publish on.</param>
    /// <param name="eventName">Event name, used as the routing key.</param>
    /// <param name="body">Serialized event payload.</param>
    /// <param name="properties">Message properties; created with persistent delivery mode when <c>null</c>.</param>
    /// <param name="headersArguments">Optional headers passed to <c>SetEventMessageHeaders</c>.</param>
    /// <param name="eventId">Optional id used as the message id; a new GUID is generated when <c>null</c>.</param>
    /// <returns>A completed task; the publish call itself is synchronous.</returns>
    protected virtual Task PublishAsync(
        IModel channel,
        string eventName,
        byte[] body,
        IBasicProperties properties,
        Dictionary<string, object> headersArguments = null,
        Guid? eventId = null)
    {
        EnsureExchangeExists(channel);

        if (properties == null)
        {
            properties = channel.CreateBasicProperties();
            properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
        }

        // Only assign a message id when the caller has not supplied one on the properties.
        if (properties.MessageId.IsNullOrEmpty())
        {
            properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
        }

        SetEventMessageHeaders(properties, headersArguments);

        // Send the message to RabbitMQ.
        channel.BasicPublish(
            exchange: AbpRabbitMqEventBusOptions.ExchangeName,
            routingKey: eventName,
            mandatory: true,
            basicProperties: properties,
            body: body
        );

        return Task.CompletedTask;
    }
    //...
}

如果使用的是LocalDistributedEventBus,则其内部使用LocalEventBus,走本地事件方式就回归到上一节LocalEventBus中代码了。

/// <summary>
/// Distributed event bus whose publish overloads delegate to the local event bus (excerpt).
/// </summary>
public class LocalDistributedEventBus : IDistributedEventBus, ISingletonDependency
{
  private readonly ILocalEventBus _localEventBus;
  //...
  // NOTE(review): this class is declared with IDistributedEventBus and ISingletonDependency only,
  // so there is no visible base member for "protected override" to override. This method looks
  // like it belongs to LocalEventBus; confirm against the ABP 6.0.3 source.
  protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
  {
      await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType));
  }


  /// <summary>Forwards the typed event to the local event bus.</summary>
  public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
      where TEvent : class
  {
      return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
  }


  /// <summary>Forwards the event, identified by its runtime type, to the local event bus.</summary>
  public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
  {
      return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
  }
  
  /// <summary>
  /// Forwards the typed event to the local event bus; <paramref name="useOutbox"/> is not
  /// passed on by this overload.
  /// </summary>
  public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
  {
      return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
  }


  /// <summary>
  /// Forwards the event to the local event bus; <paramref name="useOutbox"/> is not
  /// passed on by this overload.
  /// </summary>
  public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
  {
      return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
  }
}

Inbox&Outbox

本地消息表作为解决分布式最终一致性的利器,在ABP中也实现了,尽管功能有限,不像DotNetCore.CAP般强大,但总归还是能够处理一些分布式事务场景需求。如果想要在服务中使用Inbox&Outbox,则需要创建其对应的表,ABP已经提前设计好了该部分实体及表,只需在DbContext中加入属性及模型配置即可。

public class AppDbContext : AbpDbContext<AppDbContext >
{
  //...
  public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }
  public DbSet<IncomingEventRecord> IncomingEvents { get; set; }


  protected override void OnModelCreating(ModelBuilder builder)
  {
    //...
    builder.ConfigureEventOutbox();
    builder.ConfigureEventInbox();
  }
}

Outbox

EventStorage

在DistributedEventBus一节中提及到了分布式事件会存到表中,即Outbox对应表。

public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
    //...
    private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
    {
        var unitOfWork = UnitOfWorkManager.Current;
        if (unitOfWork == null)
        {
            return false;
        }
          
        foreach (var outboxConfig in AbpDistributedEventBusOptions.Outboxes.Values.OrderBy(x => x.Selector is null))
        {
            // 事件类型匹配是否该outboxConfig符合条件
            if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
            {
                // 实例化具体类型的EventOutbox
                var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
                var eventName = EventNameAttribute.GetNameOrDefault(eventType);


                // 分布式事件入表
                await eventOutbox.EnqueueAsync(
                    new OutgoingEventInfo(
                        GuidGenerator.Create(),
                        eventName,
                        Serialize(eventData),
                        Clock.Now
                    )
                );
                return true;
            }
        }


        return false;
    }
    //...
}

OutboxConfig

在保存时,会先根据事件类型选择Outbox的配置,对应关系存储在AbpDistributedEventBusOptions中。

public class AbpDistributedEventBusOptions
{
    //...
    public OutboxConfigDictionary Outboxes { get; }
    //...
}


public class OutboxConfigDictionary : Dictionary<string, OutboxConfig>
{
    public void Configure(Action<OutboxConfig> configAction)
    {
        Configure("Default", configAction);
    }


    public void Configure(string outboxName, Action<OutboxConfig> configAction)
    {
        var outboxConfig = this.GetOrAdd(outboxName, () => new OutboxConfig(outboxName));
        configAction(outboxConfig);
    }
}


public class OutboxConfig
{
    public string Name { get; }
    public Type ImplementationType { get; set; }
    public Func<Type, bool> Selector { get; set; }
    public bool IsSendingEnabled { get; set; } = true;
}

默认情况下,如果当前服务只有一个Database,在OutboxConfigDictionary中则是使用Default,服务注册时,如下配置即可。

// Single-database setup: the parameterless Configure overload targets the "Default" outbox,
// which is pointed at MyDbContext through the UseMySQL extension method.
Configure<AbpDistributedEventBusOptions>(options =>
{
    options.Outboxes.Configure(config =>
    {
        config.UseMySQL<MyDbContext>();
    });
});

当有多个Database时,则可以配置多个Outbox并且设置存储源,还能够过滤事件类型是否在这个Outbox上应用,从而实现对于事件类型能够分开处理。

// Multi-database setup: three named outboxes, each with its own storage.
Configure<AbpDistributedEventBusOptions>(options =>
{
    // "Default" outbox (parameterless Configure overload).
    options.Outboxes.Configure(config =>
    {
        config.UseMySQL<MyDbContext>();
    });
    options.Outboxes.Configure("second", config =>
    {
        config.UseMySQL<MySecondDbContext>();
    });
    options.Outboxes.Configure("third", config =>
    {
        config.UseMongoDbContext<MyMongoDbContext>();
        // Restrict this outbox to a single event type; outboxes with a selector are
        // evaluated before those without one when an event is enqueued.
        config.Selector = eventType =>
        {
            return eventType == typeof(ProductCreatedEto);
        };
    });
});

在如上UseMySQL,UseMongoDbContext扩展方法中,实际上是对ImplementationType赋值。

public static void UseMySQL<TDbContext>(this OutboxConfig outboxConfig)
    where TDbContext : IHasEventOutbox
{
    outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}

EventOutbox

当有了符合事件类型的OutboxConfig后,则是按照ImplementationType实例化相应的EventOutbox。

var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
var eventName = EventNameAttribute.GetNameOrDefault(eventType);

对于EventOutbox,因为存储源的不同,分别做了实现。 205041821_7ad1a7a2-9999-4309-8d9d-8c8b3cbcc8de 例如,当使用EfCore的DbContext存储分布式事件时,使用DbContext的OutgoingEvents集合直接保存到表中。

public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
    where TDbContext : IHasEventOutbox
{
    //...
    [UnitOfWork]
    public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
    {
        var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
        dbContext.OutgoingEvents.Add(
            new OutgoingEventRecord(outgoingEvent)
        );
    }
    //...
}

最终,分布式事件存入到存储源中。

EventPublish

在之前提到了有个单独的线程将分布式事件发送到MQ中,此处细说。在AbpEventBusModule中,应用启动时会启动两个BackgroundWorker。

public class AbpEventBusModule : AbpModule
{
    //...
    public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        await context.AddBackgroundWorkerAsync<OutboxSenderManager>();
        await context.AddBackgroundWorkerAsync<InboxProcessManager>();
    }
   //...
}

OutboxSenderManager会定期启动,扫描服务注册时配置好的多个OutboxConfig并交付给OutboxSender来执行读取存储的分布式事件及发送到MQ。

public class OutboxSenderManager : IBackgroundWorker
{
    protected AbpDistributedEventBusOptions Options { get; }
    protected IServiceProvider ServiceProvider { get; }
    protected List<IOutboxSender> Senders { get; }
    //...
    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        foreach (var outboxConfig in Options.Outboxes.Values)
        {
            if (outboxConfig.IsSendingEnabled)
            {
                var sender = ServiceProvider.GetRequiredService<IOutboxSender>();
                await sender.StartAsync(outboxConfig, cancellationToken);
                Senders.Add(sender);
            }
        }
    } 
    //...
}

在OutboxSender内部,则是先进行读取

/// <summary>
/// Reads waiting events from an outbox and publishes them through the distributed
/// event bus, deleting each stored record after it has been published (excerpt).
/// </summary>
public class OutboxSender : IOutboxSender, ITransientDependency
{
    protected IDistributedEventBus DistributedEventBus { get; }
    protected IEventOutbox Outbox { get; private set; }
    protected OutboxConfig OutboxConfig { get; private set; }
    protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
    //...
    /// <summary>
    /// Drains the outbox: keeps fetching waiting events and publishing them until a
    /// fetch returns no events.
    /// </summary>
    protected virtual async Task RunAsync()
    {
        //...
        while (true)
        {
            // Read the distributed events that are waiting to be sent.
            var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
            if (waitingEvents.Count <= 0)
            {
                // Nothing left to send: exit the loop.
                break;
            }


            if (EventBusBoxesOptions.BatchPublishOutboxEvents)
            {
                // Publish the whole batch at once.
                await PublishOutgoingMessagesInBatchAsync(waitingEvents);
            }
            else
            {
                // Publish the events one by one.
                await PublishOutgoingMessagesAsync(waitingEvents);
            }
        }
        //...
    }


    /// <summary>
    /// Publishes each waiting event individually and deletes its stored record afterwards.
    /// </summary>
    /// <param name="waitingEvents">Events read from the outbox.</param>
    protected virtual async Task PublishOutgoingMessagesAsync(List<OutgoingEventInfo> waitingEvents)
    {
        foreach (var waitingEvent in waitingEvents)
        {
            // Send through the event bus.
            await DistributedEventBus
                .AsSupportsEventBoxes()
                .PublishFromOutboxAsync(
                    waitingEvent,
                    OutboxConfig
                );


            // Once sent, delete the stored distributed event record.
            await Outbox.DeleteAsync(waitingEvent.Id);
            //...
        }
    }


    /// <summary>
    /// Publishes all waiting events in one call and then deletes their stored records together.
    /// </summary>
    /// <param name="waitingEvents">Events read from the outbox.</param>
    protected virtual async Task PublishOutgoingMessagesInBatchAsync(List<OutgoingEventInfo> waitingEvents)
    {
        // Send through the event bus.
        await DistributedEventBus
            .AsSupportsEventBoxes()
            .PublishManyFromOutboxAsync(waitingEvents, OutboxConfig);
        
        // Once sent, delete the stored distributed event records.
        await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray());
        //...
    }
}

最终还是回到了EventBus中发送事件,但对EventBus有要求:必须实现ISupportsEventBoxes接口,其调用的方法来源于该接口。

public interface ISupportsEventBoxes
{
    Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig);
    Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig);
    Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig);
}

其实现为各Mq的EventBus,以RabbitMqDistributedEventBus为例,其内部直接使用channel发送事件到Mq,Azure,Kafka和Rebus均类似。

public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{  
    //...
    public async override Task PublishManyFromOutboxAsync(
        IEnumerable<OutgoingEventInfo> outgoingEvents,
        OutboxConfig outboxConfig)
    {
        using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
        {
            var outgoingEventArray = outgoingEvents.ToArray();
            channel.ConfirmSelect();


            foreach (var outgoingEvent in outgoingEventArray)
            {
                await PublishAsync(
                    channel,
                    outgoingEvent.EventName, 
                    outgoingEvent.EventData,  
                    properties: null,
                    eventId: outgoingEvent.Id);
            }


            channel.WaitForConfirmsOrDie();
        }
    }
    //...
}

该部分类图简要如下 205042983_aec6fe60-2bd2-4154-ad4a-408e5e015646 此处有一个控制参数,控制读取数量,发送失败间隔时间等,可在服务注册时对其配置,也可直接使用默认值。

public class AbpEventBusBoxesOptions
{
    public TimeSpan CleanOldEventTimeIntervalSpan { get; set; }
    public int InboxWaitingEventMaxCount { get; set; }
    public int OutboxWaitingEventMaxCount { get; set; }
    public TimeSpan PeriodTimeSpan { get; set; }
    public TimeSpan DistributedLockWaitDuration { get; set; }
    public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; }
    public bool BatchPublishOutboxEvents { get; set; }
}

Inbox

EventStorage

当处于MQ的另一端从Mq拉取到消息或MQ主动推送消息后,按照本地消息表的工作原理,需要先保存到表中。在各MQ的DistributedEventBus服务初始化时,提及到Handler的订阅过程,同时还有一个回调注册。例如在RabbitMqDistributedEventBus中,先注册消费者,同时设置了一个回调,用来接收MQ推送的消息触发回调ProcessEventAsync。

public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    protected AbpRabbitMqEventBusOptions AbpRabbitMqEventBusOptions { get; }
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
    //...
    public void Initialize()
    {
        // 注册消费者
        Consumer = MessageConsumerFactory.Create(
            new ExchangeDeclareConfiguration(
                AbpRabbitMqEventBusOptions.ExchangeName,
                type: AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
                durable: true
            ),
            new QueueDeclareConfiguration(
                AbpRabbitMqEventBusOptions.ClientName,
                durable: true,
                exclusive: false,
                autoDelete: false
            ),
            AbpRabbitMqEventBusOptions.ConnectionName
        );
        // 绑定收到MQ的消息后的后续处理
        Consumer.OnMessageReceived(ProcessEventAsync);


        SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
    }
    //...
}

该方法中,实现了收到MQ的消息后转换存储到本地表中。

public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    //...
    private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
    {
        var eventName = ea.RoutingKey;
        var eventType = EventTypes.GetOrDefault(eventName);
        if (eventType == null)
        {
            return;
        }
    
        var eventBytes = ea.Body.ToArray();
    
        if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventBytes))
        {
            return;
        }
    
        var eventData = Serializer.Deserialize(eventBytes, eventType);
    
        await TriggerHandlersAsync(eventType, eventData);
    }
    //...
}

其中会分为两种场景处理:

  • 如果没有使用上Inbox,事件则直接被触发,该触发逻辑与下一小节中的TriggerHandlersAsync为同一个方法,此处先不提及。

  • 如果使用了Inbox,则会将事件存储到表中,再由下一小节的InboxProcessManager来推动事件触发。

public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
    //...
    protected async Task<bool> AddToInboxAsync(
        string messageId,
        string eventName,
        Type eventType,
        byte[] eventBytes)
    {
        if (AbpDistributedEventBusOptions.Inboxes.Count <= 0)
        {
            return false;
        }


        using (var scope = ServiceScopeFactory.CreateScope())
        {
            foreach (var inboxConfig in AbpDistributedEventBusOptions.Inboxes.Values.OrderBy(x => x.EventSelector is null))
            {
                // 事件类型匹配是否该inboxConfig符合条件
                if (inboxConfig.EventSelector == null || inboxConfig.EventSelector(eventType))
                {
                    // 实例化具体类型的EventInbox
                    var eventInbox = (IEventInbox)scope.ServiceProvider.GetRequiredService(inboxConfig.ImplementationType);
                    if (!messageId.IsNullOrEmpty())
                    {
                        // 消息幂等校验,重复消息则去除
                        if (await eventInbox.ExistsByMessageIdAsync(messageId))
                        {
                            continue;
                        }
                    }


                    // 保存到存储源中
                    await eventInbox.EnqueueAsync(new IncomingEventInfo(
                            GuidGenerator.Create(),
                            messageId,
                            eventName,
                            eventBytes,
                            Clock.Now
                        )
                    );
                }
            }
        }


        return true;
    }
    //...
}

InboxConfig

在保存事件到表前,会先根据事件类型选择一个InboxConfig,类似于OutboxConfig,配置了一些存储源,过滤特定事件类型等。对应关系存储在AbpDistributedEventBusOptions中。

public class AbpDistributedEventBusOptions
{
    //...
    public InboxConfigDictionary Inboxes { get; }
    //...
}


public class InboxConfigDictionary : Dictionary<string, InboxConfig>
{
    public void Configure(Action<InboxConfig> configAction)
    {
        Configure("Default", configAction);
    }


    public void Configure(string outboxName, Action<InboxConfig> configAction)
    {
        var outboxConfig = this.GetOrAdd(outboxName, () => new InboxConfig(outboxName));
        configAction(outboxConfig);
    }
}


public class InboxConfig
{
    public string Name { get; }
    public Type ImplementationType { get; set; }
    public Func<Type, bool> EventSelector { get; set; }
    public Func<Type, bool> HandlerSelector { get; set; }
    public bool IsProcessingEnabled { get; set; } = true;
}

在服务注册时,可以进行配置。默认情况下,如果当前服务只有一个Database,在InboxConfigDictionary中则是使用Default,服务注册时,如下配置即可。

// Single-database setup: the parameterless Configure overload targets the "Default" inbox.
// UseMySQL is generic over the DbContext type, which cannot be inferred and must be given.
Configure<AbpDistributedEventBusOptions>(options =>
{
    options.Inboxes.Configure(config =>
    {
        config.UseMySQL<MyDbContext>();
    });
});

当有多个Database时,则可以配置多个Inbox并且设置存储源,还能够过滤事件类型是否在这个Inbox上应用,从而实现对于事件类型能够分开处理。

// Multi-database setup: three named inboxes, each with its own storage.
Configure<AbpDistributedEventBusOptions>(options =>
{
    // "Default" inbox (parameterless Configure overload).
    options.Inboxes.Configure(config =>
    {
        config.UseMySQL<MyDbContext>();
    });
    options.Inboxes.Configure("second", config =>
    {
        config.UseMySQL<MySecondDbContext>();
    });
    options.Inboxes.Configure("third", config =>
    {
        config.UseMongoDbContext<MyMongoDbContext>();
        // Only events of this type are stored in this inbox.
        config.EventSelector = eventType =>
        {
            return eventType == typeof(ProductCreatedEto);
        };
        // Only this handler is triggered for events processed from this inbox.
        config.HandlerSelector = handlerType =>
        {
            return handlerType == typeof(ProductCreatedHandler);
        };
    });
});

在如上UseMySQL,UseMongoDbContext扩展方法中,实际上是对ImplementationType赋值。

/// <summary>
/// Selects the DbContext-backed inbox implementation for <typeparamref name="TDbContext"/>
/// by assigning the inbox configuration's <c>ImplementationType</c>.
/// </summary>
/// <typeparam name="TDbContext">DbContext type that exposes the incoming-events set.</typeparam>
/// <param name="inboxConfig">Inbox configuration to update.</param>
public static void UseMySQL<TDbContext>(this InboxConfig inboxConfig)
    where TDbContext : IHasEventInbox
{
    // Must be the inbox contract: the outbox counterpart is resolved as IEventOutbox and
    // is declared for IHasEventOutbox contexts, so it cannot serve as an IEventInbox here.
    inboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}

EventInbox

当有了符合事件类型的InboxConfig后,则是按照ImplementationType实例化相应的EventInbox。

var eventInbox = (IEventInbox)scope.ServiceProvider.GetRequiredService(inboxConfig.ImplementationType);

对于EventInbox,因为存储源的不同,分别做了实现。 205044121_682e4001-1360-4b6c-ba40-43534e554f6e 例如,当使用EfCore的DbContext存储分布式事件时,使用DbContext的IncomingEvents集合直接保存到表中。

public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
    where TDbContext : IHasEventInbox
{
    //...
    [UnitOfWork]
    public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent)
    {
        var dbContext = await DbContextProvider.GetDbContextAsync();
      
        dbContext.IncomingEvents.Add(
            new IncomingEventRecord(incomingEvent)
        );
    }
    //...
}

最终,分布式事件存入到存储源中。

EventTrigger

消息已经到表中,还需要实现匹配相应的Handler从而完成消费端对关注事件的处理逻辑。在AbpEventBusModule中,应用启动时除了OutboxSenderManager还有一个InboxProcessManager,来管理将收到的消息触发其对应的EventHandler。

public class AbpEventBusModule : AbpModule
{
    //...
    public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        await context.AddBackgroundWorkerAsync<OutboxSenderManager>();
        await context.AddBackgroundWorkerAsync<InboxProcessManager>();
    }
   //...
}

这也是一个BackgroundWorker,循环所有的存储源,拿到InboxConfig移交给InboxProcessor来具体执行。

/// <summary>
/// Background worker that starts, and later stops, one <see cref="IInboxProcessor"/> for
/// every inbox configuration that has processing enabled (excerpt).
/// </summary>
public class InboxProcessManager : IBackgroundWorker
{
    protected AbpDistributedEventBusOptions Options { get; }
    protected IServiceProvider ServiceProvider { get; }
    protected List<IInboxProcessor> Processors { get; }
    //...
    /// <summary>
    /// Resolves and starts a processor for each inbox configuration whose
    /// <c>IsProcessingEnabled</c> flag is set, then records it in <c>Processors</c>.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to each processor's <c>StartAsync</c>.</param>
    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        foreach (var inboxConfig in Options.Inboxes.Values)
        {
            if (inboxConfig.IsProcessingEnabled)
            {
                // One processor is resolved per inbox configuration and is handed that configuration.
                var processor = ServiceProvider.GetRequiredService<IInboxProcessor>();
                await processor.StartAsync(inboxConfig, cancellationToken);
                Processors.Add(processor);
            }
        }
    }

    /// <summary>
    /// Stops every processor that was started, one after another.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to each processor's <c>StopAsync</c>.</param>
    public async Task StopAsync(CancellationToken cancellationToken = default)
    {
        foreach (var processor in Processors)
        {
            await processor.StopAsync(cancellationToken);
        }
    }
}


public class InboxProcessor : IInboxProcessor, ITransientDependency
{
    //...
    protected IDistributedEventBus DistributedEventBus { get; }
    protected InboxConfig InboxConfig { get; private set; }
    protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
    //...
    protected virtual async Task RunAsync()
    {
      // 删除过期的事件
      await DeleteOldEventsAsync();


      //...
      while (true)
      {  
          // 循环读取待处理状态的事件,直到全部读取完成
          var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken);
          if (waitingEvents.Count <= 0)
          {
              break;
          }
          //...
          foreach (var waitingEvent in waitingEvents)
          {
              using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true))
              {
                  // 通过事件类型从HanderFactories中找到符合条件的HandlerFactory再实现调用各Handler的方法
                  await DistributedEventBus
                      .AsSupportsEventBoxes()
                      .ProcessFromInboxAsync(waitingEvent, InboxConfig);


                  // 事件触发完成则标记完成状态
                  await Inbox.MarkAsProcessedAsync(waitingEvent.Id);


                  await uow.CompleteAsync();
              }
              //...
          }
      }
      //...
    }
}

如上使用的DistributedEventBus仍然需要是支持ISupportsEventBoxes接口的。以RabbitMqDistributedEventBus为例,

public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
    //...
    public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
    {
        var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
        if (eventType == null)
        {
            return;
        }
    
        var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
        var exceptions = new List<Exception>();
        // 执行EventType找Handler完成触发逻辑
        await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
        if (exceptions.Any())
        {
            ThrowOriginalExceptions(eventType, exceptions);
        }
    }
    
    protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
    {
        await new SynchronizationContextRemover();


        // 按照事件类型得到关联的HandlerFactory
        foreach (var handlerFactories in GetHandlerFactories(eventType))
        {
            // 得到HandlerFactory中的Handler,传参调用Handler的执行方法
            foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
            {
                await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions, inboxConfig);
            }
        }


        // 泛型事件参数Handler下的事件处理(该部分仅用于LocalEvent,此处可忽略)
        if (eventType.GetTypeInfo().IsGenericType &&
            eventType.GetGenericArguments().Length == 1 &&
            typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
        {
            //...
        }
    }


    // 核心方法,执行Handler的处理逻辑
    protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType,
        object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
    {
        using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
        {
          //...
          // 通过EventHandler类型找到匹配的InboxConfig,再次匹配下Handler是否符合预先配置好的条件
          var handlerType = eventHandlerWrapper.EventHandler.GetType();
          if (inboxConfig?.HandlerSelector != null &&
              !inboxConfig.HandlerSelector(handlerType))
          {
              return;
          }
    
          // 设置当前执行环境的租户
          using (CurrentTenant.Change(GetEventDataTenantId(eventData)))
          {
              // 调用进入到Handler的方法中,在上一节中提及EventHandlerInvoker
              await EventHandlerInvoker.InvokeAsync(eventHandlerWrapper.EventHandler, eventData, eventType);
          }
        }
    }
    //...
}

最终进入到EventHandlerInvoker中,后续流程与前文EventHandler小节所述一致。 该部分类图简要如下

205045710_5b66ded8-c03f-46ce-bf1d-9c5eb6acc2d1

扩展

可扩展点非常之多,此处只简单提及几个。

  1. 实现自定义的DistributedEventBus,如实现基于Redis的,基于System.Threading.Channels的,可参照RabbitMqDistributedEventBus少量修改。

  2. 扩展IEventHandlerInvoker,实现自定义的EventHandlerInvoker,在Handler执行前后增加一些功能。

  3. 重写EventBusBase中的TriggerHandlersAsync逻辑,在原有基础上再扩展功能。

2024-12-26,望技术有成后能回来看见自己的脚步。