如何保证RocketMQ消息不丢失?

1、Producer将消息发送到RocketMQ Broker的过程
首先在生产者端,消息的发送分为同步、异步和Oneway三种方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞等待 broker 响应发送结果。

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Oneway发送:只负责发送请求,不等待响应,Producer只负责把请求发出去,而不处理响应结果。

在同步发送消息的情况下,消息的发送会同步阻塞等待Broker返回结果,在Broker确认收到消息之后,生产者才会拿到SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息。

但是Broker其实并不会立即把消息存储到磁盘上,而是先存储到内存中,内存存储成功之后,就返回给确认结果给生产者了。然后再通过异步刷盘的方式将内存中的数据存储到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。

如果想要保证消息不丢失,可以将消息保存机制修改为同步刷盘,这样,Broker会在同步请求中把数据保存在磁盘上,确保保存成功后再返回确认结果给生产者。

2、Broker接收到Producer发送的消息后,存储消息的过程
为了保证消息不丢失,RocketMQ肯定要通过集群方式进行部署,Broker 通常采用一主多从部署方式,并且采用主从同步的方式做数据复制。

当主Broker宕机时,从Broker会接管主Broker的工作,保证消息不丢失。同时,RocketMQ的Broker还可以配置多个实例,消息会在多个Broker之间进行冗余备份,从而保证数据的可靠性。

默认方式下,Broker在接收消息后,写入 master 成功,就可以返回确认响应给生产者了,接着消息将会异步复制到 slave 节点。但是如果这个过程中,Master的磁盘损坏了。那就会导致数据丢失了。

如果想要解决这个问题,可以配置同步刷盘的方式,即Master在将数据同步到Slave节点后,再返回给生产者确认结果。

消息刷盘方式

同步刷盘 在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

异步刷盘在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

将默认的异步刷盘修改成同步刷盘
flushDiskType=SYNC_FLUSH

3、Consumer消费处理过程
在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 当然,也可以先把数据保存在数据库中,就返回,然后自己再慢慢处理。

欢迎使用66资源网
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
7. 本站有不少源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别!

66源码网 » 如何保证RocketMQ消息不丢失?

提供最优质的资源集合

立即查看 了解详情