- 工信部备案号 滇ICP备05000110号-1
- 滇公安备案 滇53010302000111
- 增值电信业务经营许可证 B1.B2-20181647、滇B1.B2-20190004
- 云南互联网协会理事单位
- 安全联盟认证网站身份V标记
- 域名注册服务机构许可:滇D3-20230001
- 代理域名注册服务机构:新网数码
代码分析事务消息
分布式事务模型图如下:
我们先看事务消息客户端的实现。即上图的MQClient.
我们先看代码的整体封装。
下面代码段做了两件事:
1.本地数据库写入 2.事务消息客户端发送消息。
@Transactional表示开启事务。在注释下,开启一个新的Order。用OrderDao将数据写入本地数据库。然后调用transactionMsgClient.sendMsg将消息发出去(这是静态方法调用,transactionMsgClient是一个类,sendMsg是它的方法)。这样,本地数据库写入和发消息这两件事,就是个原子事务,也就是说,两件事要么一起成功,要么一起失败。
上面代码用到了MybatisTransactionMsgClient。MyBatis是一个Java持久化框架,它通过XML描述符或注解把对象与存储过程或SQL语句关联起来,映射成数据库内对应的记录。
上面提到过,transactionMsgClient.sendMsg是个类,这个类继承了TransactionMsgClient。下面代码中,transactionMsgClient.sendMsg调用了其父类的TransactionsendClient的sendMsg方法,写事务消息表,并且发送消息。
我们接下来看sendMsg这个方法到底做了什么:
1、消息内容落数据库 2、发送消息
下面代码会先做一个判断,在if字段里:con.getAutoCommit。也就是说,只有当没有开启自动commit的时候(有自动提交就破坏了事务的原子性),把信息写在数据库表里,然后构造一个messages,发消息。而发消息的方法是将消息放到消息队列中。
在事务消息设计中,后台发送消息队列设计,如下图所示:
参照上图,我们可以看到后台发送消息队列有两个:
SendMsg队列的消息消费很快,基本上放进去很快就会被消费掉。这样重试才能有效,否则一直重试,没有意义。
下面代码段的的作用:是发送消息时,从队列里中取消息,看是否到期,将到期的消息取出来:
后台优先队列的维护。
最后,事务消息表的设计;
代码分析事务消息
我们知道,RocketMQ支持延时消息。我们先看一下延时消息的应用场景。
延时需求是在当前时间后的某一时间点触发指定的业务逻辑或操作。
例如微信发消息,过一段时间没发送成功的话,提示重新发送(如下图的小红点提示)。
订单状态流转:如延时支付。京东上超过24小时的订单会被自动取消。
实际上,我们在分布式事务的终极模型中,也用到了延时消息。
RocketMQ支持18个级别的延时等级:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h。也就是说,消息延时发送有这18个级别。如果业务的延时消息需求与这18个级别不匹配,就需要自行基于RocketMQ进行二次开发。
接下来,我们看一下18个延时的实现原理。
RocketMQ的18个延时级别,每个级别对应一个Queue,根据Level参数,将消息放到对应的18个队列中的等级。每个队列都对应了到时出队。例如1s队列就是1s出队,2小时队就是2小时出队。消息出队以后,当成正常消息投递。然后被投递到了消费队列,被消费者消费掉。
售前咨询
售后咨询
备案咨询
二维码
TOP