自己造轮子是一件苦差事。 现在,您可以专注于业务开发,仅需集成 ⭐️Furion⭐️ 即可。
Skip to main content

22. 事件总线

v2.20 以下版本说明

Furion v2.20+ 版本后采用 Jaina 事件总线替换原有的 EventBus查看旧文档

22.1 快速入门

  1. 定义事件订阅者 ToDoEventSubscriber
// 实现 IEventSubscriber 接口
public class ToDoEventSubscriber : IEventSubscriber
{
private readonly ILogger<ToDoEventSubscriber> _logger;
public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)
{
_logger = logger;
}

[EventSubscribe("ToDo:Create")]
public async Task CreateToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("创建一个 ToDo:{Name}", todo.Payload);
await Task.CompletedTask;
}

// 支持多个
[EventSubscribe("ToDo:Create")]
[EventSubscribe("ToDo:Update")]
public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("创建或更新一个 ToDo:{Name}", todo.Payload);
await Task.CompletedTask;
}
}
  1. 创建控制器 ToDoController,依赖注入 IEventPublisher 服务:
public class ToDoController : ControllerBase
{
// 依赖注入事件发布者 IEventPublisher
private readonly IEventPublisher _eventPublisher;
public ToDoController(IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
}

// 发布 ToDo:Create 消息
public async Task CreateDoTo(string name)
{
await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));
}
}
  1. Startup.cs 注册 EventBus 服务:
// 注册 EventBus 服务
services.AddEventBus(buidler =>
{
// 注册 ToDo 事件订阅者
buidler.AddSubscriber<ToDoEventSubscriber>();

// 批量注册事件订阅者
builder.AddSubscribers(ass1, ass2, ....);
});
懒人提醒

Furion 中可以不用通过 buidler.AddSubscriber<T>() 方式一一注册,只需要实现 ISingleton 接口即可,如:

public class ToDoEventSubscriber : IEventSubscriber, ISingleton
{
}

这样就无需写 buidler.AddSubscriber<ToDoEventSubscriber>(); 代码了。

  1. 运行项目:
info: Jaina.Samples.ToDoEventSubscriber[0]
创建一个 ToDo:Jaina

22.2 自定义事件源

Furion 使用 IEventSource 作为消息载体,任何实现该接口的类都可以充当消息载体。

如需自定义,只需实现 IEventSource 接口即可:

public class ToDoEventSource : IEventSource
{
public ToDoEventSource(string eventId, string todoName)
{
EventId = eventId;
ToDoName = todoName;
}

// 自定义属性
public string ToDoName { get; }

/// <summary>
/// 事件 Id
/// </summary>
public string EventId { get; }

/// <summary>
/// 事件承载(携带)数据
/// </summary>
public object Payload { get; }

/// <summary>
/// 取消任务 Token
/// </summary>
/// <remarks>用于取消本次消息处理</remarks>
public CancellationToken CancellationToken { get; }

/// <summary>
/// 事件创建时间
/// </summary>
public DateTime CreatedTime { get; } = DateTime.UtcNow;
}

使用:

await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));

22.3 自定义事件源存储器

Fruion 默认采用 Channel 作为事件源 IEventSource 存储器,开发者可以使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、ActiveMQ 等,也可以使用部分数据库 Redis、SQL Server、MySql 实现。

如需自定义,只需实现 IEventSourceStorer 接口即可:

public class RedisEventSourceStorer : IEventSourceStorer
{
private readonly IRedisClient _redisClient;

public RedisEventSourceStorer(IRedisClient redisClient)
{
_redisClient = redisClient;
}

// 往 Redis 中写入一条
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
await _redisClient.WriteAsync(...., cancellationToken);
}

// 从 Redis 中读取一条
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
return await _redisClient.ReadAsync(...., cancellationToken);
}
}

最后,在注册 EventBus 服务中替换默认 IEventSourceStorer

services.AddEventBus(buidler =>
{
// 替换事件源存储器
buidler.ReplaceStorer(serviceProvider =>
{
var redisClient = serviceProvider.GetService<IRedisClient>();
return new RedisEventSourceStorer(redisClient);
});
});

22.4 自定义事件发布者

