生产者可靠性
生产者重连
由于网络波动,可能出现客户端连接 MQ 失败的情况。可以通过配置生产者的 application.yaml 来开启失败后的重连机制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spring:
rabbitmq:
# 设置 MQ 的连接超时时间
connection-timeout: 1s
template:
retry:
# 开启失败重试机制
enabled: true
# 失败后的初始等待时间
initial-interval: 1000ms
# 失败后的下次等待时间,下次等待时间 = initial-interval * multiplier
multiplier: 1
# 最大重试次数
max-attempts: 3
|
生产者确认
RabbitMQ 有 Publisher Confirm 和 Publisher Return 两种确认机制。开启确认机制后,在 MQ 成果收到消息后会返回确认信息给生产者。返回到结果一以下几种情况:
- 消息投递到了 MQ,但是路由失败(routing key 没有匹配,一般为自己业务问题)。此时会通过 Publisher Return 返回路由异常原因,然后返回 ACK,告知投递成功。
- 临时消息(非持久的消息)投递到了 MQ,并且入列成功,返回 ACK, 告知投递成功。
- 持久化消息投递到了MQ,并且入列完成持久化,返回 ACK,告知投递成功。
- 其他情况都会返回 NACK,告知投递失败。
- 在 publisher 的 application.yaml 中添加配置。
1
2
3
4
5
6
|
spring:
rabbitmq:
# 开启 publisher confirm 机制,并设置 confirm 类型
publisher-confirm-type: correlated
# 开启 publisher return 机制
publisher-returns: true
|
Publisher-confirm-type 有三种模式:
- none:关闭 confirm 机制。
- simple:同步阻塞等待 MQ 的回执消息。
- correlated:MQ 异步回调方式返回回执消息。
- 每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目启动过程中配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置 ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info(
"收到消息的 Return Callback,message:{}," +
"replyCode:{},replyText:{}," +
"exchange:{},routingKey:{}",
returnedMessage.getMessage(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey());
}
});
}
}
|
- 发送消息,指定消息 ID、消息 ConfirmCallback。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
@SpringBootTest
@Slf4j
class PublisherApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void testPublisherConfirm() {
// 1.创建 CorrelationData,并指定 id
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.给 Future 添加 ConfirmCallback
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// Future 发生异常时的处理逻辑
log.error("handle message ack fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// Future 接收到回执的处理逻辑,参数中的 result 就是回执类型
log.info("收到 confirm callback 回执");
if (result.isAck()) {
log.debug("发送消息成功,收到 ACK!");
} else {
log.error("发送消息失败,收到 NACK,原因:{}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("direct.exchange", "red", "hello", correlationData);
}
}
|
MQ 的可靠性
在默认情况下,RabbitMQ 会将接收到的消息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦 MQ 宕机,内存中的消息会丢失。
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发 MQ 阻塞。
数据持久化
RabbitMQ 实现数据持久化包括3个方面:
其中“交换机持久化”和“队列持久化” Spring 默认处理就是持久化。
“消息持久化”可以通过设置消息的属性来进行持久化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@SpringBootTest
@Slf4j
class PublisherApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void testDurability() {
Message message = MessageBuilder.
withBody("hello".getBytes(StandardCharsets.UTF_8)).
setDeliveryMode(MessageDeliveryMode.PERSISTENT).
build();
rabbitTemplate.convertAndSend("simple.queue", message);
}
}
|
LazyQueue
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存,内存中只保留最近的消息,默认2048条。
- 消费者要消费消息才会从磁盘中读取而非加载到内存。
- 支持数百万的消息存储。
目前 RabbitMQ 的所有队列都是 Lazy Queue 模式,无法更改。
消费者可靠性
消费者确认
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Cosumer Acknowledegement)。当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种状态:
- ack:成功处理消息,RabbitMQ 从队列中删除消息。
- nack:消息处理失败,RabbitMQ 需要再次投递消息。
- reject:消息处理失败并拒绝消息,RabbitMQ 从队列中删除消息。
其中 SpringAMQP 已经实现了消息确认功能,并且可以通过修改配置文件选择 ACK 处理方式,有三种方式:
application.yaml:
1
2
3
4
5
|
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
|
publisher 端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@SpringBootTest
@Slf4j
class PublisherApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void testConsumerAcknowledgement() {
// 交换机名称
String exchangeName = "fanout.exchange";
// 消息
String message = "hello,fanout exchange";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "consumerack", message);
}
}
|
consumer 端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Component
@Slf4j
public class TestListener {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "fanout.queue", durable = "true"),
exchange = @Exchange(value = "fanout.exchange", type = ExchangeTypes.FANOUT),
key = "consumerack"))
public void testConsumerAcknowledgement(Message message, Channel channel) throws IOException {
try {
// 处理消息
System.out.println("收到消息: " + new String(message.getBody()));
// 如果消息成功处理,手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 如果处理失败,拒绝消息,RabbitMQ 会重新投递
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
|
失败重试机制
当消费者出现异常后,消息会不断 requeue 到队列,再重新发给消费者,然后再次异常,再次 requeue,导致 MQ 的消息处理飙升,带来不必要的压力。
可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 MQ 队列。
applcation.yaml
1
2
3
4
5
6
7
8
9
10
|
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败重试等待时长
multiplier: 1 # 下次失败重试的等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 true
|
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecocver 接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方法。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队。
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
这里采用 RepublishMessageRecoverer 的方案。
(1)定义接收失败消息的交换机、队列以及绑定关系。
(2)定义 RepublishMessageRecoverer:
1
2
3
4
5
6
7
|
public class RabbitMQConfig {
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
|
业务幂等性
业务 幂等性指同一个业务,执行一次和执行多次对业务状态对影响是一致的。
为了防止消息被多次消费,需要采取措施来实现业务幂等性。
方案一:唯一消息 id
给每一个消息设置一个唯一 id,利用 id 区分是否重复消费:
- 每一条消息都生成一个唯一的 id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库。
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Configuration
public class RabbitMQMessageConfig {
@Bean
public MessageConverter messageConverter() {
// 定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 配置自动创建 id,用于识别不同消息,也可以在业务中基于 id 判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
}
|
方案二:业务判断
结合业务逻辑,基于业务本身做判断。举例:在支付订单后修改订单状态为以支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其他状态不做处理: