22. 事件总线
22.1 快速入门
- 定义事件订阅者
ToDoEventSubscriber
:
// 实现 IEventSubscriber 接口
public class ToDoEventSubscriber : IEventSubscriber
{
private readonly ILogger<ToDoEventSubscriber> _logger;
public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)
{
_logger = logger;
}
[EventSubscribe("ToDo:Create")]
public async Task CreateToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("创建一个 ToDo:{Name}", todo.Payload);
await Task.CompletedTask;
}
// 支持多个
[EventSubscribe("ToDo:Create")]
[EventSubscribe("ToDo:Update")]
public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("创建或更新一个 ToDo:{Name}", todo.Payload);
await Task.CompletedTask;
}
}
- 创建控制器
ToDoController
,依赖注入IEventPublisher
服务:
public class ToDoController : ControllerBase
{
// 依赖注入事件发布者 IEventPublisher
private readonly IEventPublisher _eventPublisher;
public ToDoController(IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
}
// 发布 ToDo:Create 消息
public async Task CreateDoTo(string name)
{
await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));
}
}
- 在
Startup.cs
注册EventBus
服务:
// 注册 EventBus 服务
services.AddEventBus(buidler =>
{
// 注册 ToDo 事件订阅者
buidler.AddSubscriber<ToDoEventSubscriber>();
// 批量注册事件订阅者
builder.AddSubscribers(ass1, ass2, ....);
});
懒人提醒
在 Furion
中可以不用通过 buidler.AddSubscriber<T>()
方式一一注册,只需要实现 ISingleton
接口即可,如:
public class ToDoEventSubscriber : IEventSubscriber, ISingleton
{
}
这样就无需写 代码了。buidler.AddSubscriber<ToDoEventSubscriber>();
- 运行项目:
info: Jaina.Samples.ToDoEventSubscriber[0]
创建一个 ToDo:Jaina
22.2 自定义事件源
Furion
使用 IEventSource
作为消息载体,任何实现该接口的类都可以充当消息载体。
如需自定义,只需实现 IEventSource
接口即可:
public class ToDoEventSource : IEventSource
{
public ToDoEventSource(string eventId, string todoName)
{
EventId = eventId;
ToDoName = todoName;
}
// 自定义属性
public string ToDoName { get; }
/// <summary>
/// 事件 Id
/// </summary>
public string EventId { get; }
/// <summary>
/// 事件承载(携带)数据
/// </summary>
public object Payload { get; }
/// <summary>
/// 取消任务 Token
/// </summary>
/// <remarks>用于取消本次消息处理</remarks>
public CancellationToken CancellationToken { get; }
/// <summary>
/// 事件创建时间
/// </summary>
public DateTime CreatedTime { get; } = DateTime.UtcNow;
}
使用:
await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));
22.3 自定义事件源存储器
Fruion
默认采用 Channel
作为事件源 IEventSource
存储器,开发者可以使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、ActiveMQ
等,也可以使用部分数据库 Redis、SQL Server、MySql
实现。
如需自定义,只需实现 IEventSourceStorer
接口即可:
public class RedisEventSourceStorer : IEventSourceStorer
{
private readonly IRedisClient _redisClient;
public RedisEventSourceStorer(IRedisClient redisClient)
{
_redisClient = redisClient;
}
// 往 Redis 中写入一条
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
await _redisClient.WriteAsync(...., cancellationToken);
}
// 从 Redis 中读取一条
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
return await _redisClient.ReadAsync(...., cancellationToken);
}
}
最后,在注册 EventBus
服务中替换默认 IEventSourceStorer
:
services.AddEventBus(buidler =>
{
// 替换事件源存储器
buidler.ReplaceStorer(serviceProvider =>
{
var redisClient = serviceProvider.GetService<IRedisClient>();
return new RedisEventSourceStorer(redisClient);
});
});
22.4 自定义事件发布者
Furion
默认内置基于 Channel
的事件发布者 ChannelEventPublisher
。
如需自定义,只需实现 IEventPublisher
接口即可:
public class ToDoEventPublisher : IEventPublisher
{
private readonly IEventSourceStorer _eventSourceStorer;
public ChannelEventPublisher(IEventSourceStorer eventSourceStorer)
{
_eventSourceStorer = eventSourceStorer;
}
public async Task PublishAsync(IEventSource eventSource)
{
await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);
}
}
最后,在注册 EventBus
服务中替换默认 IEventPublisher
:
services.AddEventBus(buidler =>
{
// 替换事件源存储器
buidler.ReplacePublisher<ToDoEventPublisher>();
});
22.5 添加事件执行监视器
Furion
提供了 IEventHandlerMonitor
监视器接口,实现该接口可以监视所有订阅事件,包括 执行之前、执行之后,执行异常,共享上下文数据
。
如添加 ToDoEventHandlerMonitor
:
public class ToDoEventHandlerMonitor : IEventHandlerMonitor
{
private readonly ILogger<ToDoEventHandlerMonitor> _logger;
public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)
{
_logger = logger;
}
public Task OnExecutingAsync(EventHandlerExecutingContext context)
{
_logger.LogInformation("执行之前:{EventId}", context.Source.EventId);
return Task.CompletedTask;
}
public Task OnExecutedAsync(EventHandlerExecutedContext context)
{
_logger.LogInformation("执行之后:{EventId}", context.Source.EventId);
if (context.Exception != null)
{
_logger.LogError(context.Exception, "执行出错啦:{EventId}", context.Source.EventId);
}
return Task.CompletedTask;
}
}
最后,在注册 EventBus
服务中注册 ToDoEventHandlerMonitor
:
services.AddEventBus(buidler =>
{
// 添加事件执行监视器
buidler.AddMonitor<ToDoEventHandlerMonitor>();
});
22.6 添加事件执行器
Furion
提供了 IEventHandlerExecutor
执行器接口,可以让开发者自定义事件处理函数执行策略,如 超时控制,失败重试、熔断等等
。
如添加 RetryEventHandlerExecutor
:
public class RetryEventHandlerExecutor : IEventHandlerExecutor
{
public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)
{
// 如果执行失败,每隔 1s 重试,最多三次
await Retry(async () => {
await handler(context);
}, 3, 1000);
}
}
最后,在注册 EventBus
服务中注册 RetryEventHandlerExecutor
:
services.AddEventBus(buidler =>
{
// 添加事件执行器
buidler.AddExecutor<RetryEventHandlerExecutor>();
});
22.7 使用有作用域的服务
在 Furion
中, Event Bus
所有服务均注册为单例,如需使用作用域服务(单例服务可直接注入),可通过依赖注入 IServiceProvider
实例并通过 CreateScope()
创建一个作用域,如:
public class ToDoEventSubscriber : IEventSubscriber
{
private readonly ILogger<ToDoEventSubscriber> _logger;
public ToDoEventSubscriber(IServiceProvider services
, ILogger<ToDoEventSubscriber> logger)
{
Services = services;
_logger = logger;
}
public IServiceProvider Services { get; }
[EventSubscribe("ToDo:Create")]
public async Task CreateToDo(EventHandlerExecutingContext context)
{
using (var scope = Services.CreateScope())
{
var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();
// ....
}
}
}
22.8 订阅执行任务意外异常
services.AddEventBus(buidler =>
{
// 订阅 EventBus 未捕获异常
buidler.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});
22.9 EventBusOptionsBuilder
配置
EventBusOptionsBuilder
是 AddEventBus
构建服务选项,该选项包含以下属性和方法:
- 属性
ChannelCapacity
:默认内存通道容量UnobservedTaskExceptionHandler
:订阅执行任务未察觉异常
- 方法
AddSubscriber<TEventSubscriber>
:添加订阅者ReplacePublisher<TEventPublisher>
:替换发布者ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>)
:替换存储器AddMonitor<TEventHandlerMonitor>
:添加监视器AddExecutor<TEventHandlerExecutor>
:添加执行器
22.10 反馈与建议
与我们交流
给 Furion 提 Issue。