百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架

cac55 2025-03-24 14:18 31 浏览 0 评论

说明

作者:痴者工良

文档地址:https://mmq.whuanle.cn

仓库地址:https://github.com/whuanle/Maomi.MQ

作者博客:

  • o https://www.whuanle.cn
  • o https://www.cnblogs.com/whuanle

导读

Maomi.MQ 是一个简化了消息队列使用方式的通讯框架,目前支持了 RabbitMQ。

Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。

此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。

快速开始

在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。


创建一个 Web 项目(可参考 WebDemo 项目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:


// using Maomi.MQ;
// using RabbitMQ.Client;

builder.Services.AddMaomiMQ((MqOptionsBuilder options)=>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},[typeof(Program).Assembly]);

var app = builder.Build();

  • o WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
    每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。
  • o AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
  • o Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

定义消息模型类,模型类是 MQ 通讯的消息基础,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

定义消费者,消费者需要实现 IConsumer 接口,以及使用 [Consumer] 特性注解配置消费者属性,如下所示,[Consumer("test")] 表示该消费者订阅的队列名称是 test

IConsumer 接口有三个方法,ExecuteAsync 方法用于处理消息,FaildAsync 会在 ExecuteAsync 异常时立即执行,如果代码一直异常,最终会调用 FallbackAsync 方法,Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费,或者做其它处理动作。


[Consumer("test")]
publicclassMyConsumer:IConsumer<TestEvent>
{
// 消费
publicasyncTaskExecuteAsync(MessageHeader messageHeader,TestEvent message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
awaitTask.CompletedTask;
}

// 每次消费失败时执行
publicTaskFaildAsync(MessageHeader messageHeader,Exception ex,int retryCount,TestEvent message)
=>Task.CompletedTask;

// 补偿
publicTask<ConsumerState>FallbackAsync(MessageHeader messageHeader,TestEvent? message,Exception? ex)
=>Task.FromResult(ConsumerState.Ack);
}

Maomi.MQ 还具有多种消费者模式,代码写法不一样,后续会详细讲解不同的消费者模式。


如果要发布消息,只需要注入 IMessagePublisher 服务即可。


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"test", message:newTestEvent
{
Id=123
});
return"ok";
}
}

启动 Web 服务,在 swagger 页面上请求 API 接口,MyConsumer 服务会立即接收到发布的消息。


如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包,以便让消费者在后台订阅队列消费消息。

参考 ConsoleDemo 项目。


using Maomi.MQ;
usingMicrosoft.Extensions.Hosting;
usingMicrosoft.Extensions.Logging;
usingRabbitMQ.Client;
usingSystem.Reflection;

var host =newHostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},newSystem.Reflection.Assembly[]{typeof(Program).Assembly});

}).Build();

// 后台运行
var task = host.RunAsync();

Console.ReadLine();

消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中,Maomi.MQ 支持多种消息发布者模式,支持 RabbitMQ 事务模式等,示例项目请参考 PublisherWeb

Maomi.MQ 通过 IMessagePublisher 向开发者提供消息推送服务。


在发布消息之前,需要定义一个事件模型类,用于传递消息。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

然后注入 IMessagePublisher 服务,发布消息:


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
});
}

return"ok";
}
}

一般情况下,一个模型类只应该被一个消费者所使用,那么通过事件可以找到唯一的消费者,也就是通过事件类型找到消费者的 IConsumerOptions,此时框架可以使用对应的配置发送消息。

TestMessageEvent 模型只有一个消费者:


[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
// ... ...
}

可以直接发送事件,不需要填写交换器(Exchange)和路由键(RoutingKey)。


[HttpGet("publish_message")]
publicasyncTask<string>PublisherMessage()
{
// 如果在本项目中 TestMessageEvent 只指定了一个消费者,那么通过 TestMessageEvent 自动寻找对应的配置
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(model:newTestMessageEvent
{
Id= i
});
}

return"ok";
}

IMessagePublisher

IMessagePublisher 是 Maomi.MQ 的基础消息发布接口,有以下方法:


// 消息发布者.
publicinterfaceIMessagePublisher
{
TaskPublishAsync<TMessage>(string exchange, // 交换器名称.
string routingKey,// 队列/路由键名称.
TMessage message, // 事件对象.
Action<BasicProperties> properties,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskPublishAsync<TMessage>(TMessage message,
Action<BasicProperties>? properties =,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(TMessage model,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskCustomPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
}

Maomi.MQ 的消息发布接口就这么几个,由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以接口比较简单,开发者使用接口时可以灵活一些,使用难度也不大。


BasicProperties 是 RabbitMQ 中的消息基础属性对象,直接面向开发者,可以消息的发布和消费变得灵活和丰富功能,例如,可以通过 BasicProperties 配置单条消息的过期时间:


await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
},(BasicProperties p)=>
{
p.Expiration="1000";
});

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Scoped:


services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

开发者也可以自行实现 IMessagePublisher 接口,实现自己的消息发布模型,具体示例请参考 DefaultMessagePublisher 类型。

原生通道

开发者可以通过 ConnectionPool 服务获取原生连接对象,直接在 IConnection 上使用 RabbitMQ 的接口发布消息:


private readonly ConnectionPool _connectionPool;

var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);

