// sunpengfei 2025-08-29 7c901453d4f4a6971986f63bdaeed85914f9e536
using Furion;
using Furion.DatabaseAccessor;
using Furion.Schedule;
using Mapster;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace FlexJobApi.Core
{
    /// <summary>
    /// 作业持久化(数据库)
    /// </summary>
    public class DbJobPersistence : IJobPersistence, IDisposable
    {
        private readonly IServiceScope _serviceScope;
        private readonly IRepository<ScheduleJobDetail> repScheduleJobDetail;
        private readonly IRepository<ScheduleJobTrigger> repScheduleJobTrigger;
        private readonly IRepository<ScheduleJobTriggerTimeline, LogDbContextLocator> repScheduleJobTriggerTimeline;
 
        public DbJobPersistence(IServiceScopeFactory scopeFactory)
        {
            _serviceScope = scopeFactory.CreateScope();
            var services = _serviceScope.ServiceProvider;
 
            repScheduleJobDetail = services.GetService<IRepository<ScheduleJobDetail>>();
            repScheduleJobTrigger = services.GetService<IRepository<ScheduleJobTrigger>>();
            repScheduleJobTriggerTimeline = services.GetService<IRepository<ScheduleJobTriggerTimeline, LogDbContextLocator>>();
        }
 
        /// <summary>
        /// 作业调度服务启动时
        /// </summary>
        /// <returns></returns>
        public Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
        {
            return Task.FromResult(App.EffectiveTypes.ScanToBuilders().AsEnumerable());
        }
 
        public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
        {
            // 标记从其他地方更新,比如数据库
            return Task.FromResult(builder);
        }
 
        public async Task OnChangedAsync(PersistenceContext context)
        {
            switch (context.Behavior)
            {
                case PersistenceBehavior.Appended:
                    var insertEntity = new ScheduleJobDetail();
                    context.JobDetail.Adapt(insertEntity);
                    await repScheduleJobDetail.InsertNowAsync(insertEntity);
                    break;
                case PersistenceBehavior.Updated:
                    var updateEntity = await repScheduleJobDetail.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId);
                    if (updateEntity != null)
                    {
                        context.JobDetail.Adapt(updateEntity);
                        await repScheduleJobDetail.UpdateNowAsync(updateEntity);
                    }
                    break;
                case PersistenceBehavior.Removed:
                    var deleteEntity = await repScheduleJobDetail.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId);
                    if (deleteEntity != null)
                    {
                        await repScheduleJobDetail.DeleteNowAsync(deleteEntity);
                    }
                    break;
                default:
                    break;
            }
        }
 
        public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
        {
            switch (context.Behavior)
            {
                case PersistenceBehavior.Appended:
                    var insertEntity = new ScheduleJobTrigger();
                    context.Trigger.Adapt(insertEntity);
                    await repScheduleJobTrigger.InsertNowAsync(insertEntity);
                    break;
                case PersistenceBehavior.Updated:
                    var updateEntity = await repScheduleJobTrigger.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);
                    if (updateEntity != null)
                    {
                        context.Trigger.Adapt(updateEntity);
                        await repScheduleJobTrigger.UpdateNowAsync(updateEntity);
                    }
                    break;
                case PersistenceBehavior.Removed:
                    var deleteEntity = await repScheduleJobTrigger.AsQueryable().FirstOrDefaultAsync(it => it.JobId == context.JobId && it.TriggerId == context.TriggerId);
                    if (deleteEntity != null)
                    {
                        await repScheduleJobTrigger.DeleteNowAsync(deleteEntity);
                    }
                    break;
                default:
                    break;
            }
        }
 
        public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
        {
            var entity = new ScheduleJobTriggerTimeline();
            context.Timeline.Adapt(entity);
            await repScheduleJobTriggerTimeline.InsertNowAsync(entity);
        }
 
        public void Dispose()
        {
            _serviceScope?.Dispose();
        }
    }
}