
RabbitMQ 中消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。
相反,生产者只能向交换器发送消息。交换器的工作是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它将它们推送到队列中。交换器必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列?它应该附加到许多队列中吗?或者它应该被丢弃。其规则由“交换类型”定义。
有几种可用的交换类型:direct、topic、headers 和fanout。
无名交换器
在本教程的前面部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是可能的,因为我们使用的是默认交换,我们通过空字符串 (" ") 来识别它。

以下代码:
channel.basicPublish( "" , "hello" , null , message.getBytes());
第一个参数是交换器的名称。空字符串表示默认或无名交换器。消息被路由到具有routingKey指定的名称的队列(如果存在)。
临时队列
在 Java 客户端中,当我们不向queueDeclare()提供参数时, 我们会创建一个具有生成名称的非持久、独占、自动删除队列:
String queueName = channel.queueDeclare().getQueue();
此时queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。以下是声明的临时队列:

这种队列,在接收fanout类型的exchange的信息时,非常有用。
绑定binding

图中X(exchange)到queue之间边线,即为binding,即绑定。
如果没有声明绑定关系,则默认exchange会绑定到当声明的队列:

如果没有队列绑定到交换器,消息将丢失,但这对我们来说没关系;如果没有消费者在监听,我们可以安全地丢弃消息。
消息的过期时间
官网地址:https://www.rabbitmq.com/ttl.html
通过在参数中,通过在命令行设置message-ttl(豪秒)或在代码中设置x-message-ttl可以设置消息的过期时间。
以下声明放到这个队列里面的消息,将会在60秒后过期。

通过命令行,了可以声明设置一个队列的过期时间:

或使用工具类,指定过期时间

示例:
以下示例发送一个消息,并设置过期时间为5s,超过5s此信息没有被消费将会被MQ丢弃。
//声明消息的过期时间为5秒 AMQP . BasicProperties prop = new AMQP .BasicProperties().builder().expiration( "" +( 1000 * 5 )).build(); channel .basicPublish( "" , queueName , prop , "HelloWorld" .getBytes( StandardCharsets . UTF_8 ));
死信队列
https://www.rabbitmq.com/dlx.html
什么时候消息被认为是死的?
消息在到达 RabbitMQ 后变得无法传递的三种确定情况:
- 消费者使用basic.reject或 basic.nack否定确认消息,并将requeue参数设置为false。
- 消息由于每条消息的 TTL而过期;或者
- 消息被丢弃,因为它的队列超过了长度限制

为什么使用死信交换
附加到死信交换的队列收集丢弃的消息,接下来的步骤由您决定。换句话说 - 由您决定如何处理死信队列中的消息。如果实施得当,信息几乎不会丢失。
当您知道您有可能已被取消但仍需要处理的消息时,请附加死信交换。当您不能丢失具有过期 TTL 的消息或队列可能达到其容量时。
设置 RabbitMQ 死信交换
死信交换与其他交换没有什么不同:
只需正常指定交换并将其声明为队列的备份:

