题记:之前总结过一篇关于分布式事务的文章,今天闲来无事再整理一下,发布出来,也是加深一下自己的记忆吧。感觉对分布式事务的理解和使用还是不太到位,等之后有空看下《RocketMQ技术内幕》书籍吧

基于RocketMQ分布式事务

前言

正式讲述之前,先来理解几个概念吧

半消息(HALF MESSAGE):这是一个Rocket里的概念,指暂时不能被consumer消费。Producer已经把消息发送到Broker端,但是消息的状态被标记为不可投递,处于这种状态下的消息被称为半消息。事实上,该状态下的消息被投放在一个特殊主题下(RMQ_SYS_TRANS_HALF_TOPIC)。当Producer端对它进行二次确认后,Consumer端才可以消费到;如果Producer端对它进行rollback,那么消息被删除,永远不会被消费到。

看到半消息的概念感觉有一些似曾相识,其实Kafka里也有类似的概念,Kafka也是采用Leader-Follower的主从复制机制,但是客户端只能消费已经被写入到所有消息Follower副本的消息即HW水平范围。通过图也许更容易理解,示意图如下:

WX20240121-110013@2x.png

引申一下,可能大家都会有疑问,对于Kafka为什么只有Follower同步过的消息才可见呢?

因为如果没有足够多的副本复制消息,被认为是不安全的。当Leader副本发生崩溃,重新选取新的Leader副本,原来未同步完成的消息会丢失,如果允许读取这些消息,就会破坏一致性原则。不同的消费者,读取的消息是不相同的,会造成消费者的困惑。

那么对于半消息也一样,其实也是一种不稳定的状态。因此,必须要经过Producer的二次确认才能被消费,代表Broker端接收到了半消息。

事务状态回查:可能由于网络原因,应用问题等,导致Producer一直没有对这个半消息进行确认。那么这个时候Broker会定时扫描这些半消息,主动找到Producer端查询该消息的状态。

RocketMQ是实现消息最终一致性的分布式事务解决方案,如果系统不需要保证强一致性,那么可以采用RocketMQ的方案。RocketMQ事务消息的实现原理就是基于两阶段式提交和事务状态回查,来决定消息最终是提交还是回滚。

简单来讲,RocketMQ采用两阶段提交(2PC)的思想来实现事务消息,当事务消息失败或者超时,同时采用补偿的方式处理这个问题。这两个阶段分别为正常事务消息的发送与提交以及事务消息的补偿

简要实现

下面我们以订单服务、积分服务为例,分析分布式事务的整体流程如下:

WX20240121-214539@2x.png

具体代码可以看自己engineering工程的代码实现,下面简要的讲解一下代码概要:

注:代码只注重具体的使用过程,数据模型和代码实现都是简化。

订单服务

在订单服务中,用户下单时,请求后端的下单接口,并保存相关的数据到本地数据库。

