ABP Framework-BackgroundJob源码解析

目录

https://abp.io/docs/6.0/Background-Jobs

Version

6.0.3

Package

Volo.Abp.BackgroundJobs.HangFire //依赖Hangfire和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs.Quartz //依赖Quartz和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs.RabbitMQ //依赖RabbitMQ和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs //依赖BackgroundWorker和Volo.Abp.BackgroundJobs.Abstractions,与上第三方组件平级
Volo.Abp.BackgroundJobs.Abstractions


//独立模块
Volo.Abp.BackgroundJobs.Domain
Volo.Abp.BackgroundJobs.Domain.Shared
Volo.Abp.BackgroundJobs.EntityFrameworkCore
Volo.Abp.BackgroundJobs.MongoDB

BackgroundJob

JobArgs

作为最小执行单元,使用时只需继承BackgroundJob或AsyncBackgroundJob。

public class EmailSendingJob : AsyncBackgroundJob<EmailSendingArgs>, ITransientDependency
{
    private readonly IEmailSender _emailSender;


    public EmailSendingJob(IEmailSender emailSender)
    {
        _emailSender = emailSender;
    }


    public override async Task ExecuteAsync(EmailSendingArgs args)
    {
        await _emailSender.SendAsync(args.EmailAddress, args.Subject, args.Body);
    }
}

前提是需要一个JobArgs,来传递Job执行需要的一些参数。

public class EmailSendingArgs
{
    public string EmailAddress { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}

文档中提到BackgroundJobName,在代码中体现为BackgroundJobNameAttribute。 204419812_a16ae82e-8133-4d8b-91ea-4557d06d2d11 实则是在当使用了RabbitMQ或者类似的第三方组件时才需要拿到名字作为队列名,如果在Hangfire或者Quartz或者ABP自身的BackgroundJobWorker管理,都无需要使用到BackgroundJobNameAttribute。当然如果加上了,也没有影响,只是加入Job时,会以加上的名字作为Job名,加入到执行队列中。以DefaultBackgroundJobManager为例

[Dependency(ReplaceServices = true)]
public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    //...


    public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
        var jobId = await EnqueueAsync(jobName, args, priority, delay);
        return jobId.ToString();
    }


    //...
}

当取JobArgs的名字时,优先从Attribute中取,如果没有则使用类名

public class BackgroundJobNameAttribute : Attribute, IBackgroundJobNameProvider
{
    //...
    
    public static string GetName([NotNull] Type jobArgsType)
    {
        Check.NotNull(jobArgsType, nameof(jobArgsType));


        return jobArgsType.GetCustomAttributes(true)
                   .OfType<IBackgroundJobNameProvider>()
                   .FirstOrDefault()?.Name
               ?? jobArgsType.FullName;
    }
}

BackgroundJob

回归到最小执行单元BackgroundJob,ABP中提供了两类抽象接口与抽象实现

204420986_38bce3c7-670f-456f-b349-5c57790b157c BackgroundJob中并没有什么实质性内容,仅作为抽象封装。

public abstract class AsyncBackgroundJob<TArgs> : IAsyncBackgroundJob<TArgs>
{
    public ILogger<AsyncBackgroundJob<TArgs>> Logger { get; set; }


    protected AsyncBackgroundJob()
    {
        Logger = NullLogger<AsyncBackgroundJob<TArgs>>.Instance;
    }


    public abstract Task ExecuteAsync(TArgs args);
}

BackgroundJob注册

项目启动时,会扫描到所有继承自IBackgroundJob和IAsyncBackgroundJob的实现完成注册并将注册内容存储到AbpBackgroundJobOptions中。

[DependsOn(typeof(AbpJsonModule))]
public class AbpBackgroundJobsAbstractionsModule : AbpModule
{
    public override void PreConfigureServices(ServiceConfigurationContext context)
    {
        RegisterJobs(context.Services);
    }


    private static void RegisterJobs(IServiceCollection services)
    {
        var jobTypes = new List<Type>();
        
        // 扫描实现加入到JobTypes中
        services.OnRegistred(context =>
        {
            if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)) ||
                ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IAsyncBackgroundJob<>)))
            {
                jobTypes.Add(context.ImplementationType);
            }
        });
        
        // 扫描到的结果转换存储到AbpBackgroundJobOptions中
        services.Configure<AbpBackgroundJobOptions>(options =>
        {
            foreach (var jobType in jobTypes)
            {
                options.AddJob(jobType);
            }
        });
    }
}

在Options中,一个属性是控制整个BackgroundJob是否启用,另外的则是使用字典存储所有注册的Job,简化代码如下。

public class AbpBackgroundJobOptions
{
    private readonly Dictionary<Type, BackgroundJobConfiguration> _jobConfigurationsByArgsType;
    public bool IsJobExecutionEnabled { get; set; } = true;


    public BackgroundJobConfiguration GetJob(Type argsType)
    {
        return _jobConfigurationsByArgsType.GetOrDefault(argsType);
    }


    public void AddJob(BackgroundJobConfiguration jobConfiguration)
    {
        _jobConfigurationsByArgsType[jobConfiguration.ArgsType] = jobConfiguration;
    }
}

此处依赖BackgroundJobConfiguration类,该类仅作为一个记录中转,记录Job本身的类型,JobArgs类型,Job名,对这三者信息的封装。

public class BackgroundJobConfiguration
{
    public Type ArgsType { get; }
    public Type JobType { get; }
    public string JobName { get; }


    public BackgroundJobConfiguration(Type jobType)
    {
        JobType = jobType;
        //从JobType中分析出JobArgs的类型,取其泛型参数具体类型
        ArgsType = BackgroundJobArgsHelper.GetJobArgsType(jobType);
        JobName = BackgroundJobNameAttribute.GetName(ArgsType);
    }
}

BackgroundJobExecuter

BackgroundJob类型上分为了两类,ABP代码中并不直接对这两类Job分别调用,而是再包一层,对这两类的调用抽象,得到一个唯一的操作入口。

204422162_be33c88e-00a1-4a8b-9547-d7db41ee0c11 ExecuteAsync方法简要如下代码,JobExecutionContext承担要执行的Job信息的承载,和BackgroundJobConfiguration的职责类似。其中完成按类别调用实际的Job,该方法支持override,如果想要自定义一个BackgroundJob类型,可以在此基础上参考。上层的Hangfire/Quartz/RabbitMQ/ABP-BackgroundJobWorker等都是基于IBackgroundJobExecuter来执行,因此此处扩展并不会对上层产生影响。

public class BackgroundJobExecuter : IBackgroundJobExecuter, ITransientDependency
{
    //...
    
    public virtual async Task ExecuteAsync(JobExecutionContext context)
    {
        // 从DI中获得Job实例
        var job = context.ServiceProvider.GetService(context.JobType);


        // 按照继承接口类型找到执行方法
        var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)) ??
                               context.JobType.GetMethod(nameof(IAsyncBackgroundJob<object>.ExecuteAsync));
        //...
        try
        {
            // 分类执行对应方法
            if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
            {
                await ((Task)jobExecuteMethod.Invoke(job, new[] { context.JobArgs }));
            }
            else
            {
                jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
            }
        }
        catch (Exception ex)
        {
            //...
            // 异常转换为BackgroundJob独有异常,便于捕获方便后续Job重试
            throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
            {
                JobType = context.JobType.AssemblyQualifiedName,
                JobArgs = context.JobArgs
            };
        }
    }
}

对于上层调用方(Hangfire/Quartz/RabbitMQ/ABP-BackgroundJob)对BackgroundJobExecuter的具体使用,跳转到下下小节中提及。

BackgroundJobManager

BackgroundJob定义好了,在业务编排中期望在某些点加入一个Job,以便于在特定时间点执行Job。ABP封装了IBackgroundJobManager,用来管理Job入队编排操作。

204423373_5727e2d3-d995-4cd6-b50a-3e525cf9c026 在编排时Job加入到执行队列(默认是入库)。在加入时可以指定期望的Job,执行时间,执行优先级等参数。

204424544_b02277e3-b529-463b-b0b7-94a2d629e1e1 此处以ABP自身的BackgroundJobManager和基于Hangfire实现的 BackgroundJobManager解析。

DefaultBackgroundJobManager

ABP实现了默认的BackgroundJobManager(位于Volo.Abp.BackgroundJobs包),将待执行Job信息存储到库中。再由BackgroundJobWorker捞起符合条件的Job执行。

204425914_6da1fac4-8f77-4423-b97d-9facb17ae55d 其中数据来源依赖于IBackgroudJobStore,借助仓储从数据库存入Job数据,并返回JobId,方便于一些场景下跟踪Job执行进度。

[Dependency(ReplaceServices = true)]
public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    //...
    public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
        var jobId = await EnqueueAsync(jobName, args, priority, delay);
        return jobId.ToString();
    }


    protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        var jobInfo = new BackgroundJobInfo
        {
            Id = GuidGenerator.Create(),
            JobName = jobName,
            JobArgs = Serializer.Serialize(args),
            Priority = priority,
            CreationTime = Clock.Now,
            NextTryTime = Clock.Now
        };


        if (delay.HasValue)
        {
            jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
        }
         
        //保存JobInfo
        await Store.InsertAsync(jobInfo);


        return jobInfo.Id;
    }
}

HangfireBackgroundJobManager

位于Volo.Abp.BackgroundJobs.HangFire包中,依赖Hangfire来管理触发时机和调用。

[Dependency(ReplaceServices = true)]
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        return Task.FromResult(delay.HasValue
            ? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args),
                delay.Value
            )
            : BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args)
            ));
    }
}

该部分类图简要如下 204427296_c9564d1e-4a28-4e0a-88d7-3accde181a42 将期望执行的Job及参数移交到Hangfire的BackgroundJob类中,该类负责管理时间和触发时机,而具体的执行动作由HangfireBackgroundJobExecutionAdapter来执行,该类承担是否能够执行Job以及调用IBackgroundJobExecuter发起执行。

