using Furion;
using Furion.DatabaseAccessor;
using Furion.Schedule;
using Mapster;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FlexJobApi.Core
{
/// <summary>
/// Job persistence backed by the database.
/// </summary>
/// <remarks>
/// A dedicated service scope is created in the constructor and kept until
/// <see cref="Dispose"/> is called; the three repositories used by this class
/// are resolved from that scope.
/// NOTE(review): the repositories therefore live as long as this instance —
/// confirm that the underlying data context is suitable for such a lifetime.
/// </remarks>
public class DbJobPersistence : IJobPersistence, IDisposable
{
    private readonly IServiceScope _serviceScope;
    private readonly IRepository<ScheduleJobDetail> _jobDetailRepository;
    private readonly IRepository<ScheduleJobTrigger> _jobTriggerRepository;
    private readonly IRepository<ScheduleJobTriggerTimeline> _jobTriggerTimelineRepository;

    /// <summary>
    /// Creates the persistence instance and resolves its repositories from a new service scope.
    /// </summary>
    /// <param name="scopeFactory">Factory used to create the service scope owned by this instance.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when one of the required repositories is not registered in the container.
    /// </exception>
    public DbJobPersistence(IServiceScopeFactory scopeFactory)
    {
        _serviceScope = scopeFactory.CreateScope();
        var services = _serviceScope.ServiceProvider;

        // GetRequiredService fails here with a descriptive error instead of leaving a
        // null field that would only surface later as a NullReferenceException.
        _jobDetailRepository = services.GetRequiredService<IRepository<ScheduleJobDetail>>();
        _jobTriggerRepository = services.GetRequiredService<IRepository<ScheduleJobTrigger>>();
        _jobTriggerTimelineRepository = services.GetRequiredService<IRepository<ScheduleJobTriggerTimeline>>();
    }

    /// <summary>
    /// Called when the job scheduling service starts.
    /// </summary>
    /// <param name="stoppingToken">Cancellation token supplied by the caller; not used by this implementation.</param>
    /// <returns>
    /// A completed task holding the scheduler builders produced by scanning
    /// <c>App.EffectiveTypes</c>.
    /// </returns>
    public Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
    {
        return Task.FromResult(App.EffectiveTypes.ScanToBuilders().AsEnumerable());
    }

    /// <summary>
    /// Called for each scheduler builder while it is being loaded.
    /// </summary>
    /// <param name="builder">The scheduler builder being loaded.</param>
    /// <param name="stoppingToken">Cancellation token supplied by the caller; not used by this implementation.</param>
    /// <returns>A completed task holding the same <paramref name="builder"/> instance.</returns>
    public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
    {
        // The builder is returned unchanged. This is the hook where it could be marked
        // as updated from another source, for example the database.
        return Task.FromResult(builder);
    }

    /// <summary>
    /// Mirrors a job detail change into the job detail table.
    /// </summary>
    /// <param name="context">
    /// Describes the change: its behavior (appended, updated or removed), the job id and the job detail.
    /// </param>
    /// <returns>A task that completes once the corresponding database operation has finished.</returns>
    public async Task OnChangedAsync(PersistenceContext context)
    {
        switch (context.Behavior)
        {
            case PersistenceBehavior.Appended:
            {
                var insertEntity = new ScheduleJobDetail();
                context.JobDetail.Adapt(insertEntity);
                await _jobDetailRepository.InsertNowAsync(insertEntity);
                break;
            }

            case PersistenceBehavior.Updated:
            {
                var updateEntity = await _jobDetailRepository.AsQueryable()
                    .FirstOrDefaultAsync(it => it.JobId == context.JobId);

                // Nothing is written when no stored row matches the job id.
                if (updateEntity != null)
                {
                    context.JobDetail.Adapt(updateEntity);
                    await _jobDetailRepository.UpdateNowAsync(updateEntity);
                }

                break;
            }

            case PersistenceBehavior.Removed:
            {
                var deleteEntity = await _jobDetailRepository.AsQueryable()
                    .FirstOrDefaultAsync(it => it.JobId == context.JobId);

                // Nothing is deleted when no stored row matches the job id.
                if (deleteEntity != null)
                {
                    await _jobDetailRepository.DeleteNowAsync(deleteEntity);
                }

                break;
            }

            default:
                // Any other behavior is ignored.
                break;
        }
    }

    /// <summary>
    /// Mirrors a job trigger change into the job trigger table.
    /// </summary>
    /// <param name="context">
    /// Describes the change: its behavior (appended, updated or removed), the job id,
    /// the trigger id and the trigger.
    /// </param>
    /// <returns>A task that completes once the corresponding database operation has finished.</returns>
    public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
    {
        switch (context.Behavior)
        {
            case PersistenceBehavior.Appended:
            {
                var insertEntity = new ScheduleJobTrigger();
                context.Trigger.Adapt(insertEntity);
                await _jobTriggerRepository.InsertNowAsync(insertEntity);
                break;
            }

            case PersistenceBehavior.Updated:
            {
                var updateEntity = await _jobTriggerRepository.AsQueryable()
                    .FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);

                // Nothing is written when no stored row matches both the job id and the trigger id.
                if (updateEntity != null)
                {
                    context.Trigger.Adapt(updateEntity);
                    await _jobTriggerRepository.UpdateNowAsync(updateEntity);
                }

                break;
            }

            case PersistenceBehavior.Removed:
            {
                var deleteEntity = await _jobTriggerRepository.AsQueryable()
                    .FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);

                // Nothing is deleted when no stored row matches both the job id and the trigger id.
                if (deleteEntity != null)
                {
                    await _jobTriggerRepository.DeleteNowAsync(deleteEntity);
                }

                break;
            }

            default:
                // Any other behavior is ignored.
                break;
        }
    }

    /// <summary>
    /// Stores one execution record as a new row in the trigger timeline table.
    /// </summary>
    /// <param name="context">Carries the timeline entry to be stored.</param>
    /// <returns>A task that completes once the row has been inserted.</returns>
    public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
    {
        var entity = new ScheduleJobTriggerTimeline();
        context.Timeline.Adapt(entity);
        await _jobTriggerTimelineRepository.InsertNowAsync(entity);
    }

    /// <summary>
    /// Disposes the service scope created in the constructor.
    /// </summary>
    public void Dispose()
    {
        _serviceScope?.Dispose();
    }
}
}