EZLippi-浮生志

RabbitMQ的一些实践总结

MQ使用场景

  1. 流程异步化
  2. 业务解耦
  3. 流量削峰填谷

    RabbitMQ核心概念

先来看一眼rabbitMQ的架构图:

  1. broker:接受客户端连接,实现AMQP协议。

  2. connection:和具体broker建立网络连接。

  3. channel:逻辑概念,几乎所有操作都在channel中进行,channel是消息读写的通道,一个connection可以建立多个channel。

  4. message:应用程序和broker之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的TTL,correlationId等高级特性;body是消息实体内容。

  5. Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。

  6. Exchange:交换器,接受消息,根据路由键转发消息到绑定的队列上。

  7. binding:Exchange和Queue之间的绑定关系,告诉exchange把消息路由到哪个队列

  8. routing key:exchange结合routing key来确定如何路由一条消息。

  9. Queue:消息队列,用来存放消息的队列。

消息确认机制

结合上图中消息发送的链路, 我们可以看出一条消息发出去后可能在哪些环节出问题:

  1. Producer发出后由于网络故障Broker没有收到
  2. Producer发出后Broker宕机导致消息丢失
  3. Broker发送给Consumer后由于网络故障Consumer没有收到
  4. Broker发送给Consumer后Consumer宕机导致没有处理

RabbitMQ提供了发布者确认和消费者确认机制来解决这些问题,发布者确认是指Broker收到Producer的消息后,会给Producer发送一条确认消息,如果消息的durable属性为true, Broker会等消息成功持久化到磁盘后再发送publisher-confirm,发布者确认是异步的,不会影响发布的性能;消费者确认是指消费者收到消息后需要发送一条确认消息给
broker(可以开启自动确认模式,但可能丢消息),broker如果没有收到consumer的确认,会把消息重发.

生产者增加确认机制非常简单,channel开启confirm模式,然后增加监听, 如果使用的spring-rabbit框架, 把connection-factory的publisher-confirms配置为true,
并在RabbitTemplate配置一个confirm-callback的Listener:

1
2
3
4
5
6
7
8
9
10
11
  <rabbit:connection-factory id="rabbitConnectFactory"  publisher-confirms="true"/>
<bean id="confirmCallback" class="com.xxx.MessageConfirmCallback"/>
<rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectFactory" confirm-callback="confirmCallback"/>

public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {

}
}

有一些场景, 我们需要确保消息被正确的路由到队列:

  1. 如果消息发送到交换器后找不到可路由的队列,这时候消息会被丢弃, publisher-confirm返回的ack为true,相当于消息石沉大海了.

这种情况需要发送的时候把mandatory设置true,并且设置一个return-callback,当RabbitMQ找不到可路由的队列时返回publish-return告诉你无法路由的原因,可以记录日志或者重试.
如果用的spring-rabbit框架,把rabbit:template的mandatory属性设置为true,并且配置return-callback,只配置return-callback没有配置mandatory时callback不会生效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  <bean id="returnCallback" class="com.xxx.MessageReturnCallback">
<property name="rabbitTemplate" ref="rabbitTemplate"/>
</bean>

<rabbit:template id="rabbitTemplate" mandatory="true"
connection-factory="rabbitConnectFactory" confirm-callback="confirmCallback"
return-callback="returnCallback" message-converter="messageConverter"/>

public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
  1. 消息被拒绝时进入死信队列

设想有这种场景, Consumer侧接收消息的Listener抛出了未捕获的异常, spring-rabbit的默认处理策略是返回Basic.Reject给broker,并且设置requeue=true,
消息会被重新入队列再次被消费,如果这种异常是无法恢复的(比如出现了NPE),消息会一直在Broker和消费者之间无限投递,
经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部,消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行

这可能不是我们期望的逻辑,比较好的方式是把消息放入死信队列,
后面修复程序后再对死信队列中的消息进行消费.

消息变为死信的几种情况:

  1. 消息被拒绝(basic.reject/basic.nack)同时requeue=false(不重回队列)

  2. TTL过期

  3. 队列达到最大长度

死信队列如何使用,以spring-rabbit为例:
rabbit:listener-container 把requeue-rejected属性设置为false,表示消息拒绝时不再重新入队列, exchange配置x-dead-letter-exchange(在后台或者代码中给
死信交换器绑定好死信队列),

1
2
3
4
5
6
7
8
<rabbit:queue id="rabbit.queue.test" name="rabbit.queue.test">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container acknowledge="auto" requeue-rejected="false">
</rabbit:listener-container>

配置好之后如果消费者出现未捕获的异常,或者消费者手动抛出AmqpRejectAndDontRequeueException,消息会进入到死信队列

  1. 消息进死信队列前重试几次

考虑这样一种场景, 收到rabbitMQ消息后调用RPC接口请求数据超时了,可能只是网络抖动或者服务端突然响应慢了一下导致的, 这种情况下可以在消费端结合spring-retry
框架进行重试, 需要特别注意的是spring-retry重试是在当前接收线程处理的,重试的次数和总时长不应太长,否则如果重试一直失败会严重影响性能,spring-rabbit
和spring-retry结合起来也很方便,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  <bean id="messageRecoverer" class="com.xxxx.rabbit.MessageRecover" />

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="maxInterval" value="3000" />
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
</property>
</bean>

<bean id="retryInterceptorFactoryBean" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="messageRecoverer" />
<property name="retryOperations" ref="retryTemplate" />
</bean>


