RabbitMQ 官方NET教程(二)【工作队列】
cac55 2024-11-15 16:38 22 浏览 0 评论
这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。
工作队列的主要任务是:避免立刻执行资源密集型任务和避免必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被这些工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。
准备
在本教程的前面部分,我们发送了一个包含Hello World!的消息。 现在我们将发送代替复杂任务的字符串。 我们没有一个现实世界的任务,比如图像被调整大小,或者是要渲染的pdf文件,所以假设我们很忙 - 通过使用Thread.sleep()函数来假冒它。 我们将把字符串中的点数作为其复杂度; 每个点都将占“work”的一秒钟。 例如,由Hello...描述的假任务将需要三秒钟。
我们将稍后从之前的例子中修改Send程序,以允许从命令行发送任意消息。 这个程序会将任务安排到我们的工作队列中,所以让我们命名为NewTask:
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
有些帮助从命令行参数获取消息:
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
我们的旧的Receive.cs脚本还需要一些更改:它需要为消息体中的每个点伪造一秒的工作时间。 它将处理RabbitMQ发送的消息并执行任务,因此我们将其复制到Worker项目并修改:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
我们假任务到模拟执行时间:
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
循环调度
使用任务队列的优点之一是能够轻松地并行工作。 如果我们正在建立积压的工作,我们可以增加更多的工作者,这样可以轻松扩展。
首先,我们同时尝试运行两个Worker实例。 他们都会从队列中获取消息,但是究竟如何? 让我们来看看。
你需要三个控制台打开。 两个将运行Worker程序。 这些控制台将是我们两个消费者 - C1和C2。
# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
在第三个我们将发布新的任务。 一旦您已经开始使用消费者,您可以发布一些消息:
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
让我们看看送给我们workers的内容:
# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。 平均每个消费者将获得相同数量的消息。 这种分发消息的方式叫做循环(round-robin)。 与三名或更多的workers一起尝试。
消息应答(message acknowledgments)
执行一个任务需要花费几秒钟。你可能会担心当一个消费者在执行任务时发生中断。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其从内存中删除。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。
但是我们不想失去任何任务。如果一个worker挂了,我们希望把这个任务交给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。从消费者发送一个确认信息告诉RabbitMQ已经收到,处理了特定的消息,然后RabbitMQ可以自由删除它。
如果消费者死机(其通道关闭,连接关闭或TCP连接丢失),而不发送确认信息,RabbitMQ将会明白消息未被完全处理并重新排队。如果同时有其他消费者在线,则会迅速将其重新提供给另一个消费者。这样就可以确保没有消息丢失,即使工作者偶然死亡。
没有任何消息超时; RabbitMQ将在消费者挂了时重新发送消息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。
消息确认默认情况下打开。 在前面的例子中,我们通过将noAck(“no manual acks”)参数设置为true来明确地将其关闭。 一旦完成任务,现在该删除这个标志并发送正确的确认。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用这个代码,我们可以确定即使在处理消息时,使用CTRL + C杀死一个工作者,也不会丢失任何东西。工作者挂了之后不久,所有未确认的消息将被重新发送。
忘记确认
丢失BasicAck是一个常见的错误。 这是一个容易的错误,但后果是严重的。
当您的客户端退出(可能看起来像随机重新传递)时,消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未包含的消息。
为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化(Message durability)
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要丢失。需要两件事来确保消息不会丢失:我们需要将所有队列和消息标记为持久化。
首先,我们需要确保RabbitMQ不会丢失我们的队列。 为了这样做,我们需要将其声明为持久的:
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。 这是因为我们已经定义了一个非持久化的名为hello的队列。 RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
这个queueDeclare更改需要应用于生产者和消费者代码。
在这一点上,我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在我们需要将我们的消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
注意消息持久性
将消息标记为持久性不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存时,仍然有一个很短的时间窗口。 此外,RabbitMQ不会对每个消息执行`fsync`(同步内存中所有已修改的文件数据到储存设备) - 它可能只是保存到缓存中,而不是真正写入磁盘。 持久性保证不强,但对我们的简单任务队列来说已经足够了。 如果您需要更强大的保证,那么您可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。
公平转发(Fair dispatch)
或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。
造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
channel.BasicQos(0, 1, false);
注意队列大小
如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
完整的代码
NewTask.cs 类:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
Worker.cs类:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
相关推荐
- MIRIX重塑AI记忆:超Gemini 410%,节省99.9%内存,APP同步上线
-
MIRIX,一个由UCSD和NYU团队主导的新系统,正在重新定义AI的记忆格局。在过去的十年里,我们见证了大型语言模型席卷全球,从写作助手到代码生成器,无所不能。然而,即使最强大的模型依...
- 硬盘坏了怎么把数据弄出来对比10种硬盘数据恢复软件
-
机械硬盘或固态硬盘损坏导致数据丢失时,应立即停止对硬盘的读写操作,并根据损坏类型选择逻辑层恢复工具或专业物理恢复服务。紧急处置措施立即停止通电使用:发现硬盘异响、无法识别或数据异常时,需立即断开连接,...
- 蓝宝石B850A WIFI主板新玩法:内存小参调节体验
-
蓝宝石前段时间发布了一款性价比极高的主板:NITRO氮动B850AWIFI主板。这款主板的售价只要1349元,相比普遍1500元以上的B850主板,确实极具竞争力。虽然价格实惠,蓝宝石NITR...
- 内存卡损坏读不出怎么修复?这5个数据恢复工具汇总,3秒挽回!
-
在数字化生活的浪潮中,内存卡凭借小巧便携与大容量存储的特性,成为相机、手机、行车记录仪等设备存储数据的得力助手,承载着无数珍贵回忆与重要文件。然而,当内存卡突然损坏无法读取,无论是误删、格式化、病毒入...
- 内存卡修复不再难,2025年必学的6款软件工具
-
内存卡出现问题时,通常是因为文件系统损坏、物理损坏或病毒感染。通过专业的修复工具,我们可以尝试恢复数据并修复内存卡。内存卡修复利器:万兴恢复专家万兴恢复专家是一款功能强大的数据恢复软件,支持多种设备和...
- 有5款内存卡修复工具汇总,内存卡数据轻松找回!
-
在如今的数字时代,内存卡作为不可或缺的存储介质,广泛应用于相机、手机、行车记录仪等各类设备中,承载着我们珍贵的照片、视频以及重要文件。然而,数据丢失的风险却如影随形,误删、格式化、病毒入侵、硬件故障等...
- 揭秘:如何通过多种方式精准查询内存条型号及规避风险?
-
以下是内存条型号查询的常用方法及注意事项,综合了物理查看、软件检测、编码解析等多种方式:一、物理标签查看法1.拆机查看标签打开电脑主机/笔记本后盖找到内存条,观察标签上的型号标识。例如内存标签通常标...
- 内存卡数据恢复5个工具汇总推荐,轻松找回珍贵记忆!
-
在这个数字化时代,内存卡作为我们存储珍贵照片、重要文件的常用载体,广泛应用于手机、相机、平板电脑等设备。但数据丢失的意外却常常不期而至,误删除、格式化、病毒攻击,甚至内存卡的物理损坏,都可能让辛苦保存...
- 电脑内存智能监控清理,优化性能的实用软件
-
软件介绍Memorycleaner是一款内存清理软件。功能很强,效果很不错。Memorycleaner会在内存用量超出80%时,自动执行“裁剪进程工作集”“清理系统缓存”以及“用全部可能的方法清理...
- TechPowerUp MemTest64:内存稳定性测试利器
-
TechPowerUpMemTest64:内存稳定性测试利器一、软件简介TechPowerUpMemTest64,由知名硬件信息工具GPU-Z的出品公司TechPowerUp发布,是一款专为64位...
- 微软推出AI恶意软件检测智能体Project Ire,精确度高达98%
-
IT之家8月6日消息,当地时间周二,微软宣布推出可自主分析恶意软件的AI检测系统原型——ProjectIre。该项目由微软研究院、Defender研究团队及Discovery&a...
- 农村老木匠常用的20种老工具,手艺人靠它养活一家人,你认识几种
-
生活中的手艺老匠人是非常受到尊敬和崇拜的,特别是在农村曾经的老匠人都是家里的“座上宾”。对于民间传统的手艺人,有一种说法就是传统的八大匠:木匠、泥匠、篾匠、铁匠、船匠、石匠、油匠和剃头匠。木匠的祖始爷...
- 恶意木马新变种伪装成聊天工具诱人点击
-
国家计算机病毒应急处理中心通过对互联网监测发现,近期出现一种恶意木马程序变种Trojan_FakeQQ.CTU。该变种通过伪装成即时聊天工具,诱使计算机用户点击运行。该变种运行后,将其自身复制到受感染...
- 学习网络安全 这些工具你知道吗?
-
工欲善其事必先利其器,在新入门网络安全的小伙伴而言。这些工具你必须要有所了解。本文我们简单说说这些网络安全工具吧!Web安全类web类工具主要是通过各种扫描工具,发现web站点存在的各种漏洞...
- 5分钟盗走你的隐私照片,这个全球性漏洞到底有多可怕?
-
这个时代,大家对电脑出现漏洞,可能已经习以为常。但如果机哥告诉大家,这个漏洞能够在5分钟内,破解并盗取你所有加密文件,而且还无法通过软件和补丁修复...这可就有点吓人啦。事情是酱婶的。来自荷兰埃因...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- 如何绘制折线图 (52)
- javaabstract (48)
- 新浪微博头像 (53)
- grub4dos (66)
- s扫描器 (51)
- httpfile dll (48)
- ps实例教程 (55)
- taskmgr (51)
- s spline (61)
- vnc远程控制 (47)
- 数据丢失 (47)
- wbem (57)
- flac文件 (72)
- 网页制作基础教程 (53)
- 镜像文件刻录 (61)
- ug5 0软件免费下载 (78)
- debian下载 (53)
- ubuntu10 04 (60)
- web qq登录 (59)
- 笔记本变成无线路由 (52)
- flash player 11 4 (50)
- 右键菜单清理 (78)
- cuteftp 注册码 (57)
- ospf协议 (53)
- ms17 010 下载 (60)