RabbitMQ的死信队列,延时队列_天天速递
2023-06-28 12:07:50 博客园


(资料图片仅供参考)

死信队列简介

RabbitMQ 的死信队列(Dead Letter Queue)是一种特殊的队列,用于存储那些被标记为“死信”的消息。所谓死信即无法被正常消费和处理的消息,通常是由于一些特定的情况或条件导致的,比如过期、重试次数超过限制等。

普通消息成为死信的常见原因有

死信处理过程

消息被拒

class Program{static IConnectionFactory factory = new ConnectionFactory(){HostName = "192.168.100.2",UserName = "uat",Password = "137955aaA",VirtualHost = "uat_vhost"};/// /// 生产者/// public static void SendMessage(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";//消息交换机string exchange = "direct.exchange";//消息队列string queueName = "direct.queue";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);// 创建消息交换机channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建消息队列,并指定死信队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 });//消息队列绑定消息交换机channel.QueueBind(queueName, exchange, routingKey: queueName);string message = "hello rabbitmq message";var properties = channel.CreateBasicProperties();properties.Persistent = true;//发布消息channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"向队列:{queueName}发送消息:{message}");}}}/// /// 消费者/// public static void Consumer(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";//消息交换机string exchange = "direct.exchange";//消息队列string queueName = "direct.queue";var connection = factory.CreateConnection();{//创建信道var channel = connection.CreateModel();{//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);// 创建消息交换机channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建消息队列,并指定死信队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 });//消息队列绑定消息交换机channel.QueueBind(queueName, exchange, routingKey: queueName);var consumer = new EventingBasicConsumer(channel);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);consumer.Received += (model, ea) =>{//处理业务var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认");//不ack(BasicNack),且不把消息放回队列(requeue:false)channel.BasicNack(ea.DeliveryTag, false, requeue: false);};channel.BasicConsume(queueName, autoAck: false, consumer);}}}static void Main(string[] args){SendMessage();Consumer();Console.ReadLine();}}

不ack(BasicNack),且不把消息放回队列(requeue:false),产生死信队列

延时队列(相同的过期时间)

延时队列通常与死信队列结合使用,以实现消息的延迟投递和处理。它的基本思想是给消息队列设置消息的过期时间(TTL),当消息在一定时间内未被消费者处理时,将其标记为过期并发送到死信队列。

class Program{static IConnectionFactory factory = new ConnectionFactory(){HostName = "192.168.100.2",UserName = "uat",Password = "137955aaA",VirtualHost = "uat_vhost"};/// /// 生产者/// public static void SendMessage(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";//消息交换机string exchange = "direct.exchange";//消息队列string queueName = "direct.queue.delay";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);// 创建消息交换机channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建消息队列,并指定死信队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 { "x-message-ttl",3000} });//消息队列绑定消息交换机channel.QueueBind(queueName, exchange, routingKey: queueName);string message = "hello rabbitmq message";var properties = channel.CreateBasicProperties();properties.Persistent = true;//发布消息channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"{DateTime.Now}队列:{queueName}发送消息:{message}");}}}/// /// 消费延时队列/// public static void Consumer_delay(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";var connection = factory.CreateConnection();{//创建信道var channel = connection.CreateModel();{//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);var consumer = new EventingBasicConsumer(channel);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);consumer.Received += (model, ea) =>{//处理业务var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now}队列{dlxQueueName}消费消息:{message}");channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(dlxQueueName, autoAck: false, consumer);}}}static void Main(string[] args){Task.Factory.StartNew(() =>{for (int i = 0; i < 10; i++){SendMessage();System.Threading.Thread.Sleep(1000);}});Consumer_delay();Console.ReadLine();}}

延时队列(不同的过期时间)

延时队列处理,不同的过期时间的消息,会堵塞后面的消息,导致过期了还没消费。可以通过rabbitmq_delayed_message_exchange 插件,解决这个问题。通过该插件,可以在 RabbitMQ 中实现延时消息投递和消费。

class Program{static IConnectionFactory factory = new ConnectionFactory(){HostName = "192.168.100.2",UserName = "uat",Password = "137955aaA",VirtualHost = "uat_vhost"};/// /// 生产者/// public static void SendMessage(){//延时交换机string delayExchange = "dlx.exchange.delayed";//延时队列string delayQueueName = "dlx.queue.delayed";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//创建延时交换机channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false, new Dictionary {{ "x-delayed-type","direct"}});//创建死信队列channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName);string message = "hello rabbitmq message 10s后处理";var properties = channel.CreateBasicProperties();properties.Persistent = true;properties.Headers = new Dictionary { { "x-delay", "10000" } };//发布消息channel.BasicPublish(exchange: delayExchange, routingKey: delayQueueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"{DateTime.Now}队列:{delayQueueName}发送消息:{message}");message = "hello rabbitmq message 5s后处理";properties = channel.CreateBasicProperties();properties.Persistent = true;properties.Headers = new Dictionary { { "x-delay", "5000" } };channel.BasicPublish(exchange: delayExchange,routingKey: delayQueueName,basicProperties: properties,body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"{DateTime.Now}队列:{delayQueueName}发送消息:{message}");}}}/// /// 消费延时队列/// public static void Consumer_delay(){//死信队列string dlxQueueName = "dlx.queue.delayed";var connection = factory.CreateConnection();{//创建信道var channel = connection.CreateModel();{var consumer = new EventingBasicConsumer(channel);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);consumer.Received += (model, ea) =>{//处理业务var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now}队列{dlxQueueName}消费消息:{message}");System.Threading.Thread.Sleep(20);channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(dlxQueueName, autoAck: false, consumer);}}}static void Main(string[] args){SendMessage();Consumer_delay();Console.ReadLine();}}

热门推荐

文章排行