订单表(简表)
1
2
3
4
5
6
CREATE TABLE `t_order` (
`id` bigint(20) UNSIGNED NOT NULL DEFAULT '0' COMMENT '主键ID',
`order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '订单id',
`user_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '用户id',
`amount` bigint(20) NOT NULL DEFAULT '0' COMMENT '订单金额',
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='订单主表'
事务日志表(简表)
1
2
3
4
5
6
CREATE TABLE `transaction_log` (
`id` varchar(64) NOT NULL DEFAULT '0' COMMENT '事务ID',
`business` varchar(32) NOT NULL DEFAULT '0' COMMENT '业务标识',
`foreign_key` varchar(32) NOT NULL DEFAULT '0' COMMENT '对应业务表中的主键',
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT '事务日志表';

事务日志表主要用来做事务状态回查,当向订单表提交订单数据时,事务日志表也同时会插入一条数据,它们共处一个本地事务中。通过事务ID查询该表,如果返回记录,则证明本地事务已提交;如果未返回记录,则本地事务可能是未知状态或者是回滚状态。

积分表

1
2
3
4
5
6
7
8
CREATE TABLE `t_points` (
`id` bigint(16) NOT NULL COMMENT '主键',
`user_id` bigint(16) NOT NULL COMMENT '用户id',
`order_no` bigint(16) NOT NULL COMMENT '订单编号',
`points` int(4) NOT NULL COMMENT '积分',
`remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '备注',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '积分表';
TransactionMQProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-11
*
* 主要是创建事务消息的发送者,在这里我们主要关注的是OrderTransactionListener,
* 它负责本地事务的执行和事务状态的回查
*/
@Component
public class TransactionProducer {

private String producerGroup = "order_trans_group";
private TransactionMQProducer producer;

//用于执行本地事务和事务状态回查的监听器
@Resource
private OrderTransactionListener orderTransactionListener;

//用于执行任务的线程池
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));

@PostConstruct
public void init() {
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(threadPoolExecutor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}

//事务消息发送
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic, data.getBytes());
return producer.sendMessageInTransaction(message, null);
}

private void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
OrderTransactionListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-11
*
* 在通过producer.SendMessageInTransaction发送事务消息后,如果发送成功,就会调用这里的exexuteLocalTransaction
* 来执行本地事务。在这里它就会完成订单数据和事务日志的插入
*/
@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener {

@Resource
private OrderService orderService;

@Resource
private TransactionLogService transactionLogService;

@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("开始执行本地事务。。。");
LocalTransactionState state;
try {
String body = new String(message.getBody());
OrderDto orderDto = JSONObject.parseObject(body, OrderDto.class);

//createOrder方法包含对事务日志的写入,在同一个本地事务中
orderService.createOrder(orderDto, message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
log.info("本地事务已提交, transactionId={}", message.getTransactionId());
} catch (Exception e) {
log.error("执行本地事务失败!", e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
log.info("开始回查本地事务。{}", messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogService.get(transactionId) > 0) {
state = LocalTransactionState.COMMIT_MESSAGE;
} else {
state = LocalTransactionState.UNKNOW;
}
log.info("结束本地事务状态回查,transactionId={}|state={}", messageExt.getTransactionId(), state);
return state;
}
}

在通过 producer.sendMessageInTransaction发送事务消息后,如果消息发送成功,RocketMQ就会调用到这里的executeLocalTransaction方法,来执行本地事务。在这里,它会完成订单数据和事务日志的插入。

该方法返回值 LocalTransactionState 代表本地事务状态,它是一个枚举类。

1
2
3
4
5
6
7
8
public enum LocalTransactionState {
//提交事务消息,消费者可以看到此消息
COMMIT_MESSAGE,
//回滚事务消息,消费者不会看到此消息
ROLLBACK_MESSAGE,
//事务未知状态,需要调用事务状态回查,确定此消息是提交还是回滚
UNKNOW;
}

其中,checkLocalTranstion 方法用于状态回查,这个主要是在TransactionListener未对消息进行确认进行,这里主要是通过消息体中的transactionId 查询本地事务表,如果查询到记录,证明本地事务已成功写入需要进行提交。如果没有查询到就返回未知状态。

那么这里可能有个问题是Broker什么时候才触发状态回查?

当Broker不能确定本地事务执行状态时,需要依靠回查确定本地事务状态确定消息提交还是回滚。Broker在启动的时候,会创建并启动事务回查服务TransactionalMessageCheckService线程,TransactionalMessageCheckService服务会每分钟进行回查,直到达到事务回查最大检测数,如果超过最大检测数,仍然回查不到事务状态则进行回滚。

当然,事务回查的频率和最大次数,我们都可以配置。在 Broker端,可以通过这样来配置它:

1
2
brokerConfig.setTransactionCheckInterval(15000); //回查频率15秒一次
brokerConfig.setTransactionCheckMax(15); //最大检测次数为15
业务实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-11
*
* 订单业务服务中,主要有两个方法,一个用于向RocketMQ发送事务消息,一个用于真正的业务数据库落库
*/
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

@Resource
private OrderMapper orderMapper;

@Resource
private TransactionLogMapper transactionLogMapper;

@Resource
private TransactionProducer transactionProducer;

@Transactional
@Override
public void createOrder(OrderDto orderDto, String transactionId) {

//创建订单
Order order = new Order();
BeanUtils.copyProperties(orderDto, order);
orderMapper.createOrder(order);

//写入事务日志
TransactionLog transactionLog = new TransactionLog();
transactionLog.setId(transactionId);
transactionLog.setBusiness("order");
transactionLog.setForeignKey(String.valueOf(order.getId()));
transactionLogMapper.insert(transactionLog);

log.info("创建订单完成。{}", order.getId());

}

//前端调用,只用于向RocketMQ发送事务消息
@Override
public void createOrder(OrderDto orderDto) throws MQClientException {
orderDto.setId(new Random().nextLong());
orderDto.setOrderNo(new Random().nextLong());
transactionProducer.send(JSON.toJSONString(orderDto), "order");
}
}
接口调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-12
*/
@Slf4j
@RestController
public class OrderController {
@Resource
private OrderService orderService;

public void createOrder(@RequestBody OrderDto orderDto) throws MQClientException {
log.info("接收订单数据:{}", orderDto);
orderService.createOrder(orderDto);
}

}
总结

目前已经对订单服务的执行流程进行了介绍,那么简单总结一下:

  • 前端调用创建订单接口createOrder,发送事务消息。如果发送失败,导致报错,则将异常返回,此时不会涉及到任何数据安全
  • 如果事务消息发送成功,但在执行本地事务时发生异常,那么订单数据和事务日志都不会被保存,因为它们在一个本地事务中
  • 如果执行完本地事务,但未能及时的返回本地事务状态或者返回了未知状态。那么,会由Broker定时回查事务状态,然后根据事务日志表,就可以判断订单是否已完成,并写入到数据库。

通过上面的事务日志表和事务回查机制,已经保证了订单服务和事务消息的一致性,也就是可以保证订单数据和MQ的一致性。那么还有积分服务,如何正确的消费订单数据并完成相应的业务操作

积分服务

积分服务主要是消费订单数据然后根据订单数据,给用户增加积分

积分记录表
1
2
3
4
5
6
7
8
CREATE TABLE `t_points` (
`id` bigint(16) NOT NULL COMMENT '主键',
`user_id` bigint(16) NOT NULL COMMENT '用户id',
`order_no` bigint(16) NOT NULL COMMENT '订单编号',
`points` int(4) NOT NULL COMMENT '积分',
`remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '备注',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '积分表';
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-14
*/
@Component
public class Consumer {

@Resource
private OrderListener orderListener;

private String consumerGroup = "consumer-group";

private DefaultMQPushConsumer consumer;

@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order","*");
consumer.registerMessageListener(orderListener);
consumer.start();
}
}

消费者监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-14
*
* 监听到消息之后,调用业务服务类处理即可。处理完成则返回CONSUMER_SUCCESS已提交,失败则返回RECOSUMER_LATER
* 之后进行重试
*/
@Slf4j
@Component
public class OrderListener implements MessageListenerConcurrently {

@Resource
private PointsService pointsService;

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("消费者线程监听到消息。");
try{
for (MessageExt message:list) {
log.info("开始处理订单数据,准备增加积分....");
OrderDto order = JSONObject.parseObject(message.getBody(), OrderDto.class);
pointsService.increasePoints(order);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
log.error("处理消费者数据发生异常。",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}

业务实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-14
*/
@Slf4j
@Service
public class PointsServiceImpl implements PointsService {

@Resource
private PointsMapper pointsMapper;

@Override
public void increasePoints(OrderDto order) {
//入库之前先实现幂等,通过redis实现
if (true) {
log.info("积分添加完成,订单已处理。orderId={}", order.getOrderNo());
return;
}

Points points = new Points();
points.setId(new Random().nextLong());
points.setUserId(order.getId());
points.setOrderNo(order.getUserId());
Double amount = order.getAmount();
points.setPoints(amount.intValue()*10);
points.setRemarks("商品消费共【"+order.getAmount()+"】元,获得积分"+points.getPoints());
pointsMapper.insert(points);
log.info("已为订单号码{}增加积分。",points.getOrderNo());
}
}
幂等性消费

实现幂等性的消费方式有很多,之前在陌陌工作处理幂等问题使用Redis较多,当然也可以使用数据库,在处理前先进行查询数据是否已经存在。不管什么方式,总的思路就是在执行业务前,必须先查询该消息是否被处理过。

消费异常

我们知道,当消费者处理失败后会返回 RECONSUME_LATER ,让消息来重试,默认最多重试16次。如果真的由于特殊原因,消息一直不能被正确处理,那怎么办 ?我们考虑两种方式来解决这个问题。

第一,在代码中设置消息重试次数,如果达到指定次数,就发邮件或者短信通知业务方人工介入处理

比如可以对订单监听器代码进行改造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* @author yangyong <yongyang@didiglobal.com>
* Created on 2023-07-14
*
* 监听到消息之后,调用业务服务类处理即可。处理完成则返回CONSUMER_SUCCESS已提交,失败则返回RECOSUMER_LATER
* 之后进行重试
*/
@Slf4j
@Component
public class OrderListener implements MessageListenerConcurrently {

@Resource
private PointsService pointsService;

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("消费者线程监听到消息。");
try{
for (MessageExt message:list) {
log.info("开始处理订单数据,准备增加积分....");
if (!processor(message)){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
log.error("处理消费者数据发生异常。",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}


/**
* 消息处理,第3次处理失败后,发送邮件通知人工介入
* @param message
* @return
*/
private boolean processor(MessageExt message){
try {
logger.info("消息处理....{}",body);
OrderDto order = JSONObject.parseObject(message.getBody(), OrderDto.class);
pointsService.increasePoints(order);
return true;
}catch (Exception e){
if(message.getReconsumeTimes()>=3){
logger.error("消息重试已达最大次数,将通知业务人员排查问题。{}",message.getMsgId());
sendMail(message);
return true;
}
return false;
}
}
}

第二,等待消息重试最大次数后,进入死信队列

死信队列的主题名称是 %DLQ% + 消费者组名称,比如在订单数据中,我们设置了消费者组名:

1
String consumerGroup = "order-consumer-group";

那么这个消费者,对应的死信队列主题名称就是%DLQ%order-consumer-group

最后就可以通过程序代码监听这个主题,来通知人工介入处理或者直接在控制台查看处理了。通过幂等性消费和对死信消息的处理,基本上就能保证消息一定会被处理。

DDMQ分布式系统事务一致性解决方案

DDMQ是滴滴基于RocketMQ构建的消息队列产品,目前已经进行开源。实现思路,基本和RocketMQ相似,但是感觉没有RocketMQ完善。

DDMQ是一款分布式消息中间件,主要应用于OLTP场景下的分布式解耦、故障隔离、流量消峰等。支持实时消息、延迟消息以及事务消息。除MQ基础特性外,还具备Gooy自定义消息预处理、多种消费模式(SDK拉取/HTTP推送)、消息粒度的延迟、事务消息、Scheme检验、SDK多语言支持等特性。各业务线独立部署,彻底的物理隔离,并且多机房部署,保证单机房故障后,业务方有备用机房切换,满足各业务对可靠性、可用性、低延迟需求。各队列统一的接入方式,现已广泛应用于专快、顺风车、地图等业务,日消息流水千亿级别。

说到分布式事务,就会谈到那个经典的Bob向Smith账户转账的问题:2个账号,分别处于2个不同的DB,或者说2个不同的子系统里面,Bob的账户要扣钱,Smith的账户要加钱,如何保证原子性?

一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发一条消息给中间件,B系统接收此消息,进行加钱。但这里面有个问题:是先update DB,后发送消息呢?还是先发送消息,后update DB呢?假设先update DB成功,发送消息网络失败,重发又失败,怎么办?

假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?所以,只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。那这个问题怎么解决呢?

  • 方案一业务方自己实现

假设消息中间件没有提供“事务消息”功能,比如你用的是kfka。那如何解决这个问题呢?

解决方案如下:

  1. Produceri端准备1张消息表,把update DB和linsert message?这2个操作,放在一个DB事务里面
  2. 准备一个后台程序,不停地把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢
  3. Consumeri端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。

通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。

但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合,额外增加业务方的负担。

  • 方案二DDMQ事务消息

目前各大知名的电商平台和互联网公司,几乎都是采用事务消息这种方案来实现“最终一致性”的。这种方式适合的业务场景广泛,而且比较可靠。不过这种方式技术实现的难度比较大。目前主流的开源MQ(ActiveMQ、RabbitMQ、Kafka)均未实现对事务消息的支持,所以需二次开发或者新造轮子。

DDMQ通过RocketMQ及自主研发的延迟队列模块来实现事务消息。

概念介绍

事务消息:DDMQ提供类似X/Open XA的分布事务功能,通过DDMQ事务消息能够达到分布式事务的最终一致。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条业务消息的发送丢失,DDMQ按照一定的间隔不断向消息生产者发送监控消息询问该事务消息的状态,该过程即消息回查。

适用场景

MQ事务消息适用于如下场景:

帮助用户实现类似X/Open XA的分布事务功能,通过DDMQ事务消息能达到分布式事务的最终一致。

交互流程基本和RocketMQ一致:
WX20240128-213401@2x.png

其中:

1.发送方给DDMQ发送监控消息。

2.DDMQ接收监控消息之后,向发送方确认监控消息已经发送成功。

3.发送方开始执行本地事务逻辑。

4.发送方根据本地事务执行结果给DDMQ发送业务消息并在成功之后取消监控消息或者直接取消监控

消息。如果成功发送业务消息,订阅方最终将收到该消息。

5.在断网或者是应用重启的特殊情况下,上述步骤4没有成功发送业务消息或者没有成功取消监控消

息,经过一定时间间隔后DDMQ将对该消息发起消息回查。

6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7.发送方根据检查得到的本地事务的最终状态再次执行步骤4。

事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。

参考文档

[1] https://zhuanlan.zhihu.com/p/115553176

[2] https://segmentfault.com/a/1190000038619419