ABP Framework: BackgroundWorker Source Code Walkthrough

https://abp.io/docs/6.0/Background-Workers

Version

6.0.3

Package

Volo.Abp.BackgroundWorkers.Hangfire // depends on Hangfire and Volo.Abp.BackgroundWorkers
Volo.Abp.BackgroundWorkers.Quartz // depends on Quartz and Volo.Abp.BackgroundWorkers
Volo.Abp.BackgroundWorkers

BackgroundWorker

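Unlike a BackgroundJob, a worker mostly does recurring work in the background for as long as the application runs, whereas a job is more of a task executed at a specific point in time.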

In the source, IBackgroundWorker is the abstraction that connects BackgroundWorkerManager with the concrete worker implementations. Through ISingletonDependency it also makes every worker a singleton.

public interface IBackgroundWorker : IRunnable, ISingletonDependency
{
}

BackgroundWorkerBase

BackgroundWorkerBase does no real work either. It only wraps the logger and the service provider so that a singleton worker can use services from the IoC container.

public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    public IAbpLazyServiceProvider LazyServiceProvider { get; set; }
    public IServiceProvider ServiceProvider { get; set; }
    protected ILoggerFactory LoggerFactory => LazyServiceProvider.LazyGetRequiredService<ILoggerFactory>();
    protected ILogger Logger => LazyServiceProvider.LazyGetService<ILogger>(provider => LoggerFactory?.CreateLogger(GetType().FullName) ?? NullLogger.Instance);


    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }


    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }


    public override string ToString()
    {
        return GetType().FullName;
    }
}

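When a project needs a custom worker, it can inherit BackgroundWorkerBase directly. Such a worker has StartAsync called when the application starts and StopAsync called when it shuts down. This form is used less often.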

public class MyWorker : BackgroundWorkerBase
{
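    public override Task StartAsync(CancellationToken cancellationToken = default)
    {
        //...
        return Task.CompletedTask; // a Task-returning method must return a value
    }


    public override Task StopAsync(CancellationToken cancellationToken = default)
    {
        //...
        return Task.CompletedTask;
    }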
}

PeriodicBackgroundWorkerBase

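PeriodicBackgroundWorkerBase and AsyncPeriodicBackgroundWorkerBase both build on BackgroundWorkerBase and take care of StartAsync and StopAsync themselves (they start and stop a timer), so a derived class only has to implement the work itself.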

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected AbpTimer Timer { get; }


    protected PeriodicBackgroundWorkerBase(AbpTimer timer, IServiceScopeFactory serviceScopeFactory)
    {
        ServiceScopeFactory = serviceScopeFactory;
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }


    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }


    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }


    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            try
            {
                DoWork(new PeriodicBackgroundWorkerContext(scope.ServiceProvider));
            }
            catch (Exception ex)
            {
                var exceptionNotifier = scope.ServiceProvider.GetRequiredService<IExceptionNotifier>();
                AsyncHelper.RunSync(() => exceptionNotifier.NotifyAsync(new ExceptionNotificationContext(ex)));


                Logger.LogException(ex);
            }
        }
    }


    protected abstract void DoWork(PeriodicBackgroundWorkerContext workerContext);
}

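For a periodic worker with a fixed interval, you only implement DoWork (or DoWorkAsync for the async base class, as in the example below) with the business logic. This form is the more common one in projects.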

public class PassiveUserCheckerWorker : AsyncPeriodicBackgroundWorkerBase
{
    public PassiveUserCheckerWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) : base(timer, serviceScopeFactory)
    {
        Timer.Period = 600000; //10 minutes
    }


    protected async override Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
    {
        Logger.LogInformation("Starting: Setting status of inactive users...");
        // ...
        Logger.LogInformation("Completed: Setting status of inactive users...");
    }
}

注册BackgroundWorker

Every custom BackgroundWorker has to be registered before it takes effect. The documentation uses the AddBackgroundWorkerAsync method:

[DependsOn(typeof(AbpBackgroundWorkersModule))]
public class MyModule : AbpModule
{
    public override async Task OnApplicationInitializationAsync(
        ApplicationInitializationContext context)
    {
        await context.AddBackgroundWorkerAsync<PassiveUserCheckerWorker>();
    }
}

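Alternatively, add it manually. Either way, every IBackgroundWorker ends up in the worker list held by the IBackgroundWorkerManager implementation (the _backgroundWorkers field of BackgroundWorkerManager), which is covered in a later section.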

await context.ServiceProvider
    .GetRequiredService<IBackgroundWorkerManager>()
    .AddAsync(
        context
            .ServiceProvider
            .GetRequiredService<PassiveUserCheckerWorker>()
    );

Looking at the source of AddBackgroundWorkerAsync, it is in fact the same manual registration, just wrapped in an extension method.

public static class BackgroundWorkersApplicationInitializationContextExtensions
{
    public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync<TWorker>([NotNull] this ApplicationInitializationContext context, CancellationToken cancellationToken = default)
        where TWorker : IBackgroundWorker
    {
        Check.NotNull(context, nameof(context));
        await context.AddBackgroundWorkerAsync(typeof(TWorker), cancellationToken: cancellationToken);
        return context;
    }


    public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync([NotNull] this ApplicationInitializationContext context, [NotNull] Type workerType, CancellationToken cancellationToken = default)
    {
        Check.NotNull(context, nameof(context));
        Check.NotNull(workerType, nameof(workerType));


        if (!workerType.IsAssignableTo<IBackgroundWorker>())
        {
            throw new AbpException($"Given type ({workerType.AssemblyQualifiedName}) must implement the {typeof(IBackgroundWorker).AssemblyQualifiedName} interface, but it doesn't!");
        }


        await context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .AddAsync((IBackgroundWorker)context.ServiceProvider.GetRequiredService(workerType), cancellationToken);


        return context;
    }
}

As for where the registration goes, the AbpModule code is worth a look:

public abstract class AbpModule
{
    //...
    
    public virtual Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        OnApplicationInitialization(context);
        return Task.CompletedTask;
    }


    public virtual void OnApplicationInitialization(ApplicationInitializationContext context)
    {
    }


    //...
}

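Note how the two methods depend on each other: the default OnApplicationInitializationAsync calls OnApplicationInitialization. When you override OnApplicationInitializationAsync to register a BackgroundWorker as the documentation shows, you must still call OnApplicationInitialization yourself (this applies when the background workers live in the same service as the business application).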

[DependsOn(typeof(AbpBackgroundWorkersModule))]
public class MyModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var app = context.GetApplicationBuilder();
        var env = context.GetEnvironment();


        app.UseAbpExceptionHandling();
        app.UseAbpRequestLocalization();
        app.UseCorrelationId();
        app.UseStaticFiles();
        app.UseRouting();
        app.UseCors();
        app.UseAuthentication();
        app.UseAbpOpenIddictValidation();
        app.UseMultiTenancy();
        app.UseAuthorization();
        app.UseSwagger(env);
        app.UseAuditing();
        app.UseAbpSerilogEnrichers();
        app.UseConfiguredEndpoints();
    }


    public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        OnApplicationInitialization(context);
        await context.AddBackgroundWorkerAsync<PassiveUserCheckerWorker>();
    }
}

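Here is the mistake I made at the time. I added the worker in OnApplicationInitializationAsync exactly as the documentation shows, with a structure like the code below. The project started and the worker ran normally, but Swagger would not open and every request returned 404. The cause turned out to be the missing call to OnApplicationInitialization.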

[DependsOn(typeof(AbpBackgroundWorkersModule))]
public class MyModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var app = context.GetApplicationBuilder();
        // ...
        app.UseConfiguredEndpoints();
    }


    public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        await context.AddBackgroundWorkerAsync<PassiveUserCheckerWorker>();
    }
}

BackgroundWorkerManager

All registered BackgroundWorkers are started and stopped centrally by BackgroundWorkerManager.

On registration, every BackgroundWorker is added to the _backgroundWorkers list of BackgroundWorkerManager.

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }
    private readonly List<IBackgroundWorker> _backgroundWorkers;


    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }


    public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
    {
        _backgroundWorkers.Add(worker);
        if (IsRunning)
        {
            await worker.StartAsync(cancellationToken);
        }
    }


    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;
        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }


    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;
        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}

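Pay particular attention to the IsRunning flag. Once AbpBackgroundWorkersModule, or any other module that needs to run workers, has called StartAsync, IsRunning is true, and any worker added afterwards through BackgroundWorkerManager.AddAsync is started immediately.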

public class AbpBackgroundWorkersModule : AbpModule
{
    public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        if (options.IsEnabled)
        {
            await context.ServiceProvider
                .GetRequiredService<IBackgroundWorkerManager>()
                .StartAsync();
        }
    }


    public async override Task OnApplicationShutdownAsync(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        if (options.IsEnabled)
        {
            await context.ServiceProvider
                .GetRequiredService<IBackgroundWorkerManager>()
                .StopAsync();
        }
    }
}

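It follows that workers are not held back and started all together at one moment during startup. A module that depends on AbpBackgroundWorkersModule is initialized after the manager has already been started, so each of its workers starts as it is added. A new worker can even be added, and started right away, while the application is running.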

public class TheFirstWorker : AsyncPeriodicBackgroundWorkerBase
{
    public TheFirstWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) : base(timer,serviceScopeFactory)
    {
        Timer.Period = 15000;
    }


    private bool IsExecuted { get; set; }


    protected async override Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
    {
        Logger.LogError($"\r\n------TheFirstWorker--{DateTime.Now}");
        if (IsExecuted)
        {
           return;
        }
        
        IsExecuted = true;
        var backgroundWorkerManager = workerContext.ServiceProvider.GetRequiredService<IBackgroundWorkerManager>();
        await backgroundWorkerManager.AddAsync(workerContext.ServiceProvider.GetRequiredService<TheSecondWorker>());
    }
}


public class TheSecondWorker : AsyncPeriodicBackgroundWorkerBase
{
    public TheSecondWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory) : base(timer,serviceScopeFactory)
    {
        Timer.Period = 15000;
    }


    protected async override Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
    {
        Logger.LogError($"\r\n------TheSecondWorker--{DateTime.Now}");
    }
}

AbpBackgroundWorkersModule is the first place that decides whether IsRunning gets set. When IsEnabled is false, every worker is only added to the worker list and none of them runs.

public class AbpBackgroundWorkerOptions
{
    public bool IsEnabled { get; set; } = true;
}


// Typically placed in a module's ConfigureServices method
Configure<AbpBackgroundWorkerOptions>(options =>
{
    options.IsEnabled = false;
});

Hangfire&QuartzBackgroundWorker

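A brief note on the Hangfire and Quartz workers. Both are driven indirectly through IBackgroundWorker, as HangfireBackgroundWorker and QuartzBackgroundWorker respectively. Internally they use Hangfire and Quartz features, with the BackgroundWorker abstraction hosting the implementation.

The Hangfire module is used as the example here; the Quartz flow is similar.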

[DependsOn(
    typeof(AbpBackgroundWorkersModule),
    typeof(AbpHangfireModule))]
public class AbpBackgroundWorkersHangfireModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddSingleton(typeof(HangfirePeriodicBackgroundWorkerAdapter<>));
    }
    
    public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        if (!options.IsEnabled)
        {
            var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
            hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
        }
        
        await context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .StartAsync(); 
    }


    public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
    {
        AsyncHelper.RunSync(() => OnPreApplicationInitializationAsync(context));
    }
    
    private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
    {
        serviceProvider.GetRequiredService<JobStorage>();
        return null;
    }
}

When Hangfire is used, HangfireBackgroundWorkerManager replaces the default IBackgroundWorkerManager implementation. From that point on, Hangfire's BackgroundJobServer manages the workers.

[Dependency(ReplaceServices = true)]
public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
    protected AbpHangfireBackgroundJobServer BackgroundJobServer { get; set; }
    protected IServiceProvider ServiceProvider { get; }


    public async override Task StartAsync(CancellationToken cancellationToken = default)
    {
        // Start Hangfire's own BackgroundJobServer
        BackgroundJobServer = ServiceProvider.GetRequiredService<AbpHangfireBackgroundJobServer>();


        // Start the workers that are already registered
        await base.StartAsync(cancellationToken);
    }
}

Different worker types are then handled in different ways:

[Dependency(ReplaceServices = true)]
public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
    //...
   
    public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
    {
        switch (worker)
        {
            case IHangfireBackgroundWorker hangfireBackgroundWorker:
            {
                // Hangfire's RecurringJob takes over both the trigger time and the execution
                var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
                if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace())
                {
                    RecurringJob.AddOrUpdate(
                        () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
                        hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone,
                        hangfireBackgroundWorker.Queue);
                }
                else
                {
                    RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId,
                        () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
                        hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone,
                        hangfireBackgroundWorker.Queue);
                }
                break;
            }
            case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase:
            {
                // Read the period so it can be converted to a cron expression
                var timer = worker.GetType()
                    .GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);


                var period = worker is AsyncPeriodicBackgroundWorkerBase ? ((AbpAsyncTimer)timer)?.Period : ((AbpTimer)timer)?.Period;
                if (period == null)
                {
                    return;
                }


                // Wrap the periodic BackgroundWorker in a Hangfire adapter that executes it
                var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
                var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker;
                
                // Again, Hangfire's RecurringJob takes over both the trigger time and the execution
                RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);


                break;
            }
            default:
                await base.AddAsync(worker, cancellationToken);
                break;
        }
    }
}
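  • If the worker is an IHangfireBackgroundWorker, it is registered as a Hangfire RecurringJob. When the trigger time arrives, execution enters the HangfireBackgroundWorker code. Hangfire alone decides when it fires, and the worker's own DoWorkAsync method is unchanged.

  • If the worker is a periodic BackgroundWorker, the flow is the same: its period is read and converted to a cron expression, and execution is wrapped in HangfirePeriodicBackgroundWorkerAdapter, which internally still calls the original periodic worker. The whole run is still managed by RecurringJob: RecurringJob -> HangfirePeriodicBackgroundWorkerAdapter<> -> periodic BackgroundWorker.

  • Any other BackgroundWorker goes through the default handling.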
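
The period-to-cron step in the second case can be sketched roughly as follows. This is only an illustration, not ABP's actual GetCron implementation: PeriodToCronSketch and ToCron are hypothetical names, and the sketch assumes the scheduler accepts six-field cron expressions with a leading seconds field for sub-minute periods.

```csharp
using System;

// Hypothetical helper for illustration; not part of ABP or Hangfire.
public static class PeriodToCronSketch
{
    // Maps a timer period in milliseconds to an approximate cron expression.
    public static string ToCron(int periodMilliseconds)
    {
        if (periodMilliseconds <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(periodMilliseconds));
        }

        var time = TimeSpan.FromMilliseconds(periodMilliseconds);

        if (time.TotalSeconds <= 59)
        {
            // Six fields (seconds first): every N seconds.
            var seconds = (int)Math.Ceiling(time.TotalSeconds);
            return $"*/{seconds} * * * * *";
        }

        if (time.TotalMinutes <= 59)
        {
            // Five fields: every N minutes.
            var minutes = (int)Math.Ceiling(time.TotalMinutes);
            return $"*/{minutes} * * * *";
        }

        if (time.TotalHours <= 23)
        {
            // Five fields: at minute 0 of every N-th hour.
            var hours = (int)Math.Ceiling(time.TotalHours);
            return $"0 */{hours} * * *";
        }

        // Longer periods have no simple "every N" cron form in this sketch.
        throw new NotSupportedException(
            $"Cannot convert a period of {periodMilliseconds} ms to a cron expression.");
    }
}
```

With this sketch, the 15000 ms period used by TheFirstWorker maps to `*/15 * * * * *`, and the 600000 ms period of PassiveUserCheckerWorker maps to `*/10 * * * *`. A step such as `*/N` restarts at the top of each minute or hour, so periods that do not divide 60 (or 24) evenly are only approximated.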

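Caveats

As shown above, when Hangfire has taken over the BackgroundWorkers and the BackgroundWorker switch (IsEnabled) is off, the Hangfire module configures a factory that returns a null BackgroundJobServer.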

public class AbpBackgroundWorkersHangfireModule : AbpModule
{
    public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        if (!options.IsEnabled)
        {
            var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
            hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
        }


        //...
    }


    // Returns a null BackgroundJobServer, so registered jobs are never actually executed
    private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
    {
        serviceProvider.GetRequiredService<JobStorage>();
        return null;
    }
}

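Every worker wrapped as a Hangfire BackgroundWorker is still visible in the Hangfire dashboard, but it never runs: a breakpoint in the actual DoWork method is never hit. One kind of worker is the exception and still starts at application startup even with the switch off: the plain BackgroundWorker. This is a side effect of Hangfire taking over. Originally, AbpBackgroundWorkersModule controlled all workers directly, and none of them ran once the switch was off: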

public class AbpBackgroundWorkersModule : AbpModule
{
    public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        if (options.IsEnabled)
        {
            await context.ServiceProvider
                .GetRequiredService<IBackgroundWorkerManager>()
                .StartAsync();
        }
    }
}

AbpBackgroundWorkersHangfireModule, however, calls StartAsync unconditionally, which sets IsRunning to true. (Later ABP versions changed this and fixed the problem.)

public class AbpBackgroundWorkersHangfireModule : AbpModule
{
    //...


    public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        // IsRunning becomes true here, so every worker held by the manager is started
        await context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .StartAsync(); 
    }


    //...
}

As a result, a new worker that inherits BackgroundWorkerBase directly (neither a Hangfire worker nor a periodic one) runs normally under Hangfire even with the switch off: the base module alone would have left IsRunning false, but the Hangfire module sets it to true.
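
The ordering problem can be reproduced with a small self-contained model. The types below (ISketchWorker, PlainSketchWorker, SketchWorkerManager, StartupOrderSketch) are simplified stand-ins invented for this sketch, not ABP classes. They only mirror the IsRunning logic of BackgroundWorkerManager and the order in which the two modules call StartAsync, assuming pre-initialization of all modules runs before initialization.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-ins for illustration only; these are not the ABP types.
public interface ISketchWorker
{
    void Start();
}

public sealed class PlainSketchWorker : ISketchWorker
{
    public bool Started { get; private set; }

    public void Start() => Started = true;
}

public sealed class SketchWorkerManager
{
    private readonly List<ISketchWorker> _workers = new List<ISketchWorker>();

    public bool IsRunning { get; private set; }

    // Mirrors StartAsync: flips the flag and starts everything added so far.
    public void Start()
    {
        IsRunning = true;
        foreach (var worker in _workers)
        {
            worker.Start();
        }
    }

    // Mirrors AddAsync: a worker added while running starts immediately.
    public void Add(ISketchWorker worker)
    {
        _workers.Add(worker);
        if (IsRunning)
        {
            worker.Start();
        }
    }
}

public static class StartupOrderSketch
{
    // Returns whether a plain worker ends up started for the given setup.
    public static bool PlainWorkerStarts(bool isEnabled, bool hangfireModulePresent)
    {
        var manager = new SketchWorkerManager();

        // Pre-initialization: the Hangfire module starts the manager
        // without checking the switch.
        if (hangfireModulePresent)
        {
            manager.Start();
        }

        // Initialization: the base module starts the manager only when enabled.
        if (isEnabled)
        {
            manager.Start();
        }

        // The user module registers its plain worker afterwards.
        var worker = new PlainSketchWorker();
        manager.Add(worker);
        return worker.Started;
    }
}
```

In this model, `StartupOrderSketch.PlainWorkerStarts(false, false)` is false, while `StartupOrderSketch.PlainWorkerStarts(false, true)` is true, which matches the behavior described above for the version under discussion.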

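The source for this version changed in later releases, so the content above may not apply to newer versions.

2024-03-17. I hope that once I have grown as an engineer, I can come back and see my own footsteps.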