常驻内存连接对象

Maomi.MQ 通过 ConnectionPool 管理

相关推荐

上网行为管理有什么用,行为审计软件解决方案?

上网行为管理有什么用,行为审计软件解决方案?我们生活在互联网的时代,网络是比较复杂的,有时候经常会出现一些网络违规现象,这时候就可以进行上网行为管理了,现在有很多的公司都会进行上网行为管理,因为这样可...

上网行为管理软件如何监控员工访问网址信息

上网行为管理软件可以通过域之盾软件来监控员工访问的网址信息:主要方式↓1、网页日志记录上网行为管理软件可以通过网页日志记录功能,完整地记录员工在工作中访问的所有网站,包括访问时间、访问页面、访问方式等...

企业如何确保上网行为管理顺利进行?

企业的稳定长久发展离不开对员工的上网行为管理,因为员工的上网行为关乎到工作效率、生产效率、以及重要的数据信息安全问题。一旦有不规范的上网行为,便容易造成企业终端数据泄密事件,对企业造成重大的财产损失以...

员工上网行为监控如何实现?六个妙招!帮你轻松管理员工上网!

员工上网行为不仅关乎个人工作效率,更直接影响到企业的信息安全和整体运营。不当的上网行为,如访问非法网站、泄露公司机密、长时间闲聊等,都可能给企业带来不可估量的损失。因此,监控员工上网行为,不仅是为了提...

终端管理系统规范企业上网行为管理

企业内部经常会遇到不同的上网行为管理问题,如职员在上班时间炒股、打游戏、上网聊天等不正当的上网行为,用U盘、硬盘等移动设备随意拷贝资料,终端资产难以管理等,降低办公效率的同时,也增加了企业内部泄密风...

企业如何进行上网行为管理?_如何管理企业网络

企业如何进行上网行为管理?为了保障网络安全,提高员工工作效率,企业有必要部署上网行为管理。上网行为管理可对企业内部员工的上网行为进行全方位有效管理,保护Web访问安全,降低互联网使用风险,避免机密信息...

演员赵露思的官方微博注销的原因是什么?

根据多方权威媒体报道及平台验证,演员赵露思的微博账号已于2025年8月19日正式注销。目前搜索该账号显示“因用户自行申请关闭,现已无法查看”。这一结果源于她8月13日在直播中宣布的注销决定,当时她直言...

分手传闻仅4月,关晓彤的一张海边亲吻照,撕碎了鹿晗最后的体面

“原来八年真能被四个月的‘海边吻照’一键清空。”热搜上那张模糊的侧脸一贴,评论瞬间爆炸:关晓彤亲的是李昀锐,鹿晗的生日祝福还停留在去年。八年,够让一部剧从开播到大结局,也够让一对顶流情侣把微博背景换成...

计算机基础知识(五)(3)_计算机基础知识100道

四、如何使用计算机4、软件的通用使用方法计算机发展到现在,正常使用时均采用窗口式界面。我们的介绍不涉及苹果机。操作系统从DOS(DOS阶段有两类,一类是微软的,称为MS—DOS;另一类称为PC—DOS...

笔记本的这些基础知识,你再不知道,就真的被社会淘汰了!

请您在阅读前点击上面的“关注”二字,后续会为您提供更多有价值的电脑知识,感谢每一位朋友的支持!这篇文章依然是给大家讲讲关于笔记本电脑的基础知识,新手小白可一定要收藏起来,以后想看的时候可以找出来就看,...

电脑基础知识,引起电脑故障的原因

引起电脑出现故障的原因非常多,概括来说主要包括以下几个方面的问题。操作不当:操作不当是指误删除文件或非法关机等不当操作。操作不当通常会造成电脑程序无法运行或电脑无法启动,修复此类故障,只要将删除或损坏...

笔记本电脑新手使用教程,笔记本电脑使用技巧

学电脑能够快速入门是每个新手梦寐以求的事情,但是不是每个人都能快速入门的。但是如果定制好合理计划,循序渐进,就会收到非常好的效果。今天小编来跟大家说说笔记本电脑新手教程的详细介绍-装机吧,大家一起...

电脑基础知识:(二)电脑主机_电脑主机组成图解

你好!我是麦秋~前言现代生活随着互联网通讯的快速发展,通讯技术日趋完美,应用软件日益普及,电脑的使用与维护则成为人们日常生活中密不可分的、重要的生活内容。电脑恐怕是中老年人的短板,然而又是急需解决的问...

电脑基础知识:(六)电脑应用程序_电脑的应用程序是什么意思

你好!我是麦秋~应用软件是专为解决一些具体问题的软件,是体现电脑用途的部分,种类繁多,如:办公软件、游戏软件、杀毒软件等;自媒体平台上的西瓜、头条、抖音、剪映等,对于上述软件主要是操作和使用上的问题,...

键盘操作方法大全_键盘操作教程

【键盘操作方法大全】键盘可不仅仅能帮我们打字哦,还有很多快捷的操作你都知道吗?除了Ctrl+C、Ctrl+V以外,再多学几种吧,让你用起电脑来十指如飞~别再慢慢用鼠标点了,用开始键+Tab键切换程序让...

取消回复欢迎 发表评论: