ABP Framework-BackgroundJob源码解析
目录
https://abp.io/docs/6.0/Background-Jobs
Version
6.0.3
Package
Volo.Abp.BackgroundJobs.HangFire //依赖Hangfire和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs.Quartz //依赖Quartz和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs.RabbitMQ //依赖RabbitMQ和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs //依赖BackgroundWorker和Volo.Abp.BackgroundJobs.Abstractions,与上第三方组件平级
Volo.Abp.BackgroundJobs.Abstractions
//独立模块
Volo.Abp.BackgroundJobs.Domain
Volo.Abp.BackgroundJobs.Domain.Shared
Volo.Abp.BackgroundJobs.EntityFrameworkCore
Volo.Abp.BackgroundJobs.MongoDB
BackgroundJob
JobArgs
作为最小执行单元,使用时只需继承BackgroundJob或AsyncBackgroundJob。
public class EmailSendingJob : AsyncBackgroundJob<EmailSendingArgs>, ITransientDependency
{
private readonly IEmailSender _emailSender;
public EmailSendingJob(IEmailSender emailSender)
{
_emailSender = emailSender;
}
public override async Task ExecuteAsync(EmailSendingArgs args)
{
await _emailSender.SendAsync(args.EmailAddress, args.Subject, args.Body);
}
}
前提是需要一个JobArgs,来传递Job执行需要的一些参数。
public class EmailSendingArgs
{
public string EmailAddress { get; set; }
public string Subject { get; set; }
public string Body { get; set; }
}
文档中提到BackgroundJobName,在代码中体现为BackgroundJobNameAttribute。
实则是在当使用了RabbitMQ或者类似的第三方组件时才需要拿到名字作为队列名,如果在Hangfire或者Quartz或者ABP自身的BackgroundJobWorker管理,都无需要使用到BackgroundJobNameAttribute。当然如果加上了,也没有影响,只是加入Job时,会以加上的名字作为Job名,加入到执行队列中。以DefaultBackgroundJobManager为例
[Dependency(ReplaceServices = true)]
public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
//...
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
var jobId = await EnqueueAsync(jobName, args, priority, delay);
return jobId.ToString();
}
//...
}
当取JobArgs的名字时,优先从Attribute中取,如果没有则使用类名
public class BackgroundJobNameAttribute : Attribute, IBackgroundJobNameProvider
{
//...
public static string GetName([NotNull] Type jobArgsType)
{
Check.NotNull(jobArgsType, nameof(jobArgsType));
return jobArgsType.GetCustomAttributes(true)
.OfType<IBackgroundJobNameProvider>()
.FirstOrDefault()?.Name
?? jobArgsType.FullName;
}
}
BackgroundJob
回归到最小执行单元BackgroundJob,ABP中提供了两类抽象接口与抽象实现
BackgroundJob中并没有什么实质性内容,仅作为抽象封装。
public abstract class AsyncBackgroundJob<TArgs> : IAsyncBackgroundJob<TArgs>
{
public ILogger<AsyncBackgroundJob<TArgs>> Logger { get; set; }
protected AsyncBackgroundJob()
{
Logger = NullLogger<AsyncBackgroundJob<TArgs>>.Instance;
}
public abstract Task ExecuteAsync(TArgs args);
}
BackgroundJob注册
项目启动时,会扫描到所有继承自IBackgroundJob和IAsyncBackgroundJob的实现完成注册并将注册内容存储到AbpBackgroundJobOptions中。
[DependsOn(typeof(AbpJsonModule))]
public class AbpBackgroundJobsAbstractionsModule : AbpModule
{
public override void PreConfigureServices(ServiceConfigurationContext context)
{
RegisterJobs(context.Services);
}
private static void RegisterJobs(IServiceCollection services)
{
var jobTypes = new List<Type>();
// 扫描实现加入到JobTypes中
services.OnRegistred(context =>
{
if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)) ||
ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IAsyncBackgroundJob<>)))
{
jobTypes.Add(context.ImplementationType);
}
});
// 扫描到的结果转换存储到AbpBackgroundJobOptions中
services.Configure<AbpBackgroundJobOptions>(options =>
{
foreach (var jobType in jobTypes)
{
options.AddJob(jobType);
}
});
}
}
在Options中,一个属性是控制整个BackgroundJob是否启用,另外的则是使用字典存储所有注册的Job,简化代码如下。
public class AbpBackgroundJobOptions
{
private readonly Dictionary<Type, BackgroundJobConfiguration> _jobConfigurationsByArgsType;
public bool IsJobExecutionEnabled { get; set; } = true;
public BackgroundJobConfiguration GetJob(Type argsType)
{
return _jobConfigurationsByArgsType.GetOrDefault(argsType);
}
public void AddJob(BackgroundJobConfiguration jobConfiguration)
{
_jobConfigurationsByArgsType[jobConfiguration.ArgsType] = jobConfiguration;
}
}
此处依赖BackgroundJobConfiguration类,该类仅作为一个记录中转,记录Job本身的类型,JobArgs类型,Job名,对这三者信息的封装。
public class BackgroundJobConfiguration
{
public Type ArgsType { get; }
public Type JobType { get; }
public string JobName { get; }
public BackgroundJobConfiguration(Type jobType)
{
JobType = jobType;
//从JobType中分析出JobArgs的类型,取其泛型参数具体类型
ArgsType = BackgroundJobArgsHelper.GetJobArgsType(jobType);
JobName = BackgroundJobNameAttribute.GetName(ArgsType);
}
}
BackgroundJobExecuter
BackgroundJob类型上分为了两类,ABP代码中并不直接对这两类Job分别调用,而是再包一层,对这两类的调用抽象,得到一个唯一的操作入口。
ExecuteAsync方法简要如下代码,JobExecutionContext承担要执行的Job信息的承载,和BackgroundJobConfiguration的职责类似。其中完成按类别调用实际的Job,该方法支持override,如果想要自定义一个BackgroundJob类型,可以在此基础上参考。上层的Hangfire/Quartz/RabbitMQ/ABP-BackgroundJobWorker等都是基于IBackgroundJobExecuter来执行,因此此处扩展并不会对上层产生影响。
public class BackgroundJobExecuter : IBackgroundJobExecuter, ITransientDependency
{
//...
public virtual async Task ExecuteAsync(JobExecutionContext context)
{
// 从DI中获得Job实例
var job = context.ServiceProvider.GetService(context.JobType);
// 按照继承接口类型找到执行方法
var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)) ??
context.JobType.GetMethod(nameof(IAsyncBackgroundJob<object>.ExecuteAsync));
//...
try
{
// 分类执行对应方法
if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
{
await ((Task)jobExecuteMethod.Invoke(job, new[] { context.JobArgs }));
}
else
{
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
}
}
catch (Exception ex)
{
//...
// 异常转换为BackgroundJob独有异常,便于捕获方便后续Job重试
throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
{
JobType = context.JobType.AssemblyQualifiedName,
JobArgs = context.JobArgs
};
}
}
}
对于上层调用方(Hangfire/Quartz/RabbitMQ/ABP-BackgroundJob)对BackgroundJobExecuter的具体使用,跳转到下下小节中提及。
BackgroundJobManager
BackgroundJob定义好了,在业务编排中期望在某些点加入一个Job,以便于在特定时间点执行Job。ABP封装了IBackgroundJobManager,用来管理Job入队编排操作。
在编排时Job加入到执行队列(默认是入库)。在加入时可以指定期望的Job,执行时间,执行优先级等参数。
此处以ABP自身的BackgroundJobManager和基于Hangfire实现的 BackgroundJobManager解析。
DefaultBackgroundJobManager
ABP实现了默认的BackgroundJobManager(位于Volo.Abp.BackgroundJobs包),将待执行Job信息存储到库中。再由BackgroundJobWorker捞起符合条件的Job执行。
其中数据来源依赖于IBackgroudJobStore,借助仓储从数据库存入Job数据,并返回JobId,方便于一些场景下跟踪Job执行进度。
[Dependency(ReplaceServices = true)]
public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
//...
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
var jobId = await EnqueueAsync(jobName, args, priority, delay);
return jobId.ToString();
}
protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
var jobInfo = new BackgroundJobInfo
{
Id = GuidGenerator.Create(),
JobName = jobName,
JobArgs = Serializer.Serialize(args),
Priority = priority,
CreationTime = Clock.Now,
NextTryTime = Clock.Now
};
if (delay.HasValue)
{
jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
}
//保存JobInfo
await Store.InsertAsync(jobInfo);
return jobInfo.Id;
}
}
HangfireBackgroundJobManager
位于Volo.Abp.BackgroundJobs.HangFire包中,依赖Hangfire来管理触发时机和调用。
[Dependency(ReplaceServices = true)]
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
return Task.FromResult(delay.HasValue
? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.ExecuteAsync(args),
delay.Value
)
: BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.ExecuteAsync(args)
));
}
}
该部分类图简要如下
将期望执行的Job及参数移交到Hangfire的BackgroundJob类中,该类负责管理时间和触发时机,而具体的执行动作由HangfireBackgroundJobExecutionAdapter来执行,该类承担是否能够执行Job以及调用IBackgroundJobExecuter发起执行。
HangfireBackgroundJobManager->Hangfire.BackgroundJob->HangfireBackgroundJobExecutionAdapter->IBackgroundJobExecuter->BackgroundJob
对于Quartz和RabbitMQ流程上类似都是将Job移交到第三方组件管理,但具体的执行又会被回调到IBackgroundJobExecuter中。
AbpBackgroundJobsHangfireModule
值得注意,在AbpBackgroundJobsHangfireModule中也有一个类似HangfireBackgroundWorker一样创建null的BackgroundJobServer的机制,保持所有的Job都可以注册,编排期望的Job,但是在执行时都受阻,并不会真正的执行。
public class AbpBackgroundJobsHangfireModule : AbpModule
{
public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
{
// BackgroundJob开关关闭后,默认使用一个null的Hangfire.BackgroundJobServer
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
if (!options.IsJobExecutionEnabled)
{
var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
}
}
//当开关关闭时,返回null
private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
{
serviceProvider.GetRequiredService<JobStorage>();
return null;
}
}
BackgroundJob触发执行
当期望执行时间到达,需要借助一些方式收集到符合条件的Job并执行,使用不同的第三方组件有不同的管理调度策略,默认ABP的是BackgroundJobWorker,对于Hangfire的则是Hangfire.Background来管理,Quartz使用Schedule,RabbitMQ则依靠延时队列,但最终都是调用IBackgroundJobExecuter来执行Job。此处解析ABP的默认BackgroundJobWorker和Hangfire回调过程。
BackgroundJobWorker
对应于ABP的DefaultBackgroundJobManager,ABP提供了BackgroundJobManager,位于Volo.Abp.BackgroundJobs包中,基于BackgroundWorker,周期性的获取符合执行条件的Jobs。
在AbpBackgroundJobWorkerOptions中默认的执行周期JobPollPeriod为5秒,如有需求可以更改值。
[DependsOn(typeof(AbpBackgroundJobsModule))]
public class MyModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
Configure<AbpBackgroundJobWorkerOptions>(options =>
{
options.DefaultTimeout = 864000; //10 days (as seconds)
});
}
}
在BackgroundJobWorker中,通过DI获取到Store,查询到符合条件的Jobs,转交给BackgroundJobExecuter执行,如有失败,则换时间重试。执行顺利,则由Store再移除Job记录。
public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
protected AbpBackgroundJobOptions JobOptions { get; }
protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }
protected IAbpDistributedLock DistributedLock { get; }
//...
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
//...
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);
var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(
workerContext.ServiceProvider,
jobConfiguration.JobType,
jobArgs);
await jobExecuter.ExecuteAsync(context);
await store.DeleteAsync(jobInfo.Id);
}
//...
}
//...
}
在模块中,当BackgroundJob开关开启了,默认注册IBackgroundJobWorker。如果想要自定义,可以替换BackgroundJobWorker,再加入自定义的BackgroundJobWorker,但不用在注册。
public class AbpBackgroundJobsModule : AbpModule
{
public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
await context.AddBackgroundWorkerAsync<IBackgroundJobWorker>();
}
}
//...
}
HangfireJobWorker
ABP中并无HangfireJobWorker,仅仅是我抽离出的一个概念,对应平级的BackgroundJobWorker,对于Hangfire其自身的BackgroundJob管理着所有的Job执行,如有符合条件的,Hangfire内部会筛选找到并调用执行,仍在HangfireBackgroundJobManager中,BackgroundJob.Schedule处的回调便是Hangfire中有符合条件的Job后的执行动作对应于BackgroundJobWorker的DoWork。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
return Task.FromResult(delay.HasValue
? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.ExecuteAsync(args),
delay.Value
)
: BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.ExecuteAsync(args)
));
}
}
对应于Quartz和RabbitMQ过程类似,此处不再提及。
扩展
BackgroundJob中提供了很多扩展点
可以参照IBackgroundJob或IAsyncBackgroundJob实现自定义的BackgroundJob,但需要同样扩展IBackgroundJobExecuter的实现以支持自定义的BackgroundJob类型。
对应于IBackgroundJobManager可以实现自定义的BackgroundJobManager,可以参照已有的实现,来管理入队策略或更换存储源来保存期望执行的Job信息。
对应于IBackgroundJobWorker,同样可以参照BackgroundJobWorker,可以实现自定义的BackgroundJobWorker。
对于IBackgroundJobStore,默认提供了内存中和保存到数据库中两种,也可以进行扩展,实现想要的保存方式,比如保存到Redis,文件等场景。
2024-04-05,望技术有成后能回来看见自己的脚步。