33. IPC 进程通信
33.1 什么是 IPC
引用百度百科
IPC(Inter-Process Communication,进程间通信)。进程间通信是指两个进程的数据之间产生交互。
通俗点说,IPC
可以实现不同应用程序间通信(交互数据)。
33.2 实现 IPC
通信方式
- 半双工 Unix 管道
- FIFOs(命名管道)
- 消息队列(常用模式)
- 信号量
- 共享内存(常用模式,
Furion
框架默认实现方式) - 网络 Socket(常用模式)
33.3 IPC
通信模式
IPC
本身指的是 进程间
通信,但 Furion
框架将内置 进程间/内
两种进程通信模式。
进程内通信
:Furion
采用Channel
管道提供进程内通信进程外通信
:Furion
采用MemoryMapperFile
共享内存方式实现进程外通信(后续版本完善)
33.4 进程内通信(线程间)
进程内通信俗称线程间通信,Furion
框架采用 C#
提供的 Channel(管道)
+ Lazy
+ Task.Factory
实现长时间高性能的线程间通信机制。Channel
管道也是目前 .NET/C#
实现 生产者-订阅者
模式最简易且最为强大的实现。
33.4.1 了解 Channel
Channel
是在 .NET Core 2.1+
版本之后加入。Channel
底层实现是一个高效的、线程安全的队列,可以在线程之间传递数据。
Channel
的主要应用场景是 发布/订阅、观察者模式
中使用,如:事件总线
就是最好的实现方式。通过 Channel
实现 生产-消费
机制可以减少项目间的耦合,提高应用吞吐量。
Furion
框架提供了 ChannelContext<TMessage, THandler>
密封类,提供 UnBoundedChannel
和 BoundedChannel
两种管道通信模式。
UnBoundedChannel
:具有无限容量的Channel
, 生产者可以全速进行生产数据,但如果消费者的消费速度低于生产者,Channel
的资源使用会无限增加,会有服务器资源耗尽的可能。BoundedChannel
:具有有限容量的Channel
,Furion
框架默认为1000
,到达上限后,生产者进入等待写入直到有空闲,好处是可以控制生产的速度,控制系统资源的使用。(推荐)
33.4.2 常规使用
创建 ChannelHandler<TMessage>
管道处理程序
using Furion.IPCChannel;
using System;
using System.Threading.Tasks;
namespace Furion.Core
{
/// <summary>
/// 创建管道处理程序(处理 String 类型消息)
/// </summary>
public class MyChannelHandler : ChannelHandler<string>
{
/// <summary>
/// 接受到管道消息后处理程序
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public override Task InvokeAsync(string message)
{
Console.WriteLine(message);
return Task.CompletedTask;
}
}
}
note
ChannelHandler<TMessage>
泛型类型决定了你要接受那种类型的消息,不同类型消息将会自动过滤筛选。
使用 ChannelContext<TMessage, THandler>
发送消息
public async Task SendAsync()
{
for (int i = 0; i < 100; i++)
{
// 使用有限容量生产数据
await ChannelContext<string, MyChannelHandler>.BoundedChannel.Writer.WriteAsync($"Loop {i} times.");
}
}
以上代码也可以通过 ChannelContext<string, MyChannelHandler>.BoundedChannel.Writer.TryWrite()
同步写入。
33.4.3 实现多订阅
默认情况下,Furion
初始化了一个长时间的 Task
任务进行数据检查及订阅,如需实现多订阅模式,可创建新的订阅任务即可:
var reader = ChannelContext<string, MyChannelHandler>.BoundedChannel.Reader;
// 创建长时间线程管道读取器
_ = Task.Factory.StartNew(async () =>
{
while (await reader.WaitToReadAsync())
{
if (!reader.TryRead(out var message)) continue;
// 默认重试 3 次(每次间隔 1s)
await Retry.Invoke(async () => await Activator.CreateInstance<MyChannelHandler>().InvokeAsync(message), 3, 1000, finalThrow: false);
}
}, TaskCreationOptions.LongRunning);
33.4.4 更多 Channel
知识
可查阅 Dotnet Core 下的 Channel, 你用了吗? 博客教程(😃 写的不错)
33.4.5 CallContext
方式
在 Furion v2.18+
版本提供了 CallContext
静态类,内部使用 AsyncLocal<T>
实现,也可以实现线程间通信,如:
CallContext.SetLocalValue("name", "Furion");
CallContext.GetLocalValue("name");
CallContext<int>.SetLocalValue("count", 1);
CallContext<int>.GetLocalValue("count");
34.5 进程外通信(共享内存)
Furion
目前暂未提供的进程外通信功能,将在后续版本实现(主要是模块设计还未想好,技术已实现)。
主要是通过 MemoryMapperFile
实现共享内存达到进程外通信功能,了解更多 MemoryMapperFile
33.6 反馈与建议
与我们交流
给 Furion 提 Issue。