<bean id="someListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="prefetchCount" value="10" />
<property name="defaultRequeueRejected" value="false" />
<property name="acknowledgeMode" value="AUTO" />
<property name="queues" ref="rabbit.queue.test" />
<property name="messageListener" ref="commonMessageListener" />
<property name="adviceChain">
<array>
<ref bean="retryInterceptorFactoryBean" />
</array>
</property>
</bean>

//重试多少次都失败后执行的Recover逻辑, 把消息持久化到磁盘或数据库,报警,或者抛出AmqpRejectAndDontRequeueException进死信队列
public class MessageRecover implements MessageRecoverer {

@Override
public void recover(Message message, Throwable throwable) {
//消息持久化逻辑
//报警
logger.error("xxx");
throw new AmqpRejectAndDontRequeueException(throwable);
}

高可用

为了防止丢消息,交换器,队列和消息都应该设置成持久化,具体配置就是把durable设置为true.

我们线上的RabbitMQ采用的是集群模式,集群模式下所有节点会存储交换器和绑定关系以及队列的名字, 但是队列只存储在某一个节点,如果某个节点出现故障, 该节点上的
队列将会不可用,公司6月4号出现过一次交换机故障导致RabbitMQ集群脑裂的场景, 就算网络恢复了脑裂的状态还是一直保持,需要重启集群的一个节点才能解决,由于我们用的
是虚拟IP去连的集群的一个节点,如果你请求的队列是在另一个节点上RabbitMQ会给你返回404 queue not exist,spring-rabbit遇到这个错误会重启Consumer线程
,然后重新尝试创建队列,由于队列的元数据已有了, 在脑裂期间是无法再创建该队列的,spring默认只重试3次,每次间隔5s,如果15s内集群未恢复,你的rabbit消费者就死掉了,
需要重启业务进程解决,针对这个问题可以把rabbit:listener-container的declaration-retries设置的大一些, 这样集群恢复的时候consumer会自动恢复无需重启.

为了应对操作系统重启,掉电导致未及时fsync刷盘的场景,可以采用镜像队列, 镜像队列可以在rabbitmq管理后台配置策略, 从而解决队列单点故障问题.
配置了镜像队列后, 集群会自动选举一个rabbit节点为Master节点,再往镜像队列发送数据时,Rabbit会把数据发给所有节点中的队列,从而保证高可用.

镜像队列的架构如下:

监控

Rabbit管理后台上的监控是基于connection,exchange,queue, 很多时候发现某个队列积压了,但并不知道队列是谁创建的,只能在群里@所有人, 其实RabbitMQ是
提供了关联应用的能力,创建队列时可以加上队列参数,如下所示:

1
2
3
4
5
<rabbit:queue id="rabbit.queue.test" name="rabbit.queue.test">
<rabbit:queue-arguments>
<entry key="application" value="xxx-application"/>
</rabbit:queue-arguments>
</rabbit:queue>

RabbitMQ会为每个consumer分配一个ConsumerTag,客户端可以自己指定,如果没指定,RabbitMQ会自动创建一个随机的,有时候想知道某个队列是谁在消费,
spring-rabbit提供了consumer-tag-strategy这个属性来配置一个生成ConsumerTag的Bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<bean id="appNameConsumerTagStrategy" class="com.ximalaya.flash.rabbit.AppNameConsumerTagStrategy"/>

<rabbit:listener-container concurrency="10" prefetch="10" declaration-retries="2147483647" acknowledge="auto"
requeue-rejected="false" consumer-tag-strategy="appNameConsumerTagStrategy"
connection-factory="rabbitConnectFactory" message-converter="messageConverter"
>
<rabbit:listener ref="commonMessageListener"
queue-names="rabbit.queue.test"/>
</rabbit:listener-container>

public class AppNameConsumerTagStrategy implements ConsumerTagStrategy {
private AtomicInteger cnt = new AtomicInteger(0);
@Override
public String createConsumerTag(String queue) {
return "xxx-consumer-" + cnt.getAndIncrement();
}
}

最后可以在rabbit后台看到队列的创建者和消费者

rabbitmq在资源不足时会给所有Publish Connection发送Connection.Blocked指令, 这时候所有的Producer都无法发送信息, 在极端情况下会导致大部分线程都
阻塞在Rabbit的发送, AMQP的connection提供了BlockListener和UnBlockListener,业务方可以实现对应的监听器, 在连接被Block住后把消息写入到磁盘或者数据库,等
恢复后再从磁盘或数据库中恢复重新发送,避免进程卡死的情况.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
cachingConnectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
if (connection instanceof ConnectionProxy) {
Connection targetConnection = ((ConnectionProxy) connection).getTargetConnection();
if (targetConnection instanceof SimpleConnection) {
try {
SimpleConnection simpleConnection = (SimpleConnection) targetConnection;
Field field = SimpleConnection.class.getDeclaredField("delegate");
field.setAccessible(true);
com.rabbitmq.client.Connection originConnection =
(com.rabbitmq.client.Connection) field.get(simpleConnection);
originConnection.addBlockedListener(new BlockedListener() {
@Override
public void handleBlocked(String reason) throws IOException {
//do something
}

@Override
public void handleUnblocked() throws IOException {
//do something
}
});
} catch (Exception e) {
}
}
}
}

@Override
public void onClose(Connection connection) {
}
});
🐶 您的支持将鼓励我继续创作 🐶