using Furion;
|
using Furion.DatabaseAccessor;
|
using Furion.Schedule;
|
using Mapster;
|
using Microsoft.EntityFrameworkCore;
|
using Microsoft.Extensions.DependencyInjection;
|
using System;
|
using System.Collections.Generic;
|
using System.Linq;
|
using System.Text;
|
using System.Threading;
|
using System.Threading.Tasks;
|
|
namespace FlexJobApi.Core
|
{
|
/// <summary>
|
/// 作业持久化(数据库)
|
/// </summary>
|
public class DbJobPersistence : IJobPersistence, IDisposable
|
{
|
private readonly IServiceScope _serviceScope;
|
private readonly IRepository<ScheduleJobDetail> repScheduleJobDetail;
|
private readonly IRepository<ScheduleJobTrigger> repScheduleJobTrigger;
|
private readonly IRepository<ScheduleJobTriggerTimeline, LogDbContextLocator> repScheduleJobTriggerTimeline;
|
|
public DbJobPersistence(IServiceScopeFactory scopeFactory)
|
{
|
_serviceScope = scopeFactory.CreateScope();
|
var services = _serviceScope.ServiceProvider;
|
|
repScheduleJobDetail = services.GetService<IRepository<ScheduleJobDetail>>();
|
repScheduleJobTrigger = services.GetService<IRepository<ScheduleJobTrigger>>();
|
repScheduleJobTriggerTimeline = services.GetService<IRepository<ScheduleJobTriggerTimeline, LogDbContextLocator>>();
|
}
|
|
/// <summary>
|
/// 作业调度服务启动时
|
/// </summary>
|
/// <returns></returns>
|
public async Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken) // Furion 4.9.1.59 之前为 public IEnumerable<SchedulerBuilder> Preload()
|
{
|
var schedulers = new List<SchedulerBuilder>();
|
var jobDetails = await repScheduleJobDetail.AsQueryable().AsNoTracking().ToListAsync();
|
var jobTriggers = await repScheduleJobTrigger.AsQueryable().AsNoTracking().ToListAsync();
|
foreach (var jobDetail in jobDetails)
|
{
|
var triggers = jobTriggers.Where(it => it.JobId == jobDetail.JobId)
|
.Select(it => TriggerBuilder.From(it.ToJson()))
|
.ToArray();
|
var jobBuilder = JobBuilder.From(jobDetail.ToJson());
|
var scheduler = SchedulerBuilder.Create(jobBuilder, triggers);
|
schedulers.Add(scheduler);
|
}
|
return schedulers;
|
}
|
|
public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
|
{
|
// 标记从其他地方更新,比如数据库
|
return Task.FromResult(builder);
|
}
|
|
public async Task OnChangedAsync(PersistenceContext context)
|
{
|
switch (context.Behavior)
|
{
|
case PersistenceBehavior.Appended:
|
var insertEntity = new ScheduleJobDetail();
|
context.JobDetail.Adapt(insertEntity);
|
await repScheduleJobDetail.InsertNowAsync(insertEntity);
|
break;
|
case PersistenceBehavior.Updated:
|
var updateEntity = await repScheduleJobDetail.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId);
|
if (updateEntity != null)
|
{
|
context.JobDetail.Adapt(updateEntity);
|
await repScheduleJobDetail.UpdateNowAsync(updateEntity);
|
}
|
break;
|
case PersistenceBehavior.Removed:
|
var deleteEntity = await repScheduleJobDetail.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId);
|
if (deleteEntity != null)
|
{
|
await repScheduleJobDetail.DeleteNowAsync(deleteEntity);
|
}
|
break;
|
default:
|
break;
|
}
|
}
|
|
public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
|
{
|
switch (context.Behavior)
|
{
|
case PersistenceBehavior.Appended:
|
var insertEntity = new ScheduleJobTrigger();
|
context.Trigger.Adapt(insertEntity);
|
await repScheduleJobTrigger.InsertNowAsync(insertEntity);
|
break;
|
case PersistenceBehavior.Updated:
|
var updateEntity = await repScheduleJobTrigger.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);
|
if (updateEntity != null)
|
{
|
context.Trigger.Adapt(updateEntity);
|
await repScheduleJobTrigger.UpdateNowAsync(updateEntity);
|
}
|
break;
|
case PersistenceBehavior.Removed:
|
var deleteEntity = await repScheduleJobTrigger.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);
|
if (deleteEntity != null)
|
{
|
await repScheduleJobTrigger.DeleteNowAsync(deleteEntity);
|
}
|
break;
|
default:
|
break;
|
}
|
}
|
|
public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
|
{
|
var entity = new ScheduleJobTriggerTimeline();
|
context.Timeline.Adapt(entity);
|
await repScheduleJobTriggerTimeline.InsertNowAsync(entity);
|
}
|
|
public void Dispose()
|
{
|
_serviceScope?.Dispose();
|
}
|
}
|
}
|