sunpengfei
2025-08-22 f7441ac8e0ef8778c4271d4ffec890a7e39bd3ab
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
using Furion.EventBus;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
 
namespace FlexJobApi.Core
{
    public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
    {
        /// <summary>
        /// 内存通道事件源存储器
        /// </summary>
        private readonly Channel<IEventSource> _channel;
 
        /// <summary>
        /// 通道对象
        /// </summary>
        private readonly IModel _model;
 
        /// <summary>
        /// 连接对象
        /// </summary>
        private readonly IConnection _connection;
 
        /// <summary>
        /// 路由键
        /// </summary>
        private readonly string _routeKey;
 
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="factory">连接工厂</param>
        /// <param name="routeKey">路由键</param>
        /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
        public RabbitMQEventSourceStorer(ConnectionFactory factory, string routeKey, int capacity)
        {
            // 配置通道,设置超出默认容量后进入等待
            var boundedChannelOptions = new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.Wait
            };
 
            // 创建有限容量通道
            _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
 
            // 创建连接
            _connection = factory.CreateConnection();
            _routeKey = routeKey;
 
            // 创建通道
            _model = _connection.CreateModel();
 
            // 声明路由队列
            _model.QueueDeclare(routeKey, false, false, false, null);
 
            // 创建消息订阅者
            var consumer = new EventingBasicConsumer(_model);
 
            // 订阅消息并写入内存 Channel
            consumer.Received += (ch, ea) =>
            {
                // 读取原始消息
                var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());
 
                // 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写
                var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(stringEventSource);
 
                // 写入内存管道存储器
                Task.Run(async () =>
                {
                    await _channel.Writer.WriteAsync(eventSource);
                });
 
                // 确认该消息已被消费
                _model.BasicAck(ea.DeliveryTag, false);
            };
 
            // 启动消费者 设置为手动应答消息
            _model.BasicConsume(routeKey, false, consumer);
        }
 
        /// <summary>
        /// 将事件源写入存储器
        /// </summary>
        /// <param name="eventSource">事件源对象</param>
        /// <param name="cancellationToken">取消任务 Token</param>
        /// <returns><see cref="ValueTask"/></returns>
        public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
        {
            // 空检查
            if (eventSource == default)
            {
                throw new ArgumentNullException(nameof(eventSource));
            }
 
            // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
            if (eventSource is ChannelEventSource source)
            {
                // 序列化,这里可以选择自己喜欢的序列化工具
                var data = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(source));
 
                // 发布
                _model.BasicPublish("", _routeKey, null, data);
            }
            else
            {
                // 这里处理动态订阅问题
                await _channel.Writer.WriteAsync(eventSource, cancellationToken);
            }
        }
 
        /// <summary>
        /// 从存储器中读取一条事件源
        /// </summary>
        /// <param name="cancellationToken">取消任务 Token</param>
        /// <returns>事件源对象</returns>
        public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
        {
            // 读取一条事件源
            var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
            return eventSource;
        }
 
        /// <summary>
        /// 释放非托管资源
        /// </summary>
        public void Dispose()
        {
            _model.Dispose();
            _connection.Dispose();
        }
    }
}