using Furion;
using Furion.DatabaseAccessor;
using Furion.Schedule;
using Mapster;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace FlexJobApi.Core
{
    /// <summary>
    /// Job persistence backed by the database: loads job details and triggers when the
    /// scheduler starts and writes job, trigger and execution-record changes back
    /// through repositories.
    /// </summary>
    /// <remarks>
    /// The instance creates its own service scope in the constructor and resolves the
    /// repositories from it; the scope is released in <see cref="Dispose"/>.
    /// NOTE(review): all callbacks share the repositories resolved from that single scope;
    /// whether the scheduler may invoke them concurrently is not visible here - confirm.
    /// </remarks>
    public class DbJobPersistence : IJobPersistence, IDisposable
    {
        private readonly IServiceScope _serviceScope;
        private readonly IRepository<ScheduleJobDetail> repScheduleJobDetail;
        private readonly IRepository<ScheduleJobTrigger> repScheduleJobTrigger;
        private readonly IRepository<ScheduleJobTriggerTimeline> repScheduleJobTriggerTimeline;

        /// <summary>
        /// Creates a dedicated service scope and resolves the three repositories from it.
        /// </summary>
        /// <param name="scopeFactory">Factory used to create the scope owned by this instance.</param>
        public DbJobPersistence(IServiceScopeFactory scopeFactory)
        {
            _serviceScope = scopeFactory.CreateScope();
            var services = _serviceScope.ServiceProvider;

            // GetRequiredService fails immediately when a repository is not registered,
            // instead of leaving a null field that only blows up on first use.
            repScheduleJobDetail = services.GetRequiredService<IRepository<ScheduleJobDetail>>();
            repScheduleJobTrigger = services.GetRequiredService<IRepository<ScheduleJobTrigger>>();
            repScheduleJobTriggerTimeline = services.GetRequiredService<IRepository<ScheduleJobTriggerTimeline>>();
        }

        /// <summary>
        /// Called when the job scheduling service starts: builds one scheduler builder per
        /// stored job detail, together with the triggers that share its job id.
        /// </summary>
        /// <param name="stoppingToken">Cancellation token forwarded to the database queries.</param>
        /// <returns>The scheduler builders created from the stored job details and triggers.</returns>
        // Before Furion 4.9.1.59 this member was a synchronous Preload() method.
        public async Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
        {
            var jobDetails = await repScheduleJobDetail.AsQueryable().AsNoTracking().ToListAsync(stoppingToken);
            var jobTriggers = await repScheduleJobTrigger.AsQueryable().AsNoTracking().ToListAsync(stoppingToken);

            // Group the triggers once so each job does not rescan the whole trigger list.
            var triggersByJobId = jobTriggers.ToLookup(it => it.JobId);

            var schedulers = new List<SchedulerBuilder>(jobDetails.Count);
            foreach (var jobDetail in jobDetails)
            {
                var triggers = triggersByJobId[jobDetail.JobId]
                    .Select(it => TriggerBuilder.From(it.ToJson()))
                    .ToArray();

                var jobBuilder = JobBuilder.From(jobDetail.ToJson());
                schedulers.Add(SchedulerBuilder.Create(jobBuilder, triggers));
            }

            return schedulers;
        }

        /// <summary>
        /// Called for each scheduler builder while it is being loaded; returns the builder unchanged.
        /// </summary>
        /// <param name="builder">The scheduler builder being loaded.</param>
        /// <param name="stoppingToken">Cancellation token (not used).</param>
        /// <returns>A completed task holding the same <paramref name="builder"/> instance.</returns>
        public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
        {
            // NOTE(review): the original comment here read "mark as updated from elsewhere,
            // e.g. the database", but no marking is performed - the builder is passed through as is.
            return Task.FromResult(builder);
        }

        /// <summary>
        /// Mirrors a job-detail change into the job-detail table: insert on append,
        /// update or delete the row with the matching job id otherwise.
        /// </summary>
        /// <param name="context">Change context carrying the behavior, the job id and the job detail.</param>
        public async Task OnChangedAsync(PersistenceContext context)
        {
            switch (context.Behavior)
            {
                case PersistenceBehavior.Appended:
                    var insertEntity = new ScheduleJobDetail();
                    context.JobDetail.Adapt(insertEntity);
                    await repScheduleJobDetail.InsertNowAsync(insertEntity);
                    break;

                case PersistenceBehavior.Updated:
                    var updateEntity = await repScheduleJobDetail.AsQueryable()
                        .FirstOrDefaultAsync(it => it.JobId == context.JobId);
                    // A missing row is ignored rather than treated as an error.
                    if (updateEntity != null)
                    {
                        context.JobDetail.Adapt(updateEntity);
                        await repScheduleJobDetail.UpdateNowAsync(updateEntity);
                    }
                    break;

                case PersistenceBehavior.Removed:
                    var deleteEntity = await repScheduleJobDetail.AsQueryable()
                        .FirstOrDefaultAsync(it => it.JobId == context.JobId);
                    if (deleteEntity != null)
                    {
                        await repScheduleJobDetail.DeleteNowAsync(deleteEntity);
                    }
                    break;

                default:
                    break;
            }
        }

        /// <summary>
        /// Mirrors a trigger change into the trigger table: insert on append, update or
        /// delete the row with the matching job id and trigger id otherwise.
        /// </summary>
        /// <param name="context">Change context carrying the behavior, the ids and the trigger.</param>
        public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
        {
            switch (context.Behavior)
            {
                case PersistenceBehavior.Appended:
                    var insertEntity = new ScheduleJobTrigger();
                    context.Trigger.Adapt(insertEntity);
                    await repScheduleJobTrigger.InsertNowAsync(insertEntity);
                    break;

                case PersistenceBehavior.Updated:
                    var updateEntity = await repScheduleJobTrigger.AsQueryable()
                        .FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);
                    // A missing row is ignored rather than treated as an error.
                    if (updateEntity != null)
                    {
                        context.Trigger.Adapt(updateEntity);
                        await repScheduleJobTrigger.UpdateNowAsync(updateEntity);
                    }
                    break;

                case PersistenceBehavior.Removed:
                    var deleteEntity = await repScheduleJobTrigger.AsQueryable()
                        .FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);
                    if (deleteEntity != null)
                    {
                        await repScheduleJobTrigger.DeleteNowAsync(deleteEntity);
                    }
                    break;

                default:
                    break;
            }
        }

        /// <summary>
        /// Stores one execution record by mapping the context's timeline onto a new
        /// timeline entity and inserting it.
        /// </summary>
        /// <param name="context">Execution-record context carrying the timeline to store.</param>
        public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
        {
            var entity = new ScheduleJobTriggerTimeline();
            context.Timeline.Adapt(entity);
            await repScheduleJobTriggerTimeline.InsertNowAsync(entity);
        }

        /// <summary>
        /// Disposes the service scope created in the constructor.
        /// </summary>
        public void Dispose()
        {
            _serviceScope?.Dispose();
        }
    }
}