java rocketmq
java rocketmq
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version> <!-- 兼容 JDK 8+,推荐版本 -->
</dependency>
application.yml
rocketmq:
# NameServer 地址(必填)
name-server: 192.168.1.104:9876;192.168.1.105:9876
# 生产者配置(可选,但建议显式配置)
producer:
# 生产者组名(必填!必须全局唯一)
group: home_care_ry_producer
# 消息发送超时时间(毫秒,默认 3000)
send-message-timeout: 3000
# 同步发送失败时重试次数(默认 2)
retry-times-when-send-failed: 2
# 异步发送失败重试次数(默认 2)
retry-times-when-send-async-failed: 2
# 最大消息大小(字节,默认 4MB = 4 * 1024 * 1024)
max-message-size: 4194304
消息发送服务示例
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* RocketMQ 消息发送服务示例
* <p>
* 提供多种消息发送方式:
* 1. 同步发送:等待发送结果,适用于重要消息
* 2. 异步发送:不阻塞业务,通过回调处理结果
* 3. 单向发送:不关心结果,性能最高
* 4. 延迟发送:定时任务场景
* 5. 顺序发送:保证消息顺序消费
* 6. 事务发送:保证本地事务与消息发送的一致性
* 7. 批量发送:提高吞吐量
* </p>
*
* <p><b>使用示例:</b></p>
* <pre>
* {@code
* @Autowired
* private OrderMessageService orderMessageService;
*
* // 同步发送
* orderMessageService.sendOrderCreated(order);
*
* // 异步发送
* orderMessageService.asyncSendOrderCreated(order);
*
* // 延迟发送(10秒后)
* orderMessageService.sendDelayMessage(order, 3);
* }
* </pre>
*
* @author ruoyi
*/
@Service
@Slf4j
public class OrderMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// ==================== 基础发送 ====================
/**
* 同步发送消息(推荐用于重要业务)
* <p>会等待 Broker 返回结果,确保消息发送成功</p>
* <p><b>使用场景:</b>订单创建、支付成功等重要业务通知</p>
*
* @param order 订单对象,会自动 JSON 序列化
*/
public void sendOrderCreated(Object order) {
try {
// 同步发送,等待 Broker 返回结果
SendResult sendResult = rocketMQTemplate.syncSend("ORDER_TOPIC", order);
log.info("订单消息发送成功,MsgId: {}, Status: {}",
sendResult.getMsgId(), sendResult.getSendStatus());
} catch (Exception e) {
log.error("订单消息发送失败", e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 发送带 Tag 的消息(消费者可以按 Tag 过滤)
* <p><b>使用场景:</b>同一 Topic 下区分不同业务类型</p>
*
* @param order 订单对象
* @param tag 消息标签(如:created、paid、cancelled)
*/
public void sendOrderWithTag(Object order, String tag) {
try {
// Topic:Tag 格式
String destination = "ORDER_TOPIC:" + tag;
SendResult sendResult = rocketMQTemplate.syncSend(destination, order);
log.info("带Tag消息发送成功,Tag: {}, MsgId: {}", tag, sendResult.getMsgId());
} catch (Exception e) {
log.error("带Tag消息发送失败,Tag: {}", tag, e);
}
}
// ==================== 异步发送 ====================
/**
* 异步发送消息(推荐用于非关键路径)
* <p>不阻塞当前线程,通过回调处理发送结果</p>
* <p><b>使用场景:</b>日志记录、数据同步等对实时性要求不高的场景</p>
*
* @param order 订单对象
*/
public void asyncSendOrderCreated(Object order) {
rocketMQTemplate.asyncSend("ORDER_TOPIC", order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功,MsgId: {}, QueueId: {}",
sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
}
@Override
public void onException(Throwable e) {
log.error("异步消息发送失败", e);
// 可以在这里实现重试或补偿逻辑
}
});
}
/**
* 异步发送带超时时间的消息
*
* @param order 订单对象
* @param timeout 超时时间(毫秒)
*/
public void asyncSendWithTimeout(Object order, long timeout) {
rocketMQTemplate.asyncSend("ORDER_TOPIC", order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功: {}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("异步消息发送失败", e);
}
}, timeout);
}
// ==================== 单向发送 ====================
/**
* 单向发送消息(性能最高,不关心结果)
* <p>不等待 Broker 返回任何结果,性能最高但可靠性最低</p>
* <p><b>使用场景:</b>日志采集、监控数据上报等允许丢失的场景</p>
*
* @param order 订单对象
*/
public void sendOneWay(Object order) {
rocketMQTemplate.sendOneWay("ORDER_TOPIC", order);
log.debug("单向消息已发送(不保证送达)");
}
// ==================== 延迟发送 ====================
/**
* 发送延迟消息
* <p><b>延迟等级对照表:</b></p>
* <ul>
* <li>1: 1s</li>
* <li>2: 5s</li>
* <li>3: 10s</li>
* <li>4: 30s</li>
* <li>5: 1m</li>
* <li>6: 2m</li>
* <li>7: 3m</li>
* <li>8: 4m</li>
* <li>9: 5m</li>
* <li>10: 6m</li>
* <li>11: 7m</li>
* <li>12: 8m</li>
* <li>13: 9m</li>
* <li>14: 10m</li>
* <li>15: 20m</li>
* <li>16: 30m</li>
* <li>17: 1h</li>
* <li>18: 2h</li>
* </ul>
* <p><b>使用场景:</b>订单超时取消、定时提醒等</p>
*
* @param order 订单对象
* @param delayLevel 延迟等级(1-18)
*/
public void sendDelayMessage(Object order, int delayLevel) {
Message<Object> message = MessageBuilder.withPayload(order)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
.build();
SendResult sendResult = rocketMQTemplate.syncSend("DELAY_TOPIC", message);
log.info("延迟消息发送成功,延迟等级: {}, MsgId: {}", delayLevel, sendResult.getMsgId());
}
/**
* 发送延迟消息(便捷方法)
*
* @param order 订单对象
* @param delaySeconds 延迟秒数(1、5、10、30、60...)
*/
public void sendDelayMessageBySeconds(Object order, int delaySeconds) {
int delayLevel = getDelayLevel(delaySeconds);
sendDelayMessage(order, delayLevel);
}
/**
* 将秒数转换为延迟等级
*/
private int getDelayLevel(int seconds) {
if (seconds <= 1) return 1;
if (seconds <= 5) return 2;
if (seconds <= 10) return 3;
if (seconds <= 30) return 4;
if (seconds <= 60) return 5;
if (seconds <= 120) return 6;
if (seconds <= 180) return 7;
if (seconds <= 240) return 8;
if (seconds <= 300) return 9;
if (seconds <= 360) return 10;
if (seconds <= 420) return 11;
if (seconds <= 480) return 12;
if (seconds <= 540) return 13;
if (seconds <= 600) return 14;
if (seconds <= 1200) return 15;
if (seconds <= 1800) return 16;
if (seconds <= 3600) return 17;
return 18; // 2小时
}
// ==================== 顺序发送 ====================
/**
* 发送顺序消息(保证同一 Key 的消息顺序消费)
* <p>相同 hashKey 的消息会发送到同一队列,保证顺序消费</p>
* <p><b>使用场景:</b>订单状态流转、库存操作等需要严格顺序的场景</p>
*
* @param order 订单对象
* @param orderId 订单ID(作为 hashKey)
*/
public void syncSendOrderly(Object order, String orderId) {
// 第三个参数是 hashKey,相同 key 的消息会进入同一队列
SendResult sendResult = rocketMQTemplate.syncSendOrderly(
"ORDER_STATUS_TOPIC",
order,
orderId // hashKey,相同 orderId 的消息保证顺序
);
log.info("顺序消息发送成功,OrderId: {}, MsgId: {}, QueueId: {}",
orderId, sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
}
// ==================== 带属性发送 ====================
/**
* 发送带 Key 的消息(用于消息追踪和查询)
* <p>通过 Key 可以在 RocketMQ 控制台快速查询消息</p>
* <p><b>使用场景:</b>需要追踪的重要业务消息</p>
*
* @param order 订单对象
* @param messageKey 消息唯一标识(如订单号、流水号)
*/
public void syncSendWithKey(Object order, String messageKey) {
Message<Object> message = MessageBuilder.withPayload(order)
.setHeader(MessageConst.PROPERTY_KEYS, messageKey) // 可用于控制台查询
.build();
SendResult sendResult = rocketMQTemplate.syncSend("TRACE_TOPIC", message);
log.info("带Key消息发送成功,Key: {}, MsgId: {}", messageKey, sendResult.getMsgId());
}
/**
* 发送带自定义属性的消息
*
* @param order 订单对象
* @param properties 自定义属性(如:业务类型、来源系统等)
*/
public void syncSendWithProperties(Object order, java.util.Map<String, Object> properties) {
MessageBuilder<Object> builder = MessageBuilder.withPayload(order);
// 添加自定义属性
if (properties != null && !properties.isEmpty()) {
properties.forEach(builder::setHeader);
}
SendResult sendResult = rocketMQTemplate.syncSend("ORDER_TOPIC", builder.build());
log.info("带自定义属性消息发送成功,MsgId: {}", sendResult.getMsgId());
}
// ==================== 事务发送 ====================
/**
* 发送事务消息(保证本地事务与消息发送的一致性)
* <p><b>执行流程:</b></p>
* <ol>
* <li>发送半消息(Half Message)</li>
* <li>执行本地事务(executeLocalTransaction)</li>
* <li>根据本地事务结果提交或回滚消息</li>
* <li>如果长时间未收到结果,Broker 会回查本地事务状态(checkLocalTransaction)</li>
* </ol>
* <p><b>使用场景:</b>分布式事务、订单支付等需要保证一致性的场景</p>
*
* @param order 订单对象
* @param transactionId 事务ID(用于回查)
*/
public void sendTransaction(Object order, String transactionId) {
Message<Object> message = MessageBuilder.withPayload(order)
.setHeader("transactionId", transactionId) // 用于回查时识别事务
.setHeader(MessageConst.PROPERTY_KEYS, transactionId)
.build();
rocketMQTemplate.sendMessageInTransaction(
"TX_ORDER_TOPIC",
message,
order // 传递给 executeLocalTransaction 的参数
);
log.info("事务消息已发送,TransactionId: {}", transactionId);
}
// ==================== 批量发送 ====================
/**
* 批量发送消息(提高吞吐量)
* <p><b>注意:</b></p>
* <ul>
* <li>批量消息不能是延迟消息</li>
* <li>批量消息的总大小不能超过 4MB</li>
* <li>批量消息需要配置相同的 Topic 和 Tag</li>
* </ul>
* <p><b>使用场景:</b>批量数据同步、批量通知等</p>
*
* @param orders 订单列表
*/
public void sendBatch(java.util.List<Object> orders) {
if (orders == null || orders.isEmpty()) {
log.warn("批量消息列表为空,跳过发送");
return;
}
try {
// 构建批量消息
java.util.List<Message<Object>> messages = new java.util.ArrayList<>();
for (Object order : orders) {
messages.add(MessageBuilder.withPayload(order).build());
}
// RocketMQ Spring Boot 暂不直接支持批量发送,需要通过原生 API
// 这里提供思路,实际使用需要注入 DefaultMQProducer
log.info("批量消息准备发送,数量: {}", orders.size());
// 逐个发送(实际项目中可以优化为真正的批量发送)
for (Object order : orders) {
rocketMQTemplate.syncSend("ORDER_TOPIC", order);
}
log.info("批量消息发送完成,数量: {}", orders.size());
} catch (Exception e) {
log.error("批量消息发送失败", e);
throw new RuntimeException("批量消息发送失败", e);
}
}
// ==================== 请求-响应模式 ====================
/**
* 发送请求消息并等待响应(RPC 模式)
* <p><b>使用场景:</b>需要同步获取响应结果的场景</p>
*
* @param request 请求对象
* @param timeout 超时时间(毫秒)
* @return 响应对象
*/
public Object sendAndReceive(Object request, long timeout) {
try {
Object response = rocketMQTemplate.sendAndReceive(
"REQUEST_TOPIC",
request,
Object.class,
timeout
);
log.info("RPC请求成功,收到响应");
return response;
} catch (Exception e) {
log.error("RPC请求失败", e);
throw new RuntimeException("RPC请求失败", e);
}
}
}
消费者示例
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMQ 消费者示例 - 订单消息消费
* <p>
* <b>消费者配置说明:</b>
* <ul>
* <li><b>topic</b>: 监听的主题名称</li>
* <li><b>consumerGroup</b>: 消费者组,必须全局唯一,建议命名:项目名_业务名_consumer</li>
* <li><b>selectorType</b>: 消息选择器类型(TAG 或 SQL92)</li>
* <li><b>selectorExpression</b>: 过滤表达式,"*" 表示接收所有消息</li>
* <li><b>consumeMode</b>: 消费模式(并发 CONCURRENTLY 或顺序 ORDERLY)</li>
* <li><b>messageModel</b>: 消息模式(集群 CLUSTERING 或广播 BROADCASTING)</li>
* <li><b>consumeThreadNumber</b>: 消费线程数,默认 64</li>
* <li><b>maxReconsumeTimes</b>: 最大重试次数,默认 16</li>
* </ul>
* </p>
*
* <p><b>使用注意事项:</b></p>
* <ol>
* <li>消费者组名必须全局唯一,不同消费者不能使用相同的组名</li>
* <li>消费逻辑应实现幂等性,避免重复消费问题</li>
* <li>消费失败时抛出异常,消息会自动重试</li>
* <li>达到最大重试次数后,消息会进入死信队列(DLQ)</li>
* </ol>
*
* @author ruoyi
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ORDER_TOPIC", // 监听的主题
consumerGroup = "home_care_order_consumer", // 消费者组,全局唯一
selectorType = SelectorType.TAG, // 按 Tag 过滤
selectorExpression = "*", // "*" 表示接收所有 Tag,也可以指定:"created || paid"
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费(默认)
messageModel = MessageModel.CLUSTERING, // 集群模式(默认)
consumeThreadNumber = 20, // 消费线程数
maxReconsumeTimes = 3 // 最大重试次数
)
public class OrderConsumer implements RocketMQListener<Object> {
/**
* 消费消息的核心方法
* <p><b>注意事项:</b></p>
* <ul>
* <li>此方法必须实现幂等性,即多次消费同一消息结果一致</li>
* <li>消费成功则正常返回,失败抛出异常触发重试</li>
* <li>达到最大重试次数后,消息进入死信队列:%DLQ%{consumerGroup}</li>
* <li>避免耗时操作,建议单条消息处理时间 < 5s</li>
* </ul>
*
* @param order 订单消息对象,会自动反序列化
*/
@Override
public void onMessage(Object order) {
try {
log.info("[OrderConsumer] 开始消费订单消息: {}", order);
// TODO: 实现具体的订单处理逻辑
// 1. 校验消息合法性
// 2. 幂等性处理(检查是否已处理)
// 3. 执行业务逻辑
// 4. 记录处理状态
// 模拟业务处理
processOrder(order);
log.info("[OrderConsumer] 订单消息消费成功");
} catch (Exception e) {
log.error("[OrderConsumer] 订单消息消费失败,将触发重试", e);
// 抛出异常,触发 RocketMQ 重试机制
throw new RuntimeException("消息消费失败", e);
}
}
/**
* 处理订单逻辑(示例)
*/
private void processOrder(Object order) {
// 实际业务逻辑处理
log.info("处理订单业务逻辑: {}", order);
}
}
顺序消费者示例
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMQ 顺序消费者示例
* <p>
* <b>顺序消费特点:</b>
* <ul>
* <li>消息按照发送顺序被消费(同一队列内)</li>
* <li>消费失败会阻塞队列,直到消费成功</li>
* <li>适用于需要严格保证消息顺序的场景</li>
* </ul>
* </p>
*
* <p><b>使用场景:</b></p>
* <ul>
* <li>订单状态流转(待支付 → 已支付 → 已发货 → 已完成)</li>
* <li>库存操作(扣减、回滚必须按顺序)</li>
* <li>账户流水(保证借贷记录顺序)</li>
* </ul>
*
* <p><b>注意事项:</b></p>
* <ul>
* <li>顺序消费性能低于并发消费</li>
* <li>消费失败会阻塞后续消息,需要特别注意异常处理</li>
* <li>建议设置合理的消费超时时间</li>
* </ul>
*
* @author ruoyi
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ORDER_STATUS_TOPIC",
consumerGroup = "home_care_order_status_sequential_consumer",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式
messageModel = MessageModel.CLUSTERING,
consumeThreadNumber = 1 // 顺序消费建议设置为 1
)
public class OrderSequentialConsumer implements RocketMQListener<Object> {
@Override
public void onMessage(Object message) {
try {
log.info("[OrderSequentialConsumer] 开始顺序消费消息: {}", message);
// TODO: 实现顺序处理逻辑
// 例如:处理订单状态变更
processOrderStatus(message);
log.info("[OrderSequentialConsumer] 消息消费成功");
} catch (Exception e) {
log.error("[OrderSequentialConsumer] 消息消费失败,将阻塞队列", e);
// 顺序消费时,抛出异常会阻塞当前队列,直到消费成功
throw new RuntimeException("顺序消费失败", e);
}
}
/**
* 处理订单状态变更
*/
private void processOrderStatus(Object message) {
log.info("处理订单状态变更: {}", message);
// 实现具体的业务逻辑
}
}
过滤消费者示例
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMQ Tag 过滤消费者示例
* <p>
* <b>Tag 过滤机制:</b>
* <ul>
* <li>消费者可以通过 Tag 只接收特定类型的消息</li>
* <li>支持单个 Tag 或多个 Tag(用 || 分隔)</li>
* <li>Broker 端过滤,减少网络传输</li>
* </ul>
* </p>
*
* <p><b>使用场景:</b></p>
* <ul>
* <li>同一 Topic 下区分不同业务类型(如:订单创建、订单支付、订单完成)</li>
* <li>消费者只关心特定类型的消息</li>
* <li>实现消息的精确订阅</li>
* </ul>
*
* <p><b>Tag 配置示例:</b></p>
* <ul>
* <li>"*" - 接收所有消息</li>
* <li>"created" - 只接收 created 标签的消息</li>
* <li>"created || paid" - 接收 created 或 paid 标签的消息</li>
* <li>"created || paid || cancelled" - 接收三种标签的消息</li>
* </ul>
*
* @author ruoyi
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "home_care_order_paid_consumer",
selectorType = SelectorType.TAG,
selectorExpression = "paid || completed" // 只接收 paid 和 completed 标签的消息
)
public class OrderTagConsumer implements RocketMQListener<Object> {
@Override
public void onMessage(Object message) {
try {
log.info("[OrderTagConsumer] 接收到订单支付/完成消息: {}", message);
// TODO: 实现具体的业务逻辑
// 例如:发送通知、更新统计数据等
processPaymentOrCompletionMessage(message);
log.info("[OrderTagConsumer] 消息处理成功");
} catch (Exception e) {
log.error("[OrderTagConsumer] 消息处理失败", e);
throw new RuntimeException("消息处理失败", e);
}
}
/**
* 处理支付或完成消息
*/
private void processPaymentOrCompletionMessage(Object message) {
log.info("处理订单支付/完成相关业务: {}", message);
// 实现具体的业务逻辑
}
}
事务消息监听器示例
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
/**
* RocketMQ 事务消息监听器示例
* <p>
* <b>事务消息执行流程:</b>
* <ol>
* <li>发送半消息(Half Message)到 Broker</li>
* <li>执行本地事务(executeLocalTransaction)</li>
* <li>根据本地事务结果返回 COMMIT 或 ROLLBACK</li>
* <li>如果返回 UNKNOWN 或长时间未响应,Broker 会调用 checkLocalTransaction 回查</li>
* <li>根据回查结果决定是否提交消息</li>
* </ol>
* </p>
*
* <p><b>三种事务状态:</b></p>
* <ul>
* <li><b>COMMIT</b>: 提交事务,消息对消费者可见</li>
* <li><b>ROLLBACK</b>: 回滚事务,消息不会被消费</li>
* <li><b>UNKNOWN</b>: 中间状态,需要 Broker 回查</li>
* </ul>
*
* <p><b>使用场景:</b></p>
* <ul>
* <li>订单创建后发送库存扣减消息</li>
* <li>支付成功后发送通知消息</li>
* <li>需要保证数据库操作与消息发送一致性的场景</li>
* </ul>
*
* <p><b>配置说明:</b></p>
* <ul>
* <li>每个 Listener 只能绑定一个 Topic</li>
* <li>如需多个 Topic 事务,需要创建多个 Listener</li>
* <li>通过 txProducerGroup 参数区分不同的事务发送者</li>
* </ul>
*
* @author ruoyi
*/
@Slf4j
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
* <p>此方法在半消息发送成功后立即调用</p>
*
* <p><b>执行逻辑:</b></p>
* <ol>
* <li>执行数据库操作(如扣库存、更新订单状态)</li>
* <li>根据操作结果返回相应状态</li>
* <li>如果不确定状态,返回 UNKNOWN,等待 Broker 回查</li>
* </ol>
*
* <p><b>注意事项:</b></p>
* <ul>
* <li>此方法应该尽快执行完成,避免耗时操作</li>
* <li>建议记录事务执行状态,便于回查</li>
* <li>如果方法抛出异常,也会触发回查机制</li>
* </ul>
*
* @param msg 半消息对象,包含消息体和头信息
* @param arg 发送消息时传入的额外参数
* @return 本地事务执行状态:COMMIT、ROLLBACK 或 UNKNOWN
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 获取消息头信息
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get("transactionId");
log.info("[TransactionListener] 开始执行本地事务,TransactionId: {}", transactionId);
log.info("[TransactionListener] 消息内容: {}", msg.getPayload());
log.info("[TransactionListener] 额外参数: {}", arg);
// ==================== 执行本地事务 ====================
// TODO: 实现具体的业务逻辑
// 示例:
// 1. 执行数据库操作(扣库存、更新订单)
// 2. 记录事务执行状态到数据库(用于回查)
// 3. 根据执行结果决定返回状态
boolean success = executeBusinessLogic(transactionId, arg);
if (success) {
log.info("[TransactionListener] 本地事务执行成功,提交消息");
return RocketMQLocalTransactionState.COMMIT;
} else {
log.warn("[TransactionListener] 本地事务执行失败,回滚消息");
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("[TransactionListener] 本地事务执行异常,返回 UNKNOWN 等待回查", e);
// 发生异常时,返回 UNKNOWN,等待 Broker 回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 回查本地事务状态
* <p>当 executeLocalTransaction 返回 UNKNOWN 或超时时,Broker 会调用此方法回查</p>
*
* <p><b>回查规则:</b></p>
* <ul>
* <li>首次回查间隔:6秒</li>
* <li>后续回查间隔:60秒</li>
* <li>最大回查次数:15次</li>
* <li>超过最大次数后,消息会被丢弃</li>
* </ul>
*
* <p><b>实现思路:</b></p>
* <ol>
* <li>通过 transactionId 查询数据库事务状态表</li>
* <li>根据记录的状态返回 COMMIT 或 ROLLBACK</li>
* <li>如果找不到记录,可能是事务还在执行,返回 UNKNOWN</li>
* </ol>
*
* <p><b>注意事项:</b></p>
* <ul>
* <li>回查方法应该尽快返回,避免耗时操作</li>
* <li>回查逻辑应该是查询操作,不应该修改数据</li>
* <li>如果不确定状态,可以继续返回 UNKNOWN,直到超过回查次数</li>
* </ul>
*
* @param msg 半消息对象
* @return 事务状态:COMMIT、ROLLBACK 或 UNKNOWN
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
// 获取事务ID
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get("transactionId");
log.info("[TransactionListener] 回查本地事务状态,TransactionId: {}", transactionId);
// ==================== 查询事务状态 ====================
// TODO: 从数据库查询事务执行状态
// 示例:
// 1. 根据 transactionId 查询事务状态表
// 2. 如果查询到事务成功记录,返回 COMMIT
// 3. 如果查询到事务失败记录,返回 ROLLBACK
// 4. 如果查不到记录,可能还在执行,返回 UNKNOWN
String status = checkTransactionStatus(transactionId);
if ("SUCCESS".equals(status)) {
log.info("[TransactionListener] 回查结果:事务已成功,提交消息");
return RocketMQLocalTransactionState.COMMIT;
} else if ("FAILED".equals(status)) {
log.info("[TransactionListener] 回查结果:事务已失败,回滚消息");
return RocketMQLocalTransactionState.ROLLBACK;
} else {
log.info("[TransactionListener] 回查结果:事务状态未知,继续回查");
return RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
log.error("[TransactionListener] 回查本地事务状态异常", e);
// 回查异常时,返回 UNKNOWN,等待下次回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 执行具体的业务逻辑(示例)
*
* @param transactionId 事务ID
* @param arg 额外参数
* @return 是否成功
*/
private boolean executeBusinessLogic(String transactionId, Object arg) {
// TODO: 实现具体的业务逻辑
// 示例:
// 1. 执行数据库操作
// 2. 记录事务状态
log.info("执行业务逻辑,TransactionId: {}, Arg: {}", transactionId, arg);
// 模拟业务执行成功
return true;
}
/**
* 查询事务状态(示例)
*
* @param transactionId 事务ID
* @return 事务状态:SUCCESS、FAILED 或 UNKNOWN
*/
private String checkTransactionStatus(String transactionId) {
// TODO: 从数据库查询事务状态
// 示例:
// SELECT status FROM transaction_log WHERE transaction_id = ?
log.info("查询事务状态,TransactionId: {}", transactionId);
// 模拟查询结果
return "SUCCESS";
}
}