  1. 2023-06-28RabbitMQ的死信队列,延时队列_天天速递
  2. 2023-06-28金融风险有哪些种类_金融风险有哪些
  3. 2023-06-28SHEIN否认向美递表,跨境电商领域IPO战况如何?|世界要闻
  4. 2023-06-28以“六个必须坚持”为指导 加快构建新发展格局 天天简讯
  5. 2023-06-28每日消息!长沙市新型农业经营主体引才可获补助
  6. 2023-06-28北海市气温1月到12月气温(北海气温)
  7. 2023-06-28热点评!6月27日基金净值:嘉实致安3个月定期债券最新净值1.1224,涨0.09%
  8. 2023-06-28环球观热点:凯投宏观:加拿大经济增长仍将令人失望 明年将降息至3%
  9. 2023-06-28大牛股爆出利空 子公司涉嫌传播虚假信息 被证监会立案!
  10. 2023-06-28世界快资讯:桐屿街道:定期给企业“体检”
  11. 2023-06-28东证期货增资5亿元-环球热点
  12. 2023-06-28小米手机电池虚电修复方法大全-小米手机电池虚电修复-热点评
  13. 2023-06-28小小动画7 小小动画
  14. 2023-06-27十首节奏感强的英文歌 三首超好听的英文歌
  15. 2023-06-27当前讯息:宁夏建材: 宁夏建材董事会关于本次交易是否符合《上市公司监管指引第9号——上市公司筹划和实施重大资产重组的监管要求》第四条规定的说明
  16. 2023-06-27【环球报资讯】当代置业:2022上半年期内亏损14.84亿元
  17. 2023-06-27美专栏作家刊文:美国全球地位被国内政治两极化削弱
  18. 2023-06-27保定城改双子户认定问题:网友咨询南大园乡分户条件的答复... 今日热搜
  19. 2023-06-27圆通速递拟斥资3-5亿回购股份 用于员工持股或股权激励计划 焦点快播
  20. 2023-06-27【全球新视野】OPPO Find X5 Pro天玑版怎么查看内存占用