自己造轮子是一件苦差事。 现在,您可以专注于业务开发,仅需集成 ⭐️Furion⭐️ 即可。
Skip to main content

33. IPC 进程通信

33.1 什么是 IPC

引用百度百科

IPC(Inter-Process Communication,进程间通信)。进程间通信是指两个进程的数据之间产生交互。

通俗点说,IPC 可以实现不同应用程序间通信(交互数据)。

33.2 实现 IPC 通信方式

  • 半双工 Unix 管道
  • FIFOs(命名管道)
  • 消息队列(常用模式)
  • 信号量
  • 共享内存(常用模式,Furion 框架默认实现方式)
  • 网络 Socket(常用模式)

33.3 IPC 通信模式

IPC 本身指的是 进程间 通信,但 Furion 框架将内置 进程间/内 两种进程通信模式。

  • 进程内通信Furion 采用 Channel 管道提供进程内通信
  • 进程外通信Furion 采用 MemoryMapperFile 共享内存方式实现进程外通信(后续版本完善)

33.4 进程内通信(线程间)

进程内通信俗称线程间通信,Furion 框架采用 C# 提供的 Channel(管道) + Lazy + Task.Factory 实现长时间高性能的线程间通信机制。Channel 管道也是目前 .NET/C# 实现 生产者-订阅者 模式最简易且最为强大的实现。

33.4.1 了解 Channel

Channel 是在 .NET Core 2.1+ 版本之后加入。Channel 底层实现是一个高效的、线程安全的队列,可以在线程之间传递数据。

Channel 的主要应用场景是 发布/订阅、观察者模式 中使用,如:事件总线 就是最好的实现方式。通过 Channel 实现 生产-消费 机制可以减少项目间的耦合,提高应用吞吐量。

Furion 框架提供了 ChannelContext<TMessage, THandler> 密封类,提供 UnBoundedChannelBoundedChannel 两种管道通信模式。

  • UnBoundedChannel:具有无限容量的 Channel, 生产者可以全速进行生产数据,但如果消费者的消费速度低于生产者,Channel 的资源使用会无限增加,会有服务器资源耗尽的可能。
  • BoundedChannel:具有有限容量的 ChannelFurion 框架默认为 1000,到达上限后,生产者进入等待写入直到有空闲,好处是可以控制生产的速度,控制系统资源的使用。(推荐)

33.4.2 常规使用

创建 ChannelHandler<TMessage> 管道处理程序

using Furion.IPCChannel;
using System;
using System.Threading.Tasks;

namespace Furion.Core
{
/// <summary>
/// 创建管道处理程序(处理 String 类型消息)
/// </summary>
public class MyChannelHandler : ChannelHandler<string>
{
/// <summary>
/// 接受到管道消息后处理程序
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public override Task InvokeAsync(string message)
{
Console.WriteLine(message);

return Task.CompletedTask;
}
}
}
note

ChannelHandler<TMessage> 泛型类型决定了你要接受那种类型的消息,不同类型消息将会自动过滤筛选。

使用 ChannelContext<TMessage, THandler> 发送消息

public async Task SendAsync()
{
for (int i = 0; i < 100; i++)
{
// 使用有限容量生产数据
await ChannelContext<string, MyChannelHandler>.BoundedChannel.Writer.WriteAsync($"Loop {i} times.");
}
}

以上代码也可以通过 ChannelContext<string, MyChannelHandler>.BoundedChannel.Writer.TryWrite() 同步写入。

33.4.3 实现多订阅

默认情况下,Furion 初始化了一个长时间的 Task 任务进行数据检查及订阅,如需实现多订阅模式,可创建新的订阅任务即可:

var reader = ChannelContext<string, MyChannelHandler>.BoundedChannel.Reader;

// 创建长时间线程管道读取器
_ = Task.Factory.StartNew(async () =>
{
while (await reader.WaitToReadAsync())
{
if (!reader.TryRead(out var message)) continue;
// 默认重试 3 次(每次间隔 1s)
await Retry.Invoke(async () => await Activator.CreateInstance<MyChannelHandler>().InvokeAsync(message), 3, 1000, finalThrow: false);
}
}, TaskCreationOptions.LongRunning);

33.4.4 更多 Channel 知识

可查阅 Dotnet Core 下的 Channel, 你用了吗? 博客教程(😃 写的不错)

33.4.5 CallContext 方式

Furion v2.18+ 版本提供了 CallContext 静态类,内部使用 AsyncLocal<T> 实现,也可以实现线程间通信,如:

CallContext.SetLocalValue("name", "Furion");
CallContext.GetLocalValue("name");

CallContext<int>.SetLocalValue("count", 1);
CallContext<int>.GetLocalValue("count");

34.5 进程外通信(共享内存)

Furion 目前暂未提供的进程外通信功能,将在后续版本实现(主要是模块设计还未想好,技术已实现)。

主要是通过 MemoryMapperFile 实现共享内存达到进程外通信功能,了解更多 MemoryMapperFile

33.6 反馈与建议

与我们交流

给 Furion 提 Issue