(资料图片仅供参考)
死信队列简介
RabbitMQ 的死信队列(Dead Letter Queue)是一种特殊的队列,用于存储那些被标记为“死信”的消息。所谓死信即无法被正常消费和处理的消息,通常是由于一些特定的情况或条件导致的,比如过期、重试次数超过限制等。
普通消息成为死信的常见原因有
- 消息被拒(basic.reject or basic.nack)丢弃消息(requeue=false);
- 消息过期:当消息的生存周期超过了设置的过期时间,即消息在队列中等待被消费的时间超过了预定的时间。(可以通过设置 x-message-ttl 参数来指定消息的过期时间 或者 可以为每条消息单独设置过期时间。通过在消息的属性中设置 expiration 字段,以毫秒为单位指定消息的过期时间)
- 队列达到最大长度:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,而被视为死信。(创建队列时指定" x-max-length参数设置队列最大消息数量)
死信处理过程
消息被拒
class Program{static IConnectionFactory factory = new ConnectionFactory(){HostName = "192.168.100.2",UserName = "uat",Password = "137955aaA",VirtualHost = "uat_vhost"};/// /// 生产者/// public static void SendMessage(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";//消息交换机string exchange = "direct.exchange";//消息队列string queueName = "direct.queue";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);// 创建消息交换机channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建消息队列,并指定死信队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 });//消息队列绑定消息交换机channel.QueueBind(queueName, exchange, routingKey: queueName);string message = "hello rabbitmq message";var properties = channel.CreateBasicProperties();properties.Persistent = true;//发布消息channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"向队列:{queueName}发送消息:{message}");}}}/// /// 消费者/// public static void Consumer(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";//消息交换机string exchange = "direct.exchange";//消息队列string queueName = "direct.queue";var connection = factory.CreateConnection();{//创建信道var channel = connection.CreateModel();{//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);// 创建消息交换机channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建消息队列,并指定死信队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 });//消息队列绑定消息交换机channel.QueueBind(queueName, exchange, routingKey: queueName);var consumer = new EventingBasicConsumer(channel);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);consumer.Received += (model, ea) =>{//处理业务var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认");//不ack(BasicNack),且不把消息放回队列(requeue:false)channel.BasicNack(ea.DeliveryTag, false, requeue: false);};channel.BasicConsume(queueName, autoAck: false, consumer);}}}static void Main(string[] args){SendMessage();Consumer();Console.ReadLine();}}
不ack(BasicNack),且不把消息放回队列(requeue:false),产生死信队列
延时队列(相同的过期时间)
延时队列通常与死信队列结合使用,以实现消息的延迟投递和处理。它的基本思想是给消息队列设置消息的过期时间(TTL),当消息在一定时间内未被消费者处理时,将其标记为过期并发送到死信队列。
class Program{static IConnectionFactory factory = new ConnectionFactory(){HostName = "192.168.100.2",UserName = "uat",Password = "137955aaA",VirtualHost = "uat_vhost"};/// /// 生产者/// public static void SendMessage(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";//消息交换机string exchange = "direct.exchange";//消息队列string queueName = "direct.queue.delay";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);// 创建消息交换机channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建消息队列,并指定死信队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 { "x-message-ttl",3000} });//消息队列绑定消息交换机channel.QueueBind(queueName, exchange, routingKey: queueName);string message = "hello rabbitmq message";var properties = channel.CreateBasicProperties();properties.Persistent = true;//发布消息channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"{DateTime.Now}队列:{queueName}发送消息:{message}");}}}/// /// 消费延时队列/// public static void Consumer_delay(){//死信交换机string dlxexChange = "dlx.exchange";//死信队列string dlxQueueName = "dlx.queue";var connection = factory.CreateConnection();{//创建信道var channel = connection.CreateModel();{//创建死信交换机channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);//创建死信队列channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);var consumer = new EventingBasicConsumer(channel);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);consumer.Received += (model, ea) =>{//处理业务var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now}队列{dlxQueueName}消费消息:{message}");channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(dlxQueueName, autoAck: false, consumer);}}}static void Main(string[] args){Task.Factory.StartNew(() =>{for (int i = 0; i < 10; i++){SendMessage();System.Threading.Thread.Sleep(1000);}});Consumer_delay();Console.ReadLine();}}
延时队列(不同的过期时间)
延时队列处理,不同的过期时间的消息,会堵塞后面的消息,导致过期了还没消费。可以通过rabbitmq_delayed_message_exchange 插件,解决这个问题。通过该插件,可以在 RabbitMQ 中实现延时消息投递和消费。
class Program{static IConnectionFactory factory = new ConnectionFactory(){HostName = "192.168.100.2",UserName = "uat",Password = "137955aaA",VirtualHost = "uat_vhost"};/// /// 生产者/// public static void SendMessage(){//延时交换机string delayExchange = "dlx.exchange.delayed";//延时队列string delayQueueName = "dlx.queue.delayed";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){//创建延时交换机channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false, new Dictionary {{ "x-delayed-type","direct"}});//创建死信队列channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);//死信队列绑定死信交换机channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName);string message = "hello rabbitmq message 10s后处理";var properties = channel.CreateBasicProperties();properties.Persistent = true;properties.Headers = new Dictionary { { "x-delay", "10000" } };//发布消息channel.BasicPublish(exchange: delayExchange, routingKey: delayQueueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"{DateTime.Now}队列:{delayQueueName}发送消息:{message}");message = "hello rabbitmq message 5s后处理";properties = channel.CreateBasicProperties();properties.Persistent = true;properties.Headers = new Dictionary { { "x-delay", "5000" } };channel.BasicPublish(exchange: delayExchange,routingKey: delayQueueName,basicProperties: properties,body: Encoding.UTF8.GetBytes(message));Console.WriteLine($"{DateTime.Now}队列:{delayQueueName}发送消息:{message}");}}}/// /// 消费延时队列/// public static void Consumer_delay(){//死信队列string dlxQueueName = "dlx.queue.delayed";var connection = factory.CreateConnection();{//创建信道var channel = connection.CreateModel();{var consumer = new EventingBasicConsumer(channel);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);consumer.Received += (model, ea) =>{//处理业务var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now}队列{dlxQueueName}消费消息:{message}");System.Threading.Thread.Sleep(20);channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(dlxQueueName, autoAck: false, consumer);}}}static void Main(string[] args){SendMessage();Consumer_delay();Console.ReadLine();}}