RabbitMQ 使用教程(全功能讲解 + Spring Boot 落地代码)
目标:一份从入门到生产可用的 RabbitMQ 教程。
说明:文件名按你的要求使用rabitmq,内容讲解的是 RabbitMQ。
1. RabbitMQ 是什么
RabbitMQ 是基于 AMQP 协议的消息中间件,常用于:
- 系统解耦(异步化)
- 削峰填谷(流量高峰缓冲)
- 可靠通知(订单、支付、物流)
- 广播与多订阅(事件驱动)
核心角色:
Producer:生产者Exchange:交换机(路由消息)Queue:队列(存储消息)Consumer:消费者Binding:绑定(队列和交换机的关系)
2. 安装与启动
2.1 Docker 快速启动
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3.13-management
- AMQP 端口:
5672 - 管理控制台:
15672 - 默认账号密码:
guest / guest(仅本机访问)
3. 消息模型与交换机类型
RabbitMQ 最常见交换机:
direct:精确路由(routingKey 完全匹配)topic:通配符路由(*、#)fanout:广播headers:按消息头匹配(较少用)
4. 消息生命周期(核心流程)
- 生产者发送消息到 Exchange
- Exchange 按规则路由到 Queue
- 消费者监听 Queue 并处理
- 消费成功后 ACK;失败则重试/拒绝/转死信
5. AMQP 常用属性
deliveryMode:是否持久化(2 表示持久化)contentType:内容类型(如application/json)expiration:消息过期时间(毫秒)priority:优先级correlationId:请求追踪replyTo:RPC 回调队列
6. Spring Boot 集成
6.1 依赖
pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
6.2 基础配置
application.yml:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
prefetch: 20
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
multiplier: 2
max-interval: 10000
6.3 消息序列化配置(JSON)
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
7. 四种交换机的落地代码
7.1 Direct(精确路由)
@Configuration
public class DirectMqConfig {
public static final String EX = "demo.direct.ex";
public static final String Q = "demo.direct.q";
public static final String RK = "demo.direct.key";
@Bean
DirectExchange directExchange() {
return ExchangeBuilder.directExchange(EX).durable(true).build();
}
@Bean
Queue directQueue() {
return QueueBuilder.durable(Q).build();
}
@Bean
Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(RK);
}
}
生产消息:
@Service
public class DirectProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(Object payload) {
rabbitTemplate.convertAndSend(DirectMqConfig.EX, DirectMqConfig.RK, payload);
}
}
消费消息(手动 ack):
@Slf4j
@Component
public class DirectConsumer {
@RabbitListener(queues = DirectMqConfig.Q)
public void onMessage(Map<String, Object> msg, Channel channel, Message message) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
log.info("direct consumer receive: {}", msg);
// TODO 业务处理
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, false); // 拒绝并不重回队列(可转死信)
}
}
}
7.2 Topic(通配符路由)
@Configuration
public class TopicMqConfig {
public static final String EX = "demo.topic.ex";
public static final String Q_ORDER = "demo.topic.order.q";
public static final String Q_PAY = "demo.topic.pay.q";
@Bean
TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(EX).durable(true).build();
}
@Bean
Queue orderQueue() {
return QueueBuilder.durable(Q_ORDER).build();
}
@Bean
Queue payQueue() {
return QueueBuilder.durable(Q_PAY).build();
}
@Bean
Binding b1() {
return BindingBuilder.bind(orderQueue()).to(topicExchange()).with("order.*");
}
@Bean
Binding b2() {
return BindingBuilder.bind(payQueue()).to(topicExchange()).with("order.pay.#");
}
}
7.3 Fanout(广播)
@Configuration
public class FanoutMqConfig {
public static final String EX = "demo.fanout.ex";
public static final String Q_A = "demo.fanout.a.q";
public static final String Q_B = "demo.fanout.b.q";
@Bean
FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(EX).durable(true).build();
}
@Bean
Queue fanoutQueueA() {
return QueueBuilder.durable(Q_A).build();
}
@Bean
Queue fanoutQueueB() {
return QueueBuilder.durable(Q_B).build();
}
@Bean
Binding fanoutBindA() {
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
Binding fanoutBindB() {
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
}
7.4 Headers(按 Header 路由)
@Configuration
public class HeadersMqConfig {
public static final String EX = "demo.headers.ex";
public static final String Q = "demo.headers.q";
@Bean
HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange(EX).durable(true).build();
}
@Bean
Queue headersQueue() {
return QueueBuilder.durable(Q).build();
}
@Bean
Binding headersBinding() {
return BindingBuilder.bind(headersQueue()).to(headersExchange())
.where("type").matches("report");
}
}
发送时带 header:
rabbitTemplate.convertAndSend(HeadersMqConfig.EX, "", payload, msg -> {
msg.getMessageProperties().setHeader("type", "report");
return msg;
});
8. 可靠投递(生产者确认 + 回退)
8.1 ConfirmCallback(到达 Exchange)
@Configuration
public class RabbitCallbackConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
// TODO 记录失败日志,进入补偿流程
}
});
template.setReturnsCallback(returned -> {
// 到达 exchange 但路由不到 queue
// TODO 记录 returned message,补偿
});
return template;
}
}
9. 消费确认 ACK / NACK / Reject
basicAck:确认消费成功basicNack:否认(可批量、可重回队列)basicReject:否认单条(可重回队列)
建议:
- 业务成功才 ACK
- 非瞬时错误可进死信,避免无限重试
10. 重试机制
两类常见重试:
- 应用层重试(Spring listener retry)
- 消息层重试(TTL + DLX 或延迟插件)
11. 死信队列(DLX)
触发死信场景:
- 消息被拒绝且
requeue=false - 消息过期
- 队列达到最大长度
配置示例:
@Configuration
public class DlxConfig {
public static final String EX_NORMAL = "demo.normal.ex";
public static final String Q_NORMAL = "demo.normal.q";
public static final String EX_DLX = "demo.dlx.ex";
public static final String Q_DLX = "demo.dlx.q";
@Bean
DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(EX_NORMAL).durable(true).build();
}
@Bean
Queue normalQueue() {
return QueueBuilder.durable(Q_NORMAL)
.deadLetterExchange(EX_DLX)
.deadLetterRoutingKey("dlx.key")
.build();
}
@Bean
DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(EX_DLX).durable(true).build();
}
@Bean
Queue dlxQueue() {
return QueueBuilder.durable(Q_DLX).build();
}
@Bean
Binding normalBind() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal.key");
}
@Bean
Binding dlxBind() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}
}
12. 延迟消息
12.1 方案 A:TTL + DLX(通用)
为消息设置 TTL,到期后投递到死信队列,再由死信消费者处理。
12.2 方案 B:x-delayed-message 插件(更灵活)
安装插件后可按消息级别设置延迟。
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("demo.delay.ex", "x-delayed-message", true, false, args);
}
发送延迟消息:
rabbitTemplate.convertAndSend("demo.delay.ex", "delay.key", payload, msg -> {
msg.getMessageProperties().setHeader("x-delay", 5000); // 5 秒
return msg;
});
13. 消息优先级队列
@Bean
Queue priorityQueue() {
return QueueBuilder.durable("demo.priority.q")
.maxPriority(10)
.build();
}
发送时指定优先级:
rabbitTemplate.convertAndSend("demo.direct.ex", "demo.direct.key", payload, msg -> {
msg.getMessageProperties().setPriority(9);
return msg;
});
14. 消息过期(TTL)
队列级 TTL:
QueueBuilder.durable("demo.ttl.q")
.ttl(30000)
.build();
消息级 TTL:
rabbitTemplate.convertAndSend("demo.direct.ex", "demo.direct.key", payload, msg -> {
msg.getMessageProperties().setExpiration("10000");
return msg;
});
15. 惰性队列(Lazy Queue)
将消息优先落盘,减少内存占用,适合大堆积场景。
@Bean
Queue lazyQueue() {
return QueueBuilder.durable("demo.lazy.q")
.lazy()
.build();
}
16. 幂等消费(防重复)
核心思路:
- 每条消息有唯一业务 ID(如
orderId) - 消费前先做幂等检查(Redis/DB)
- 已处理过直接 ACK 跳过
简例(伪码):
if (redis.setIfAbsent("mq:idem:" + msgId, "1", 1 day)) {
// 执行业务
ack
} else {
// 重复消息
ack
}
17. 顺序消息
RabbitMQ 不天然保证全局顺序。
常用做法:
- 同一业务主键路由到同一队列(单消费者)
- 消费端串行处理
- 允许局部有序而非全局有序
18. RPC 模式(请求-响应)
RabbitMQ 支持 RPC(replyTo + correlationId),但生产中更推荐异步事件模型;RPC 易耦合、超时处理复杂。
19. Spring Boot 统一发送服务示例
@Service
public class MqSendService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendOrderCreated(OrderCreatedEvent event) {
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
TopicMqConfig.EX,
"order.created",
event,
message -> {
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
cd
);
}
}
20. 失败重试 + 死信 的推荐流程
- 消费异常先走应用层重试(短重试)
- 仍失败则
nack(requeue=false) - 进入死信队列
- 死信消费者记录告警/人工干预/定时补偿
21. 常见生产参数建议
prefetch:10~200(按耗时调优)- 消费者线程数:CPU 核数与 I/O 类型综合评估
- 消息体大小:尽量小(大对象走对象存储)
- 队列长度:设置上限 + 降级策略
22. 集群与高可用
22.1 普通集群
- 解决吞吐与扩展问题
- 队列默认不自动镜像到所有节点
22.2 Quorum Queue(推荐)
- 基于 Raft,替代经典镜像队列
- 提供更可靠的一致性和故障恢复
声明示例:
@Bean
Queue quorumQueue() {
return QueueBuilder.durable("demo.quorum.q")
.quorum()
.build();
}
23. 监控与运维
重点监控项:
- 消息堆积(Ready/Unacked)
- 消费速率/发布速率
- 连接数、通道数
- 节点磁盘与内存告警
- 死信队列增长
管理插件 API 可结合 Prometheus + Grafana 做告警。
24. 安全实践
- 禁止生产环境使用默认账号
- 按业务拆分 vhost 与账号权限
- 使用 TLS 传输(跨机房/公网)
- 最小权限原则
- 审计关键操作
25. 常见问题排查
25.1 消息丢失
排查方向:
- 是否开启持久化(交换机/队列/消息)
- 是否开启生产者确认和 return 回调
- 消费是否提前 ack
25.2 消息重复
- 网络抖动导致重投递
- 消费失败重回队列
- 解决:幂等消费
25.3 消息堆积
- 消费能力不足
- 单条处理耗时过长
- 解决:扩容消费者、优化业务、拆分队列
26. 一个可直接复用的业务案例(订单)
链路:
- 用户下单 -> 发送
order.created - 库存服务消费扣库存
- 积分服务消费加积分
- 通知服务消费发短信
- 任一服务失败 -> 进入 DLQ -> 补偿重试
Topic 路由示例:
order.created.stockorder.created.pointorder.created.notify
27. Controller 测试接口示例
@RestController
@RequestMapping("/mq")
public class MqController {
@Resource
private DirectProducer directProducer;
@Resource
private MqSendService mqSendService;
@PostMapping("/direct")
public String sendDirect(@RequestBody Map<String, Object> body) {
directProducer.send(body);
return "ok";
}
@PostMapping("/order")
public String sendOrder(@RequestBody OrderCreatedEvent event) {
mqSendService.sendOrderCreated(event);
return "ok";
}
}
28. 与 Kafka / Redis Stream 的简单对比
- RabbitMQ:路由灵活、低延迟、业务通知友好
- Kafka:超高吞吐、日志流平台、回放能力强
- Redis Stream:轻量简单、适合中小场景
29. 学习路线建议
- 第 1 阶段:Direct/Topic/Fanout + ACK
- 第 2 阶段:Confirm/Return + DLX + 重试
- 第 3 阶段:延迟消息 + 幂等 + 顺序
- 第 4 阶段:Quorum Queue + 监控 + 容灾
30. 官方文档
如果你愿意,我下一步可以继续给你补:
rabbitmq面试题.md(高频问题+标准答案)- 或者直接生成一个可运行的
springboot-rabbitmq-demo项目结构(含配置、生产者、消费者、DLQ、延迟消息、接口测试脚本)。