Furion 默认内置基于 Channel 的事件发布者 ChannelEventPublisher

如需自定义,只需实现 IEventPublisher 接口即可:

public class ToDoEventPublisher : IEventPublisher
{
private readonly IEventSourceStorer _eventSourceStorer;

public ChannelEventPublisher(IEventSourceStorer eventSourceStorer)
{
_eventSourceStorer = eventSourceStorer;
}

public async Task PublishAsync(IEventSource eventSource)
{
await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);
}
}

最后,在注册 EventBus 服务中替换默认 IEventPublisher

services.AddEventBus(buidler =>
{
// 替换事件源存储器
buidler.ReplacePublisher<ToDoEventPublisher>();
});

22.5 添加事件执行监视器

Furion 提供了 IEventHandlerMonitor 监视器接口,实现该接口可以监视所有订阅事件,包括 执行之前、执行之后,执行异常,共享上下文数据

如添加 ToDoEventHandlerMonitor

public class ToDoEventHandlerMonitor : IEventHandlerMonitor
{
private readonly ILogger<ToDoEventHandlerMonitor> _logger;
public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)
{
_logger = logger;
}

public Task OnExecutingAsync(EventHandlerExecutingContext context)
{
_logger.LogInformation("执行之前:{EventId}", context.Source.EventId);
return Task.CompletedTask;
}

public Task OnExecutedAsync(EventHandlerExecutedContext context)
{
_logger.LogInformation("执行之后:{EventId}", context.Source.EventId);

if (context.Exception != null)
{
_logger.LogError(context.Exception, "执行出错啦:{EventId}", context.Source.EventId);
}

return Task.CompletedTask;
}
}

最后,在注册 EventBus 服务中注册 ToDoEventHandlerMonitor

services.AddEventBus(buidler =>
{
// 添加事件执行监视器
buidler.AddMonitor<ToDoEventHandlerMonitor>();
});

22.6 添加事件执行器

Furion 提供了 IEventHandlerExecutor 执行器接口,可以让开发者自定义事件处理函数执行策略,如 超时控制,失败重试、熔断等等

如添加 RetryEventHandlerExecutor

public class RetryEventHandlerExecutor : IEventHandlerExecutor
{
public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)
{
// 如果执行失败,每隔 1s 重试,最多三次
await Retry(async () => {
await handler(context);
}, 3, 1000);
}
}

最后,在注册 EventBus 服务中注册 RetryEventHandlerExecutor

services.AddEventBus(buidler =>
{
// 添加事件执行器
buidler.AddExecutor<RetryEventHandlerExecutor>();
});

22.7 使用有作用域的服务

Furion 中, Event Bus 所有服务均注册为单例,如需使用作用域服务(单例服务可直接注入),可通过依赖注入 IServiceProvider 实例并通过 CreateScope() 创建一个作用域,如:

public class ToDoEventSubscriber : IEventSubscriber
{
private readonly ILogger<ToDoEventSubscriber> _logger;

public ToDoEventSubscriber(IServiceProvider services
, ILogger<ToDoEventSubscriber> logger)
{
Services = services;
_logger = logger;
}

public IServiceProvider Services { get; }

[EventSubscribe("ToDo:Create")]
public async Task CreateToDo(EventHandlerExecutingContext context)
{
using (var scope = Services.CreateScope())
{
var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();
// ....
}
}
}

22.8 订阅执行任务意外异常

services.AddEventBus(buidler =>
{
// 订阅 EventBus 未捕获异常
buidler.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});

22.9 EventBusOptionsBuilder 配置

EventBusOptionsBuilderAddEventBus 构建服务选项,该选项包含以下属性和方法:

  • 属性
    • ChannelCapacity:默认内存通道容量
    • UnobservedTaskExceptionHandler:订阅执行任务未察觉异常
  • 方法
    • AddSubscriber<TEventSubscriber>:添加订阅者
    • ReplacePublisher<TEventPublisher>:替换发布者
    • ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>):替换存储器
    • AddMonitor<TEventHandlerMonitor>:添加监视器
    • AddExecutor<TEventHandlerExecutor>:添加执行器

22.10 反馈与建议

与我们交流

给 Furion 提 Issue