HangfireBackgroundJobManager->Hangfire.BackgroundJob->HangfireBackgroundJobExecutionAdapter->IBackgroundJobExecuter->BackgroundJob

对于Quartz和RabbitMQ流程上类似都是将Job移交到第三方组件管理,但具体的执行又会被回调到IBackgroundJobExecuter中。

AbpBackgroundJobsHangfireModule

值得注意,在AbpBackgroundJobsHangfireModule中也有一个类似HangfireBackgroundWorker一样创建null的BackgroundJobServer的机制,保持所有的Job都可以注册,编排期望的Job,但是在执行时都受阻,并不会真正的执行。

public class AbpBackgroundJobsHangfireModule : AbpModule
{
    public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
    {
        // BackgroundJob开关关闭后,默认使用一个null的Hangfire.BackgroundJobServer
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
        if (!options.IsJobExecutionEnabled)
        {
            var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
            hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
        }
    }


    //当开关关闭时,返回null
    private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
    {
        serviceProvider.GetRequiredService<JobStorage>();
        return null;
    }
}

BackgroundJob触发执行

当期望执行时间到达,需要借助一些方式收集到符合条件的Job并执行,使用不同的第三方组件有不同的管理调度策略,默认ABP的是BackgroundJobWorker,对于Hangfire的则是Hangfire.Background来管理,Quartz使用Schedule,RabbitMQ则依靠延时队列,但最终都是调用IBackgroundJobExecuter来执行Job。此处解析ABP的默认BackgroundJobWorker和Hangfire回调过程。

204428543_f1f86fc4-79ef-4839-b705-8e06875a1618

BackgroundJobWorker

对应于ABP的DefaultBackgroundJobManager,ABP提供了BackgroundJobManager,位于Volo.Abp.BackgroundJobs包中,基于BackgroundWorker,周期性的获取符合执行条件的Jobs。

204429888_69287dfe-8e2b-42eb-b594-fa244e6e7048 在AbpBackgroundJobWorkerOptions中默认的执行周期JobPollPeriod为5秒,如有需求可以更改值。

[DependsOn(typeof(AbpBackgroundJobsModule))]
public class MyModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        Configure<AbpBackgroundJobWorkerOptions>(options =>
        {
            options.DefaultTimeout = 864000; //10 days (as seconds)
        });
    }
}

在BackgroundJobWorker中,通过DI获取到Store,查询到符合条件的Jobs,转交给BackgroundJobExecuter执行,如有失败,则换时间重试。执行顺利,则由Store再移除Job记录。

public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
    protected AbpBackgroundJobOptions JobOptions { get; }
    protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }
    protected IAbpDistributedLock DistributedLock { get; }


    //...
    
    protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
    {
        //...
        var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
        var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);


        var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
        var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
        var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();


        foreach (var jobInfo in waitingJobs)
        {
            jobInfo.TryCount++;
            jobInfo.LastTryTime = clock.Now;


            var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
            var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
            var context = new JobExecutionContext(
                workerContext.ServiceProvider,
                jobConfiguration.JobType,
                jobArgs);


            await jobExecuter.ExecuteAsync(context);
            await store.DeleteAsync(jobInfo.Id);
        }
        //...
    }


    //...
}

在模块中,当BackgroundJob开关开启了,默认注册IBackgroundJobWorker。如果想要自定义,可以替换BackgroundJobWorker,再加入自定义的BackgroundJobWorker,但不用在注册。

public class AbpBackgroundJobsModule : AbpModule
{
    public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
        if (options.IsJobExecutionEnabled)
        {
            await context.AddBackgroundWorkerAsync<IBackgroundJobWorker>();
        }
    }


     //...
}

HangfireJobWorker

ABP中并无HangfireJobWorker,仅仅是我抽离出的一个概念,对应平级的BackgroundJobWorker,对于Hangfire其自身的BackgroundJob管理着所有的Job执行,如有符合条件的,Hangfire内部会筛选找到并调用执行,仍在HangfireBackgroundJobManager中,BackgroundJob.Schedule处的回调便是Hangfire中有符合条件的Job后的执行动作对应于BackgroundJobWorker的DoWork。

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        return Task.FromResult(delay.HasValue
            ? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args),
                delay.Value
            )
            : BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args)
            ));
    }
}

对应于Quartz和RabbitMQ过程类似,此处不再提及。

扩展

BackgroundJob中提供了很多扩展点

  1. 可以参照IBackgroundJob或IAsyncBackgroundJob实现自定义的BackgroundJob,但需要同样扩展IBackgroundJobExecuter的实现以支持自定义的BackgroundJob类型。

  2. 对应于IBackgroundJobManager可以实现自定义的BackgroundJobManager,可以参照已有的实现,来管理入队策略或更换存储源来保存期望执行的Job信息。

  3. 对应于IBackgroundJobWorker,同样可以参照BackgroundJobWorker,可以实现自定义的BackgroundJobWorker。

  4. 对于IBackgroundJobStore,默认提供了内存中和保存到数据库中两种,也可以进行扩展,实现想要的保存方式,比如保存到Redis,文件等场景。

2024-04-05,望技术有成后能回来看见自己的脚步。