ABP Framework-BackgroundWorker源码解析
目录
https://abp.io/docs/6.0/Background-Workers
Version
6.0.3
Package
Volo.Abp.BackgroundWorkers.Hangfire //依赖Hangfire和Volo.Abp.BackgroundWorkers
Volo.Abp.BackgroundWorkers.Quartz //依赖Quartz和Volo.Abp.BackgroundWorkers
Volo.Abp.BackgroundWorkers
BackgroundWorker
和BackgroundJob不同,Worker更多在后台定期的工作,Job更多的是特定时间执行。
在源码中,IBackgroundWorker作为抽象,承担BackgroundWorkerManager和具体的BackgroundWork执行间的衔接,并且确定所有的Worker都为单例。
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{
}
BackgroundWorkerBase
在BackgroundWorkerBase也没有实际的处理,只是封装了下Logger服务和ServiceProvider,以便于在单例的Worker中使用IOC服务。
public abstract class BackgroundWorkerBase : IBackgroundWorker
{
public IAbpLazyServiceProvider LazyServiceProvider { get; set; }
public IServiceProvider ServiceProvider { get; set; }
protected ILoggerFactory LoggerFactory => LazyServiceProvider.LazyGetRequiredService<ILoggerFactory>();
protected ILogger Logger => LazyServiceProvider.LazyGetService<ILogger>(provider => LoggerFactory?.CreateLogger(GetType().FullName) ?? NullLogger.Instance);
public virtual Task StartAsync()
{
Logger.LogDebug("Started background worker: " + ToString());
return Task.CompletedTask;
}
public virtual Task StopAsync()
{
Logger.LogDebug("Stopped background worker: " + ToString());
return Task.CompletedTask;
}
public override string ToString()
{
return GetType().FullName;
}
}
当项目中需要定义Worker时,直接继承BackgroundWorkerBase,这类Worker在应用程序启动时调用Start,应用关闭时调用Stop,使用场景较少。
public class MyWorker : BackgroundWorkerBase
{
public override Task StartAsync(CancellationToken cancellationToken = default)
{
//...
}
public override Task StopAsync(CancellationToken cancellationToken = default)
{
//...
}
}
PeriodicBackgroundWorkerBase
对于PeriodicBackgroundWorkerBase和AsyncPeriodicBackgroundWorkerBase,两者都是在BackgroundWorkerBase基础上省略了StartAsync和StopAsync方法,只需要关注执行任务。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }
protected PeriodicBackgroundWorkerBase(AbpTimer timer, IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}
public override async Task StartAsync(CancellationToken cancellationToken = default)
{
await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken);
}
private void Timer_Elapsed(object sender, System.EventArgs e)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
try
{
DoWork(new PeriodicBackgroundWorkerContext(scope.ServiceProvider));
}
catch (Exception ex)
{
var exceptionNotifier = scope.ServiceProvider.GetRequiredService<IExceptionNotifier>();
AsyncHelper.RunSync(() => exceptionNotifier.NotifyAsync(new ExceptionNotificationContext(ex)));
Logger.LogException(ex);
}
}
}
protected abstract void DoWork(PeriodicBackgroundWorkerContext workerContext);
}
在确定间隔执行时间的周期性Worker中,直接实现DoWork方法完成业务流程即可。这种在项目中使用更为常见。
public class PassiveUserCheckerWorker : AsyncPeriodicBackgroundWorkerBase
{
public PassiveUserCheckerWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) : base(timer, serviceScopeFactory)
{
Timer.Period = 600000; //10 minutes
}
protected async override Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
Logger.LogInformation("Starting: Setting status of inactive users...");
// ...
Logger.LogInformation("Completed: Setting status of inactive users...");
}
}
注册BackgroundWorker
所有自定义的BackgroundWorker需要注册才能生效,参照文档中使用AddBackgroundWorker方法
[DependsOn(typeof(AbpBackgroundWorkersModule))]
public class MyModule : AbpModule
{
public override async Task OnApplicationInitializationAsync(
ApplicationInitializationContext context)
{
await context.AddBackgroundWorkerAsync<PassiveUserCheckerWorker>();
}
}
又或者手动添加,最终所有的IBackgroundWorker进入到IBackgroundWorkerManager的BackgroundWorkers属性中,后续小节中提及。
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.AddAsync(
context
.ServiceProvider
.GetRequiredService<PassiveUserCheckerWorker>()
);
从源码中查看AddBackgroundWorker,实则也是手动添加的过程,只是包装了一下。
public static class BackgroundWorkersApplicationInitializationContextExtensions
{
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync<TWorker>([NotNull] this ApplicationInitializationContext context, CancellationToken cancellationToken = default)
where TWorker : IBackgroundWorker
{
Check.NotNull(context, nameof(context));
await context.AddBackgroundWorkerAsync(typeof(TWorker), cancellationToken: cancellationToken);
return context;
}
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync([NotNull] this ApplicationInitializationContext context, [NotNull] Type workerType, CancellationToken cancellationToken = default)
{
Check.NotNull(context, nameof(context));
Check.NotNull(workerType, nameof(workerType));
if (!workerType.IsAssignableTo<IBackgroundWorker>())
{
throw new AbpException($"Given type ({workerType.AssemblyQualifiedName}) must implement the {typeof(IBackgroundWorker).AssemblyQualifiedName} interface, but it doesn't!");
}
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.AddAsync((IBackgroundWorker)context.ServiceProvider.GetRequiredService(workerType), cancellationToken);
return context;
}
}
对于注册所在位置,需要提及一下AbpModule的代码
public abstract class AbpModule
{
//...
public virtual Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
OnApplicationInitialization(context);
return Task.CompletedTask;
}
public virtual void OnApplicationInitialization(ApplicationInitializationContext context)
{
}
//...
}
需要注意两个方法的依赖关系,当参照文档注册BackgroundWorker时,需要注意同样要调用OnApplicationInitialization(如果后台任务和业务应用服务为同一个服务内时)。
[DependsOn(typeof(AbpBackgroundWorkersModule))]
public class MyModule : AbpModule
{
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var app = context.GetApplicationBuilder();
var env = context.GetEnvironment();
app.UseAbpExceptionHandling();
app.UseAbpRequestLocalization();
app.UseCorrelationId();
app.UseStaticFiles();
app.UseRouting();
app.UseCors();
app.UseAuthentication();
app.UseAbpOpenIddictValidation();
app.UseMultiTenancy();
app.UseAuthorization();
app.UseSwagger(env);
app.UseAuditing();
app.UseAbpSerilogEnrichers();
app.UseConfiguredEndpoints();
}
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
OnApplicationInitialization(context);
await context.AddBackgroundWorkerAsync<PassiveUserCheckerWorker>();
}
}
此处记录一下我当时出现的错误,我在OnApplicationInitializationAsync中直接参照文档添加了Worker,类似如下代码结构,项目跑起来,Worker正常执行,结果Swagger打不开,所有请求都404,后续问题排查就是在于少了OnApplicationInitialization的调用。
[DependsOn(typeof(AbpBackgroundWorkersModule))]
public class MyModule : AbpModule
{
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var app = context.GetApplicationBuilder();
// ...
app.UseConfiguredEndpoints();
}
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
await context.AddBackgroundWorkerAsync<PassiveUserCheckerWorker>();
}
}
BackgroundWorkerManager
所有注册好的BackgroundWorker由BackgroundWorkerManager来统一管理启动和停止。
在注册时所有的BackgroundWorker会加入到BackgroundWorkerManager的BackgroundWorkers集合中。
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
protected bool IsRunning { get; private set; }
private readonly List<IBackgroundWorker> _backgroundWorkers;
public BackgroundWorkerManager()
{
_backgroundWorkers = new List<IBackgroundWorker>();
}
public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
_backgroundWorkers.Add(worker);
if (IsRunning)
{
await worker.StartAsync(cancellationToken);
}
}
public virtual async Task StartAsync(CancellationToken cancellationToken = default)
{
IsRunning = true;
foreach (var worker in _backgroundWorkers)
{
await worker.StartAsync(cancellationToken);
}
}
public virtual async Task StopAsync(CancellationToken cancellationToken = default)
{
IsRunning = false;
foreach (var worker in _backgroundWorkers)
{
await worker.StopAsync(cancellationToken);
}
}
}
需要额外注意下IsRunning参数,在AbpBackgroundWorkersModule或者一些需要执行Worker的模块中,调用了StartAsync方法,则IsRunning则为true,后续在通过BackgroundWorkerManager.AddAsync方法加入都会直接运行Worker。
public class AbpBackgroundWorkersModule : AbpModule
{
public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
if (options.IsEnabled)
{
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StartAsync();
}
}
public async override Task OnApplicationShutdownAsync(ApplicationShutdownContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
if (options.IsEnabled)
{
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StopAsync();
}
}
}
也就从中知道了,所有的Worker并不是等到项目启动那一刻,全部开始执行,而是边加入同时执行,甚至于说,在项目运行中,还可以加入新的Worker并且立即执行。
public class TheFirstWorker : AsyncPeriodicBackgroundWorkerBase
{
public TheFirstWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) : base(timer,serviceScopeFactory)
{
Timer.Period = 15000;
}
private bool IsExecuted { get; set; }
protected async override Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
Logger.LogError($"\r\n------TheFirstWorker--{DateTime.Now}");
if (IsExecuted)
{
return;
}
IsExecuted = true;
var backgroundWorkerManager = workerContext.ServiceProvider.GetRequiredService<IBackgroundWorkerManager>();
await backgroundWorkerManager.AddAsync(workerContext.ServiceProvider.GetRequiredService<TheSecondWorker>());
}
}
public class TheSecondWorker : AsyncPeriodicBackgroundWorkerBase
{
public TheSecondWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) : base(timer,serviceScopeFactory)
{
Timer.Period = 15000;
}
protected async override Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
Logger.LogError($"\r\n------TheSecondWorker--{DateTime.Now}");
}
}
AbpBackgroundWorkersModule作为最早设定IsRunning参数是否执行,当IsEnabled设定为false时,则所有的Worker都只是加入到了BackgroundWorkers集合中,都不运行。
public class AbpBackgroundWorkerOptions
{
public bool IsEnabled { get; set; } = true;
}
Configure<AbpBackgroundWorkerOptions>(options =>
{
options.IsEnabled = false;
});
Hangfire&QuartzBackgroundWorker
简要提及下Hangfire和Quartz的Worker,两者都借助于IBackgroundWorker简介操作各自的HangfireBackgroundWorker和QuartzBackgroundWorker,其内部则分别使用Hangfire和Quartz相关功能,借助BackgroundWorker承载内部实现。
以Hangfire模块为例,Quartz过程类似。
[DependsOn(
typeof(AbpBackgroundWorkersModule),
typeof(AbpHangfireModule))]
public class AbpBackgroundWorkersHangfireModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddSingleton(typeof(HangfirePeriodicBackgroundWorkerAdapter<>));
}
public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
if (!options.IsEnabled)
{
var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
}
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StartAsync();
}
public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
{
AsyncHelper.RunSync(() => OnPreApplicationInitializationAsync(context));
}
private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
{
serviceProvider.GetRequiredService<JobStorage>();
return null;
}
}
如果使用了Hangfire,则IBackgroundWorkerManager的实例将由HangfireBackgroundWorkerManager接管。
进入到Hangfire中,由Hangfire的BackgroudJobServer来管理所有的Worker。
[Dependency(ReplaceServices = true)]
public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
protected AbpHangfireBackgroundJobServer BackgroundJobServer { get; set; }
protected IServiceProvider ServiceProvider { get; }
public async override Task StartAsync(CancellationToken cancellationToken = default)
{
// 启用Hanfire自身的BackgroundJobServer
BackgroundJobServer = ServiceProvider.GetRequiredService<AbpHangfireBackgroundJobServer>();
// 运行已有的Worker
await base.StartAsync(cancellationToken);
}
}
而在Worker上,不同的Worker类型则选择使用不同的处理方式
[Dependency(ReplaceServices = true)]
public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
//...
public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
switch (worker)
{
case IHangfireBackgroundWorker hangfireBackgroundWorker:
{
// 由Hangfire的RecurringJob来接管触发时间和执行动作
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace())
{
RecurringJob.AddOrUpdate(
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone,
hangfireBackgroundWorker.Queue);
}
else
{
RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId,
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone,
hangfireBackgroundWorker.Queue);
}
break;
}
case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase:
{
// 转换执行时间到Cron表达式
var timer = worker.GetType()
.GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
var period = worker is AsyncPeriodicBackgroundWorkerBase ? ((AbpAsyncTimer)timer)?.Period : ((AbpTimer)timer)?.Period;
if (period == null)
{
return;
}
// Hangfire周期性BackgroundWorker来代接管执行动作,包一层
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker;
// 同样由Hangfire的RecurringJob来接管触发时间和执行动作
RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);
break;
}
default:
await base.AddAsync(worker, cancellationToken);
break;
}
}
}
如果是IHangfireBackgroundWorker的实例,则转换为Hangfire的RecurringJob定期执行,触发时间到了,则进入到HangfireBackgroundWorker中执行代码,整个过程交由Hangfire来决定什么时候触发,原有执行代码的DoWork方法不变。
如果是周期性的BackgroundWorker,与上相同,将其执行周期取出,执行动作则包一层给HangfirePeriodicBackgroundWorkerAdapter来代替执行,内部还是调用原有的周期性BackgroundWorker,整个执行过程还是由RecurringJob来管理。RecurringJob->HangfirePeriodicBackgroundWorkerAdapter<>->PeriodBackgroundWorker
如果是常规的BackgroundWorker,则还是走默认的处理过程。
注意事项
如上当Hangfire接管BackgroundWorker时,如果BackgroundWorker开关默认关闭,则Hangfire会创建一个null的BackgroundServer。
public class AbpBackgroundWorkersHangfireModule : AbpModule
{
public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
if (!options.IsEnabled)
{
var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
}
//...
}
// 该方法返回了一个null的BackgroundJobServer, 所有执行动作不会真实生效
private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
{
serviceProvider.GetRequiredService<JobStorage>();
return null;
}
}
所有被Hangfire BackgroundWorker包装的Worker都可在Hangfire面板中可见,但执行无效,断点调试不会进入到实际的Dowork方法中。
但有一类Worker例外,尽管开关关闭,仍会在项目启动时候执行,便是常规BackgroundWorker,因为Hangfire接管,原先BackgroundWorkers直接管控所有Worker在开关关闭后不执行。
public class AbpBackgroundWorkersModule : AbpModule
{
public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
if (options.IsEnabled)
{
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StartAsync();
}
}
}
但AbpBackgroundWorkersHangfireModule中会执行到StartAsync使得IsRunning设置为true。(后续abp版本中出现了变更,修复了该问题)
public class AbpBackgroundWorkersHangfireModule : AbpModule
{
//...
public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
{
// 此处会令IsRunning为true,所有Worker都Start
await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StartAsync();
}
//...
}
当新建一个继承自非Hangfire的且非周期性的BackgroundWorkerBase能够正常执行在开关关闭下的Hangfire正常执行(原先BackgroundWorker全局控制IsRunning为false,现在Hangfire模块下令其为true)。
该版本源码在后续版本中存在变动,所述内容可能不符合新版本需求。
2024-03-17,望技术有成后能回来看见自己的脚步。