这篇文章主要为大家展示了“如何使用RocketMQ的事务消息来解决一致性问题”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何使用RocketMQ的事务消息来解决一致性问题”这篇文章吧。
创新互联建站是网站建设专家,致力于互联网品牌建设与网络营销,专业领域包括成都网站设计、成都网站建设、电商网站制作开发、微信小程序定制开发、微信营销、系统平台开发,与其他网站设计及系统开发公司不同,我们的整合解决方案结合了恒基网络品牌建设经验和互联网整合营销的理念,并将策略和执行紧密结合,且不断评估并优化我们的方案,为客户提供全方位的互联网品牌整合方案!
在微服务架构中,我们常常使用异步化的手段来提升系统的 吞吐量 和 解耦 上下游,而构建异步架构最常用的手段就是使用 消息队列(MQ)
,那异步架构怎样才能实现数据一致性呢?
可以看到在 业务处理 方面来说 RocketMQ
优于其他对手,而且原生支持 事务消息
PS:业务系统用的是其他 MQ
产品但是又需要 事务消息 怎么办?学习原理自己开发实现!
例如下图的场景:生成订单记录 -> MQ -> 增加积分
我们是应该先 创建订单记录,还是先 发送MQ消息 呢?
先发送MQ消息:这个明显是不行的,因为如果消息发送成功,而订单创建失败的话是没办法把消息收回来的
先创建订单记录:如果订单创建成功后MQ消息发送失败 抛出异常,因为两个操作都在本地事务中所以订单数据是可以 回滚 的
上面的 方式二 看似没问题,但是 网络是不可靠的!如果 MQ
的响应因为网络原因没有收到,所以在面对不确定的结果只好进行回滚;但是 MQ
端又确实是收到了这条消息的,只是回给客户端的 响应丢失 了!
所以 事务消息
就是用来保证 本地事务 与 MQ消息发送 的原子性!
主要的逻辑分为两个流程:
事务消息发送及提交:
发送 half消息
MQ服务端
响应消息写入结果
根据发送结果执行 本地事务
(如果写入失败,此时half消息对业务 不可见,本地逻辑不执行)
根据本地事务状态执行 Commit
或者 Rollback
(Commit操作生成消息索引,消息对消费者 可见)
回查流程:
对于长时间没有 Commit/Rollback
的事务消息(pending
状态的消息),从服务端发起一次 回查
Producer
收到回查消息,检查回查消息对应的 本地事务状态
根据本地事务状态,重新 Commit
或者 Rollback
逻辑时序图
从上面的原理可以发现 事务消息
仅仅只是保证本地事务和MQ消息发送形成整体的 原子性
,而投递到MQ服务器后,并无法保证消费者一定能消费成功!
如果 消费端消费失败 后的处理方式,建议是记录异常信息然后 人工处理,并不建议回滚上游服务的数据(因为两者是 解耦 的,而且 复杂度 太高)
我们可以利用 MQ
的两个特性 重试
和 死信队列
来协助消费端处理:
消费失败后进行一定次数的 重试
重试后也失败的话该消息丢进 死信队列
里
另外起一个线程监听消费 死信队列
里的消息,记录日志并且预警!
因为有 重试
所以消费者需要实现幂等性
下面就用刚刚提到的场景:生成订单记录 -> MQ -> 增加积分;来简单讲一下 Spring Cloud
中应该怎么做,详细代码请 下载demo 查看。
PS:怎样安装部署RocketMQ可以参考《Apache RocketMQ 消息队列部署与可视化界面安装》
使用 spring-cloud-stream
框架来访问 RocketMQ
Spring Cloud Stream 是一个构建消息驱动的框架,通过抽象的定义实现应用与MQ消息队列之间的解耦,目前支持
RabbitMQ
、kafka
和RocketMQ
消息生产者需要添加 transactional: true
开启 事务消息
因为开启了
事务消息
所以这里发送的是half消息
对于消费端是不可见
的
使用 @RocketMQTransactionListener
注解监听 半消息,并实现 RocketMQLocalTransactionListener
接口,该接口有两个方法
executeLocalTransaction:用于提交本地事务
checkLocalTransaction:用于事务回查
如果提交事务消息失败,需等待约1分钟左右 事务回查 方法才会被调用
注意:因为有 重试 这里如果是真实的业务需要自行实现
幂等性
监听并消费死信队列中的消息,用于记录错误日志,并且预警通知运维人员等
demo中提供了3个接口分别测试不同的场景:
事务成功
http://localhost:11002/success
流程如下:
订单创建 成功
提交事务消息 成功
消费消息增加积分 成功
订单创建成功但提交事务消息失败
http://localhost:11002/produceError
流程如下:
订单创建 成功
提交事务消息 失败
事务回查(等待1分钟左右) 成功
提交事务消息 成功
消费消息增加积分 成功
消费消息失败
http://localhost:11002/consumeError
流程如下:
订单创建 成功
提交事务消息 成功
消费消息增加积分 失败
重试消费消息 失败
进入死信队列 成功
消费死信队列的消息 成功
记录日志并发出预警 成功
以上是“如何使用RocketMQ的事务消息来解决一致性问题”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!