只需要给正常的队列指定一个死信交换机及路由key即可以实现过期数据自动被转存到死信队列。
代码示例1-过期的消息自动进入死信队列:
使用生产者来声明正常队列到死信队列的数据流:
String queueName = "HelloQueue" ; try ( Connection connection = ConnUtils . newConnection ()){ Channel channel = connection .createChannel(); //声明一个死信交换机用于接收过期的消息 channel .exchangeDeclare( "dead_exchanger" , "direct" , true ); //声明一个死信队列,并绑定死信交换机,即接收列信交换机传递的数据 channel .queueDeclare( "dead_queue" , true , false , false , null ); //绑定死信队列+死信交换机,并指定一个routingKey channel .queueBind( "dead_queue" , "dead_exchanger" , "dead_routingKey" ); //声明参数,指定消息出现死信后的去处 Map < String , Object > argus = Map . of ( "x-dead-letter-exchange" , "dead_exchanger" , "x-dead-letter-routing-key" , "dead_routingKey" ); //声明正常队列 channel .queueDeclare( queueName , true , false , false , argus ); //声明消息的过期时间为5秒 AMQP . BasicProperties prop = new AMQP .BasicProperties().builder().expiration( "" +( 1000 * 5 )).build(); channel .basicPublish( "" , queueName , prop , "HelloWorld" .getBytes( StandardCharsets . UTF_8 )); log .info( "送信息完成" );}
截图:
注意绑定关系:

测试,将数据发送到队列,等待,5秒,因为没有被消费,所以,消息自动进入列信队列,进入死信队列的消息,也可以被绑定到这个死信队列的消费者正常消费(这样就形成了延迟队列)。

代码示例2-消费者否的消息自动进入死信队列:
基于上述代码,我们现在取消过期时间的设置,现在通过消费者来否定确认消息。
注意 :消费者这边声明死信队列的过程,必须与生产者那边完全相同。然后消费者通过Nack()否定确认消息,消费即可以进入死信队列。
完整代码:
String queueName = "HelloQueue" ; Connection con = ConnUtils . newConnection (); Channel channel = con .createChannel(); //声明一个死信交换机用于接收过期的消息 channel .exchangeDeclare( "dead_exchanger" , "direct" , true ); //声明一个死信队列,并绑定死信交换机,即接收列信交换机传递的数据 channel .queueDeclare( "dead_queue" , true , false , false , null ); //绑定死信队列+死信交换机,并指定一个routingKey channel .queueBind( "dead_queue" , "dead_exchanger" , "dead_routingKey" ); //声明参数,指定消息出现死信后的去处 Map < String , Object > argus = Map . of ( "x-dead-letter-exchange" , "dead_exchanger" , "x-dead-letter-routing-key" , "dead_routingKey" ); //声明正常队列 channel .queueDeclare( queueName , true , false , false , argus ); //接收到数据以后的回调函数 DeliverCallback callback = (consumerTag, message) -> { byte [] body = message.getBody(); String str = new String( body ); long tag = message.getEnvelope().getDeliveryTag(); log .info( "接收到信息:{},tagStr:{},tag:{}" , str , consumerTag, tag ); //否定确认,且不回原队列 channel .basicNack(message.getEnvelope().getDeliveryTag(), false , false );}; //取消处理信息的回调 CancelCallback cancelCallback = consumerTag -> { }; //消费监听 channel .basicConsume( queueName , false , callback , cancelCallback );
截图:

代码示例3-消息达到上限:
参考地址:
https://www.cloudamqp.com/blog/part2-rabbitmq-best-practice-for-high-performance.html。
https://www.rabbitmq.com/maxlength.html
RabbitMQ 中的队列是单线程的,一个队列最多可以处理大约 5 万条消息。如果您有多个队列和消费者,并且如果您有与底层节点上的核心一样多的队列,您将在多核系统上实现更好的吞吐量。
RabbitMQ 管理界面收集和计算集群中每个队列的指标。如果您有成千上万的活动队列和消费者,这可能会减慢服务器的速度。如果队列太多,CPU 和 RAM 使用率也可能会受到负面影响。
默认最大队列长度限制行为
当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列的前面丢弃或 死信消息(即队列中最旧的消息)。要修改此行为,请使用下面描述的溢出设置。
当队列的容量已达到上界,再次添加消息时,则最先放到队列中的消息默认会被丢弃。
通过以下设置,可以修改溢出时的策略:
Overflow behaviour can be set by supplying the x-overflow queue declaration argument with a string value. Possible values are drop-head (default), reject-publish or reject-publish-dlx.
drop-head : 删除最旧的消息,这是默认值。
reject-publish: 不再接收新的消息。
reject-publish-dlx:与reject-publish不同,会将后发布的消息,放到死信队列。
完整代码:
Channel channel = connection .createChannel(); //声明一个死信交换机用于接收过期的消息 channel .exchangeDeclare( "dead_exchanger" , "direct" , true ); //声明一个死信队列,并绑定死信交换机,即接收列信交换机传递的数据 channel .queueDeclare( "dead_queue" , true , false , false , null ); //绑定死信队列+死信交换机,并指定一个routingKey channel .queueBind( "dead_queue" , "dead_exchanger" , "dead_routingKey" ); //声明参数,指定消息出现死信后的去处 Map < String , Object > argus = Map . of ( "x-dead-letter-exchange" , "dead_exchanger" , "x-dead-letter-routing-key" , "dead_routingKey" , "x-max-length" , 2 , "x-overflow" , "reject-publish-dlx" ); //声明正常队列,设置容量为2条消息,默认 channel .queueDeclare( queueName , true , false , false , argus ); for ( int i = 1 ; i <= 3 ; i ++) { //发布3条 String msg = "Hello" + i ; channel .basicPublish( "" , queueName , null , msg .getBytes( StandardCharsets . UTF_8 ));} log .info( "送信息完成" );
截图:
在声明队列时,设置x-max-length,x-overflow。

查看UI,可以看到HelloQueue保留了两条信息,而最后发布的消息,则直接进入了死信队列:

重试死信
您可以通过将原始交换附加到死信队列来创建使用死信队列的重新传递机制。但是,如果不加以检查,可能会创建无限循环的重新传递消息,这可能会阻塞死信队列。

创建一个单独的死信队列来存储消息,然后再将它们推送到交换器或将它们路由回原始队列。
要进一步改进系统,请在消息正文中包含一个递增的属性,指示接收到消息的次数。这需要在单独的消费者中处理死信,但最终允许您丢弃消息或将它们推送到存储中。