1 RocketMQ分布式事务机制
1.1 概念介绍
事务消息
:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。半消息
:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。消息回查
:由于网络闪断0、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
1.2 适用场景
帮助用户实现类似 X/Open XA
的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致
。
1.3 MQ事务消息交互流程
事务消息发送
对应步骤1、2、3、4,事务消息回查
对应步骤5、6、7
- 发送方向 MQ 服务端发送消息。
- MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
1.4 常量介绍
LocalTransactionState.UNKNOW
:暂时无法判断状态,期待固定时间以后 MQ Server 向发送方进行消息回查LocalTransactionState.ROLLBACK_MESSAGE
:回滚事务,消息将被丢弃不允许消费LocalTransactionState.COMMIT_MESSAGE
:提交事务,允许订阅方消费该消息
2 使用方法
2.1 导入Maven依赖
导入下方的maven依赖,已上传至公司内网
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.1-SNAPSHOT</version>
</dependency>
2.2 创建TransactionListener监听类
- 创建一个rocketmq的事务监听类,实现
TransactionListener
接口 - 实现
executeLocalTransaction
方法和checkLocalTransaction
方法 executeLocalTransaction
方法在我们向rocketmq提交消息时会执行一次,用来执行事务操作checkLocalTransaction
方法是在我们执行executeLocalTransaction
返回LocalTransactionState.UNKNOW
之后用来定期进行回查的,- 注意:千万要加上
@Service
和@RocketMQTranscationListener
注解,否则会导致无法扫描到该监听类,导致程序错误
@Service
@RocketMQTranscationListener
public class TransactionListenerImpl implements TransactionListener {
/**
* 执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 这里使用时间进行模拟
long time = System.currentTimeMillis();
LocalTransactionState state;
if (time % 3 == 0) {
// 过一会再问
state = LocalTransactionState.UNKNOW;
} else if (time % 3 == 1) {
// 回滚
state = LocalTransactionState.ROLLBACK_MESSAGE;
} else {
// 提交
state = LocalTransactionState.COMMIT_MESSAGE;
}
System.out.println(LocalDateTime.now() + " " + msg.getTransactionId() + ":executeLocalTransaction : " + state);
return state;
}
/**
* 回查本地事务
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
long time = System.currentTimeMillis();
LocalTransactionState state;
if (time % 3 == 0) {
state = LocalTransactionState.UNKNOW;
} else if (time % 3 == 1) {
state = LocalTransactionState.ROLLBACK_MESSAGE;
} else {
state = LocalTransactionState.COMMIT_MESSAGE;
}
System.out.println(LocalDateTime.now() + " " + msg.getTransactionId() + ":checkLocalTransaction : " + state);
return state;
}
}
2.3 LogVO类
@Data
@AllArgsConstructor
//一定要提供无参数构造,否则报错
@NoArgsConstructor
/**
* LogVO
*
* @author naah
* @date 2018-09-10 上午10:46
* @desc
*/
public class LogVO implements Serializable {
private Long logId;
private String account;
private String realName;
private LocalDateTime time;
private String ipAddress;
private LogKind kind;
private String context;
private LocalDateTime createTime;
private LocalDateTime updateTime;
public enum LogKind {
//添加类型
add("add"),
//修改类型
update("update"),
//删除类型
delete("delete");
private final String kind;
private LogKind(String kind) {
this.kind = kind;
}
public String getKind() {
return kind;
}
}
}
2.4 Application启动类
我在rocketMQTemplate
中提供了一个名为sendInTransaction
的重载了三次的事务发送方法,在你使用的时候可以自己观察一下参数进行选择
@SpringBootApplication
public class RocketmqProducterApplication implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(RocketmqProducterApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
this.stringProducter();
}
public void stringProducter() throws InterruptedException {
Random random = new Random();
for (int i=0;i<10;i++){
LogVO log = generateLog(random, i);
String json= JSON.toJSONString(log);
Thread.sleep(2000);
//使用该方法发送事务消息
rocketMQTemplate.sendInTransaction("log_str_demo",json,TransactionListenerImpl.class);
}
}
private LogVO generateLog(Random random, int i) {
return new LogVO(0L, "nayan" + i + random.nextInt(1000), "那焱", LocalDateTime.now(), "127.0.0.1", random.nextInt() % 2 == 0 ? LogVO.LogKind.add : LogVO.LogKind.delete, "", LocalDateTime.now(), null);
}
}
2.5 properties配置
也可以使用Yaml
spring.rocketmq.nameServer=127.0.0.1:9876
spring.rocketmq.producer.group=my-group
server.port=9000