使用rocketmq-starter发送事务消息

Posted by Naah on Thursday, Oct 11,2018 11:43:47

1 RocketMQ分布式事务机制


1.1 概念介绍

  • 事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。
  • 半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
  • 消息回查:由于网络闪断0、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

    1.2 适用场景

    帮助用户实现类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致

1.3 MQ事务消息交互流程

事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7

  1. 发送方向 MQ 服务端发送消息。
  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

1.4 常量介绍

  • LocalTransactionState.UNKNOW :暂时无法判断状态,期待固定时间以后 MQ Server 向发送方进行消息回查
  • LocalTransactionState.ROLLBACK_MESSAGE :回滚事务,消息将被丢弃不允许消费
  • LocalTransactionState.COMMIT_MESSAGE :提交事务,允许订阅方消费该消息

2 使用方法


2.1 导入Maven依赖

导入下方的maven依赖,已上传至公司内网

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>spring-boot-starter-rocketmq</artifactId>
    <version>1.0.1-SNAPSHOT</version>
</dependency>

2.2 创建TransactionListener监听类

  1. 创建一个rocketmq的事务监听类,实现TransactionListener接口
  2. 实现executeLocalTransaction方法和checkLocalTransaction方法
  3. executeLocalTransaction方法在我们向rocketmq提交消息时会执行一次,用来执行事务操作
  4. checkLocalTransaction方法是在我们执行executeLocalTransaction返回LocalTransactionState.UNKNOW之后用来定期进行回查的,
  5. 注意:千万要加上@Service@RocketMQTranscationListener注解,否则会导致无法扫描到该监听类,导致程序错误
@Service
@RocketMQTranscationListener
public class TransactionListenerImpl implements TransactionListener {

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//        这里使用时间进行模拟
        long time = System.currentTimeMillis();
        LocalTransactionState state;
        if (time % 3 == 0) {
//            过一会再问
            state = LocalTransactionState.UNKNOW;
        } else if (time % 3 == 1) {
//            回滚
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
//            提交
            state = LocalTransactionState.COMMIT_MESSAGE;
        }
        System.out.println(LocalDateTime.now() + " " + msg.getTransactionId() + ":executeLocalTransaction : " + state);
        return state;
    }

    /**
     * 回查本地事务
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        long time = System.currentTimeMillis();
        LocalTransactionState state;
        if (time % 3 == 0) {
            state = LocalTransactionState.UNKNOW;
        } else if (time % 3 == 1) {
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            state = LocalTransactionState.COMMIT_MESSAGE;
        }
        System.out.println(LocalDateTime.now() + " " + msg.getTransactionId() + ":checkLocalTransaction : " + state);
        return state;
    }
}

2.3 LogVO类

@Data
@AllArgsConstructor
//一定要提供无参数构造,否则报错
@NoArgsConstructor
/**
 * LogVO
 *
 * @author naah
 * @date 2018-09-10 上午10:46
 * @desc
 */
public class LogVO implements Serializable {
    private Long logId;
    private String account;
    private String realName;
    private LocalDateTime time;
    private String ipAddress;
    private LogKind kind;
    private String context;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;

    public enum LogKind {
        //添加类型
        add("add"),

        //修改类型
        update("update"),

        //删除类型
        delete("delete");

        private final String kind;

        private LogKind(String kind) {
            this.kind = kind;
        }

        public String getKind() {
            return kind;
        }
    }

}

2.4 Application启动类

我在rocketMQTemplate中提供了一个名为sendInTransaction的重载了三次的事务发送方法,在你使用的时候可以自己观察一下参数进行选择

@SpringBootApplication
public class RocketmqProducterApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RocketmqProducterApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        this.stringProducter();
    }

    public void stringProducter() throws InterruptedException {
        Random random = new Random();
        for (int i=0;i<10;i++){
            LogVO log = generateLog(random, i);
            String json= JSON.toJSONString(log);
            Thread.sleep(2000);
            //使用该方法发送事务消息
            rocketMQTemplate.sendInTransaction("log_str_demo",json,TransactionListenerImpl.class);
        }
    }

    private LogVO generateLog(Random random, int i) {
        return new LogVO(0L, "nayan" + i + random.nextInt(1000), "那焱", LocalDateTime.now(), "127.0.0.1", random.nextInt() % 2 == 0 ? LogVO.LogKind.add : LogVO.LogKind.delete, "", LocalDateTime.now(), null);
    }


}

2.5 properties配置

也可以使用Yaml

spring.rocketmq.nameServer=127.0.0.1:9876
spring.rocketmq.producer.group=my-group
server.port=9000

3 相关链接


3.1 本文Demo源码

公司内网gitlab Demo

3.2 相关文档地址

  1. 事务消息-阿里云
  2. RocketMQ 4.3正式发布,支持分布式事务-CSDN