RabbitMQ
官网
https://www.rabbitmq.com/
https://www.rabbitmq.com/tutorials/amqp-concepts.html
RabbitMQ介绍
RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
Erlang:Erlang是一种通用的面向并发的编程语言,Erlang充分发挥CPU的性能,延迟特别低
,相比其他的MQ(Kafka,RocketMQ)延迟是最低的。
RabbitMQ支持多种语言通讯:Java,Python…………都有响应的API
RabbitMQ支持海量的插件去实现一些特殊功能(延迟交换机),RabbitMQ自带了一款图形化界面,操作异常的简单。
RabbitMQ应用场景
- 排队算法:使用消息队列特性
- 秒杀活动:使用消息队列特性
- 消息分发:使用消息异步特性
- 异步处理:使用消息异步特性
- 数据同步:使用消息异步特性
- 处理耗时任务:使用消息异步特性
- 流量削峰
AMQP是什么
AMQP(Advanced Message Queuing protocol,高级消息队列协议)是进程之间传递异步消息的网络协议。
AMQP工作过程
中间的白框就是AMQP
工作过程:发布者(publisher) 发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到的消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者由消费者按照需求自行获取。
RabbitMQ安装
docker-compose.yml
编写docker-compose.yml文件
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- /usr/local/xqm/rabbitmq/data/:/var/lib/rabbitmq/
ports:
- 5672:5672
# 图形化界面的端口
- 15672:15672
安装docker-compose
安装的版本是1.27.4,可自行去查找其他版本
curl -L https://get.daocloud.io/docker/compose/releases/download/1.27.4/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version
运行
docker-compose up -d
测试
[root@localhost rabbitmq]# curl localhost:5672
AMQP
表示正常运行curl localhost:5672
启动图形化界面
默认RabbitMQ的图形化界面是关闭的,需要自行打开图形化这个插件
docker exec -it rabbitmq /bin/bash
# 进入/opt/rabbitmq/sbin/ 启动插件
root@721622f2a085:/opt/rabbitmq/sbin# ./rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@721622f2a085:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@721622f2a085...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
访问图形化界面
访问15672端口:默认的用户名和密码均为:`guest`
192.168.49.131:15672
RabbitMQ架构
- publisher:消息的生产者,也是一个向交换机发布消息的客户端应用程序
- consumer:消息的消费者,也是一个从消息队列中取得消息的客户端应用程序
- message:消息。由消息头消息体组成。消息头由一系列可选属性组成。包括:routing-key(路由)、priority(优先权)、delivery-mode(消息是否持久性存储)等等
- exchange:交换机,一共有四种:direct(发布订阅完全匹配)、fanout(广播)、topic(主题,规则匹配)、headers(基本不用)
- routes/routeing-key:路由,rabbitMQ通过路由规则来决定将exchange中消息投放到哪个队列中,可以是一对一投放,一对多投放,多对多投放。通过不同的exchange类型来实现
- queue:消息队列
- connection:连接,通rabbitMQ服务器和服务创建TCP连接
- channel:频道/信道。
- 是TCP里面的虚拟链接。比如:电缆相当于TCP连接,channel相当光纤束。
- 一个connection中可以创建多个channel
- TCP一旦打开,就会创建AMQP信道
- 无论是发布消息、接收消息、订阅队列,都是通过channel来完成的
- binding:绑定,用于消息队列和交换器之间关联关系。一个绑定就是一个routes将exchange和queue连接起来的路由规则
- virtual-host:虚拟主机,可以有多个exchange,表示一批交换机。
- RabbitMQ默认的虚拟主机是/
- 每个vHost本质都是一个mini版本的rabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制
- Borker:表示消息队列服务器实体
RabbitMQ的通讯方式
七种通讯方式
RabbitMQ一共有七种通讯方式。
通讯方式官网:https://www.rabbitmq.com/getstarted.html
- Hello World!:为了入门操作!
- Work queues:一个队列被多个消费者消费
- Publish/Subscribe:手动创建Exchange(FANOUT-广播)
- Routing:手动创建Exchange(DIRECT-匹配)
- Topics:手动创建Exchange(TOPIC-通过通配符来匹配)
- RPC:RPC方式
- Publisher Confirms:保证消息可靠性
# 交换机 DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")有4种
hello world:
一个生产者-默认交换机-RoutingKey-queue-一个消费者
work queues:
一个生产者-默认交换机-RoutingKey-queue-多个消费者(采用轮询的机制来消费一个队列中的数据)
publish/subscribe: 交换机能够分发消息到不同的队列,routingKey无所谓,随便填,发送一条消息,那么所有的队列全部有
一个生产者-exchange(fanout) -RoutingKey-queue-consumer
-RoutingKey-queue-consumer
routing: 通过routingKey分发给不同的队列
一个生产者-exchange(direct) -RoutingKey(orange)-queue-consumer
-RoutingKey(apple)-queue-consumer
topics: 通过routingKey分发给不同的队列, *代表的是占位符,#代表的是通配符
一个生产者-exchange(topic) -RoutingKey(*.orange.*)-queue-consumer
-RoutingKey(*.*.apple)-queue-consumer
-RoutingKey(lazy.#)-queue-consumer
连接IDEA
- 导入依赖:amqp-client,junit
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
创建连接工具类
// 连接工具类
package com.xqm.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQConnectionUtil {
// ip
public static final String RABBITMQ_HOST="192.168.49.131";
// port
public static final int RABBITMQ_PORT=5672;
// username
public static final String RABBITMQ_USERNAME="guest";
// password
public static final String RABBITMQ_PASSWORD="guest";
// virtual host
public static final String RABBITMQ_VIRTUAL_HOST="/";
/**
* 构建rabbitmq的连接对象
* @return
*/
public static Connection getConnection() throws Exception {
// 1.创建connection工场
ConnectionFactory factory=new ConnectionFactory();
// 2.设置RabbitMQ连接信息
factory.setHost(RABBITMQ_HOST);
factory.setPort(RABBITMQ_PORT);
factory.setUsername(RABBITMQ_USERNAME);
factory.setPassword(RABBITMQ_PASSWORD);
factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
// 3.返回连接对象
return factory.newConnection();
}
}
第一种:Hello world
// 一个生产者 一个消费者 使用的是默认交换机
生产者–默认交换机-RoutingKey-queue-消费者
- 生产者
// 生产者
package com.xqm.mq.test.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
public class Publisher {
public static final String QUEUE_NAME="hello";
@Test
public void publish() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4.发布消息 默认交换机就是""
String message="hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息已发送");
// 进行阻塞
System.in.read();
}
}
- 消费者
// 消费者
package com.xqm.mq.test.helloworld;
import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Consumer {
@Test
public void consume() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
// 必须和生产者构建的队列一样,否则就会报错
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
// 4.监听消息 消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
// 重写handleDelivery
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取的消息:"+new String(body,"UTF-8"));
}
};
channel.basicConsume(Publisher.QUEUE_NAME,true,defaultConsumer);
// 进行阻塞
System.in.read();
}
}
第二种:Work queues
- 生产者
// 生产者,生产10条消息
package com.xqm.mq.test.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
public class Publisher {
public static final String QUEUE_NAME="work";
@Test
public void publish() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4.发布消息 默认交换机就是"" 发送10个消息
for (int i = 0; i < 10; i++) {
String message="hello world"+i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息已发送");
// 进行阻塞
System.in.read();
}
}
- 消费者
// 两个消费者
package com.xqm.mq.test.workqueues;
import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Consumer {
@Test
public void consume1() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
// 必须和生产者构建的队列一样,否则就会报错
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
// 3.5 设置消息的流控 加上关闭自动ack 开启手动应答ack 就能做到消费快的消费多,不再是轮询负载
channel.basicQos(1);
// 4.监听消息 消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
// 重写handleDelivery
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号获取的消息:"+new String(body,"UTF-8"));
// 手动进行ack的应答 void basicAck(long deliveryTag, boolean multiple) (消息标识,是否是批量操作)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 关闭自动ack
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
// 进行阻塞
System.in.read();
}
@Test
public void consume2() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
// 必须和生产者构建的队列一样,否则就会报错
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
// 3.5 设置消息的流控 加上关闭自动ack 开启手动应答ack 就能做到消费快的消费多,不再是轮询负载
channel.basicQos(1);
// 4.监听消息 消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
// 重写handleDelivery
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号获取的消息:"+new String(body,"UTF-8"));
// 手动进行ack的应答 void basicAck(long deliveryTag, boolean multiple) (消息标识,是否是批量操作)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 关闭自动ack
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
// 进行阻塞
System.in.read();
}
}
第三种:Publisher/Subscribe
一个生产者-exchange(fanout) -RoutingKey-queue-consumer
-RoutingKey-queue-consumerDIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”)有4种
生产者:自行构建Exchange并绑定指定队列(fanout类型交换机)
消费者:和hello world消费者一样
采用fanout广播的形式,一个生产者生产的消息能够广播到多个队列种
- 生产者
// 生产者
package com.xqm.mq.test.publishSubscribe;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Publisher {
public static final String EXCHANGE_NAME="pubsub";
public static final String QUEUE_NAME1="pubsub-one";
public static final String QUEUE_NAME2="pubsub-two";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建channel
Channel channel = connection.createChannel();
// 3.构建交换机
// Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
// 5.绑定交换机和队列 使用的fanout类型的交换机,绑定方式是直接绑定
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
// 6.发消息到交换机
channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe".getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送");
System.in.read();
}
}
- 消费者
也是一个队列对应一个消费者,所以和Hello World的消费者一样
第四种:Routing
Routing模式采用的exchange是direct,根据不同的Routing-Key来绑定不同的Queue,只要Routing-Key相同,那么exchange就会发送同一个消息到这些队列
在绑定exchange和queue时,指定好routingKey,同时绑定queue的时候,也需要绑定routingKey,指定routingKey的queue不存在时,消息就会丢失
- 生产者
// 生产者
package com.xqm.mq.test.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
public class Publish {
public static final String EXCHANGE_NAME="routing";
public static final String QUEUE_NAME1="routing-one";
public static final String QUEUE_NAME2="routing-two";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建channel
Channel channel = connection.createChannel();
// 3.构建交换机
// Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
// 5.绑定交换机和队列 使用的fanout类型的交换机,绑定方式是直接绑定
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"white");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"black");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"green");
// 6.发消息到交换机
channel.basicPublish(EXCHANGE_NAME,"white",null,"白色".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"black",null,"黑色".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"green",null,"绿色".getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送");
}
}
第五种:Topics
基本都用Topics模式,通过routing-key,通过通配符匹配的方式,只要能够匹配上,一个交换机能够发送消息到不同的队列。
和Routing模式不同在于,Routing必须要完全匹配,而Topics通过占位符/通配符方式来匹配
- 生产者
// 生产者
package com.xqm.mq.test.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
public class Publish {
public static final String EXCHANGE_NAME="topic";
public static final String QUEUE_NAME1="topic-one";
public static final String QUEUE_NAME2="topic-two";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建channel
Channel channel = connection.createChannel();
// 3.构建交换机
// Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
// 5.绑定交换机和队列 topic类型的交换机,和队列绑定时,需要以aaa.bbb.ccc方式编写
// 其中*表示占位符,#相当于通配符,如果发送aaa.orange.rabbit,那么匹配到*.orange.*和*.*.rabbit两个队列
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");
// 6.发消息到交换机
channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog",null,"懒狗".getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送");
}
}
第六种:RPC
使用的不是很多,了解
client server
因为两个服务在交互时,可以尽量做到client和Server的解耦,通过RabbitMQ进行解耦操作
需要让client发送消息时,携带两个属性:
replyTo:告知server将消息放到哪个队列中
corrlationId:告知Server发送相应消息时,需要携带的唯一标识来告知client响应的信息
- 生产者
// publisher
package com.xqm.mq.test.rpc;
import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class Publisher {
public static final String QUEUE_PUBLISHER="rpc_publisher";
public static final String QUEUE_CONSUMER="rpc_consumer";
@Test
public void publish() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
// 4.发布消息 默认交换机就是""
String message="hello RPC";
String uuid = UUID.randomUUID().toString();
AMQP.BasicProperties properties=new AMQP.BasicProperties()
.builder()
.replyTo(QUEUE_CONSUMER) // 需要监听的队列
.correlationId(uuid) // 唯一标识
.build();
// 将properties放进去
channel.basicPublish("",QUEUE_PUBLISHER,properties,message.getBytes(StandardCharsets.UTF_8));
// 将自动ack关闭 生产者监听是否是给它的响应信息
channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String correlationId = properties.getCorrelationId();
if (correlationId !=null && correlationId.equalsIgnoreCase(uuid)){
System.out.println("接收到服务端的响应:"+new String(body,"UTF-8"));
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
System.out.println("消息已发送");
// 进行阻塞
System.in.read();
}
}
- 消费者
// consumer
package com.xqm.mq.test.rpc;
import com.rabbitmq.client.*;
import com.xqm.mq.test.helloworld.Publisher;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Consumer {
public static final String QUEUE_PUBLISHER="rpc_publisher";
public static final String QUEUE_CONSUMER="rpc_consumer";
@Test
public void consume() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建队列 (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
// 必须和生产者构建的队列一样,否则就会报错
channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
// 4.监听消息 消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
// 重写handleDelivery
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取的消息:"+new String(body,"UTF-8"));
String response="获取到client发出的请求,这里是响应的信息";
String replyQueueName = properties.getReplyTo();
String uuid = properties.getCorrelationId();
AMQP.BasicProperties prop=new AMQP.BasicProperties()
.builder()
.correlationId(uuid)
.build();
channel.basicPublish("",replyQueueName,prop,response.getBytes(StandardCharsets.UTF_8));
// 手动响应ack消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听publish
channel.basicConsume(QUEUE_PUBLISHER,false,defaultConsumer);
// 进行阻塞
System.in.read();
}
}
第七种:Publisher Confirms
这是用来保证消息从Publisher到Exchange的机制,下面有详细讲解。
Exchange的第四种模式:Headers
headers就是一个基于key-value的方式,让Exchange和Queue绑定的到一起的一种规则
相比Topic形式,可以采用的类型更丰富。
如果x-match=all,那么传过来的消息必须全部匹配,也就是name和age都相等
如果x-match=any,那么只要满足一个条件就行
- 生产者
package com.xqm.mq.test.headers;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class Publisher {
public static final String HEADERS_EXCHANGE = "header-exchange";
public static final String HEADERS_QUEUE = "header-queue";
@Test
public void publish() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建Channel
Channel channel = connection.createChannel();
// 3.构建交换机和队列,基于header的方式绑定
channel.exchangeDeclare(HEADERS_EXCHANGE, BuiltinExchangeType.HEADERS);
channel.queueDeclare(HEADERS_QUEUE, true, false, false, null);
Map<String, Object> map = new HashMap<>();
map.put("x-match", "all");
map.put("name", "jack");
map.put("age","12");
channel.queueBind(HEADERS_QUEUE, HEADERS_EXCHANGE, "", map);
// 发送消息
String msg="headers测试消息";
Map<String, Object> headers=new HashMap<>();
// 必须携带这个消息,否则消息就会发送不到queue
headers.put("name", "jack");
headers.put("age","12");
AMQP.BasicProperties properties=new AMQP.BasicProperties()
.builder()
.headers(headers)
.build();
// 这里和routingKey没关系了
channel.basicPublish(HEADERS_EXCHANGE,"",properties,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息成功"+headers);
}
}
SpringBoot整合RabbitMQ
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml
server:
port: 8081
spring:
rabbitmq:
host: 192.168.49.131
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
# 开启手动ack,默认是自动auto
acknowledge-mode: manual
# 消费者每次拿10条消息进行消费
prefetch: 10
# 多少个线程并发执行
# concurrency: 2
配置类
// 配置类
package com.xqm.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE="boot-exchange";
public static final String QUEUE="boot-queue";
public static final String ROUTING_KEY="*.black.*";
/**
* 声明交换机
* @return
*/
@Bean
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE).build();
}
/**
* 声明队列
* @return
*/
@Bean
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE).build();
}
/**
* 绑定交换机和队列
* @return
*/
@Bean
public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
}
}
生产者
package com.xqm.rabbitmq;
import com.xqm.rabbitmq.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class PublisherTest {
@Autowired
public RabbitTemplate rabbitTemplate;
@Test
public void publish(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
System.out.println("消息发送成功");
}
@Test
public void publishWithProperties(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProperties", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setCorrelationId("1234");
return message;
}
});
System.out.println("消息发送成功");
}
}
消费者
// 配置文件中开启手动ack
spring:
rabbitmq:
host: 192.168.49.131
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
# 开启手动ack,默认是自动auto
acknowledge-mode: manual
# 消费者每次拿10条消息进行消费
prefetch: 10
# 多少个线程并发执行
# concurrency: 2
java实现
package com.xqm.rabbitmq;
import com.rabbitmq.client.Channel;
import com.xqm.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class ConsumerListener {
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("队列的消息为:"+msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一标识为:"+correlationId);
// 手动ack应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
结果
队列的消息为:message
唯一标识为:null
队列的消息为:messageWithProperties
唯一标识为:1234
RabbitMQ保证消息可靠性
publisher->exchange
confirm机制,保证生产者到交换机消息的可靠性。
一共有三种,选取了异步回调的方式。
package com.xqm.mq.test.confirms;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Publisher {
public static final String QUEUE_NAME="confirms";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建channel
Channel channel = connection.createChannel();
// 3.构建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4.开启confirms机制
channel.confirmSelect();
// 5.设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功发送到交换机");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有成功发送到交换机,重试或者保存到数据库");
}
});
// 6.发消息到交换机
String message="confirms";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送");
System.in.read();
}
}
exchange-(routing)-queue
Return机制,保证消息可以路由到队列
package com.xqm.mq.test.confirms;
import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* return机制保证exchange发送消息到queue,但是需要basicPublish新增参数
*/
public class PublisherReturn {
public static final String QUEUE_NAME="confirms";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建channel
Channel channel = connection.createChannel();
// 3.构建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4.开启confirms机制
channel.confirmSelect();
// 5.设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功发送到交换机");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有成功发送到交换机,重试或者保存到数据库");
}
});
// 6.设置return回调,确认消息是否路由到queue
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到指定队列中执行此方法,做其他的补偿措施");
System.out.println("消息没有送达到队列中");
}
});
// 7.发消息到交换机
String message="confirms";
channel.basicPublish("","11",true,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送");
System.in.read();
}
}
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
queue
使用DeliveryMode设置消息持久化。
DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。
package com.xqm.mq.test.confirms;
import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 设置消息持久化
*/
public class PublisherDurable {
public static final String QUEUE_NAME="confirms";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.构建channel
Channel channel = connection.createChannel();
// 3.构建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4.开启confirms机制
channel.confirmSelect();
// 5.设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功发送到交换机");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有成功发送到交换机,重试或者保存到数据库");
}
});
// 6.设置return回调,确认消息是否路由到queue
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到指定队列中执行此方法,做其他的补偿措施");
System.out.println("消息没有送达到队列中");
}
});
// 7.设置消息持久化
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
.deliveryMode(2)
.build();
// 8.发消息到交换机
String message="confirms";
channel.basicPublish("","11",true,properties,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送");
System.in.read();
}
}
consumer
消费者可以正常消费队列中的消息,使用主动ack的模式。
详细的参考WorkQueue模式
springboot实现消息可靠性
confirm
- yml
spring:
rabbitmq:
publisher-confirm-type: correlated # 新版本
publisher-confirms: true # 老版本
- 在发送消息时,配置RabbitTemplate
@Test
public void publishWithConfirms() throws IOException {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已经送达到交换机!!");
}else{
System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");
}
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
System.out.println("消息发送成功");
System.in.read();
}
return
- yml
spring:
rabbitmq:
publisher-returns: true # 开启Return机制
- 在发送消息时,配置RabbitTemplate
@Test
public void publishWithReturn() throws IOException {
// 新版本用 setReturnsCallback ,老版本用setReturnCallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
String msg = new String(returned.getMessage().getBody());
System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
System.out.println("消息发送成功");
System.in.read();
}
消息持久化
@Test
public void publishWithBasicProperties() throws IOException {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息的持久化!
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
System.out.println("消息发送成功");
}
死信队列&延迟交换机
概念
消息被消费者拒绝(nack、reject),requence设置为false(这样被拒绝的消息不会再放到队列中了)
# 死信队列的应用
1.基于死信队列,即使队列消息已满的情况下,消息也不会丢失
2.实现延迟消费的效果:比如下订单,有15分钟的付款时间
springboot实现死信队列
进入死信队列三种方式:
- 消息被消费者拒绝
- 消息过期时间到了
- 设置队列长度,队列已满,消息进不来就会进去死信队列
死信队列配置文件
// 死信队列配置
package com.xqm.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetterConfig {
public static final String NORMAL_EXCHANGE="normal-exchange";
public static final String NORMAL_QUEUE="normal-queue";
public static final String NORMAL_ROUTING_KEY="normal.#";
public static final String DEAD_EXCHANGE="dead-exchange";
public static final String DEAD_QUEUE="dead-queue";
public static final String DEAD_ROUTING_KEY="dead.#";
@Bean
public Exchange normalExchange(){
return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
}
@Bean
public Queue normalQueue(){
// 队列绑定死信交换机和死信routing
return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
}
@Bean
public Binding normalBinding(Exchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
}
生产者
// 生产者
package com.xqm.rabbitmq;
import com.xqm.rabbitmq.config.DeadLetterConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DeadPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publish(){
String msg="send deadLetter";
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE,"normal.abc",msg);
}
}
消费者
消费者通过拒绝消费消息,使消息进入死信队列
// 消费者--通过拒绝消息
package com.xqm.rabbitmq;
import com.rabbitmq.client.Channel;
import com.xqm.rabbitmq.config.DeadLetterConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DeadConsumer {
@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
public void consumer(String msg, Channel channel, Message message) throws IOException {
System.out.println("接受到的normal队列的消息为:"+msg);
// reject ack 和下面那种二选一
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// no ack
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
消息设置过期时间
消息过期后,也会被丢进死信队列,如果设置过期时间,必须按照时间先后依次进入死信队列。
有两种方式:一是给单个消息设置过期时间;而是给所有消息设置过期时间
单个消息设置过期时间:
// 给消息设置生存时间 DeadPublisher.class
// 有一个问题,如果第一个消息30s延时,第二个消息5s延时,那么第二个消息必须等第一个消息时间到了之后才能轮到它,因此需要延迟交换机
// 携带生存时间的消息发送者
@Test
public void publishExpire(){
String msg="send letter expire";
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置5秒的过期时间,5s没有消费就会放入到死信队列中,指定单一的消息生存时间
message.getMessageProperties().setExpiration("5000");
return message;
}
});
}
所有消息设置过期时间:使用ttl给exchange中所有消息设置过期时间
// ttl 给队列中的所有消息设置生存时间 @Configuration public class DeadLetterConfig {}
@Bean
public Queue normalQueue(){
// 队列绑定死信交换机
return QueueBuilder.durable(NORMAL_QUEUE).
deadLetterExchange(DEAD_EXCHANGE).
deadLetterRoutingKey("dead.abc")
// 队列中所有消息生存10s
// .ttl(10000)
.build();
}
设置队列长度
当队列已满,消息进不来就会进入死信队列
// maxLength 队列最大长度,如果超过长度就会被送到死信队列
@Bean
public Queue normalQueue(){
// 队列绑定死信交换机
return QueueBuilder.durable(NORMAL_QUEUE).
deadLetterExchange(DEAD_EXCHANGE).
deadLetterRoutingKey("dead.abc")
// 队列中所有消息生存10s
// .ttl(10000)\
// 队列中最大长度,超出长度就会到死信队列
// .maxLength(1)
.build();
}
其他
设置delivery-limit参数,如果一个消息被频繁拒绝,也会被送入到死信队列
延迟交换机(插件)
网址:https://www.rabbitmq.com//community-plugins.html 搜索rabbitmq_delayed_message_exchange
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
下载插件
# 下载插件,放到容器中
[root@localhost xqm]# docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins
root@721622f2a085:/opt/rabbitmq/plugins# cd ../sbin/
root@721622f2a085:/opt/rabbitmq/sbin# ls
rabbitmq-defaults rabbitmq-diagnostics rabbitmq-env rabbitmq-plugins rabbitmq-queues rabbitmq-server rabbitmq-upgrade rabbitmqctl
# 开启插件
root@721622f2a085:/opt/rabbitmq/sbin# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
配置类
// 配置类
package com.xqm.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
public static final String DELAY_EXCHANGE="delay_exchange";
public static final String DELAY_QUEUE="delay_queue";
public static final String DELAY_ROUTING_KEY="delay.#";
@Bean
public Exchange delayExchange(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","topic");
Exchange exchange=new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
return exchange;
}
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding(Queue delayQueue,Exchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
}
}
生产者
// publisher
package com.xqm.rabbitmq;
import com.xqm.rabbitmq.config.DelayedConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DelayPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publish(){
rabbitTemplate.convertAndSend(DelayedConfig.DELAY_EXCHANGE, "delay.abc", "xxx", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 30s
message.getMessageProperties().setDelay(30000);
return message;
}
});
}
@Test
public void publishT(){
rabbitTemplate.convertAndSend(DelayedConfig.DELAY_EXCHANGE, "delay.abc", "xxx", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 30s
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
}
结论
死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。
RabbitMQ集群
rabbitMQ镜像模式
docker-compose
rabbitmq1:
version: '3.1'
services:
rabbitmq1:
image: rabbitmq:3.8.5-management-alpine
container_name: rabbitmq1
hostname: rabbitmq1
extra_hosts:
- "rabbitmq1:192.168.11.32"
- "rabbitmq2:192.168.11.33"
environment:
- RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
ports:
- 5672:5672
- 15672:15672
- 4369:4369
- 25672:25672
rabbitmq2:
version: '3.1'
services:
rabbitmq2:
image: rabbitmq:3.8.5-management-alpine # alpine占用资源少
container_name: rabbitmq2
hostname: rabbitmq2
extra_hosts:
- "rabbitmq1:192.168.11.32"
- "rabbitmq2:192.168.11.33"
environment:
- RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS # 后面写什么无所谓,但是cookie必须要一样
ports:
- 5672:5672
- 15672:15672
- 4369:4369 # 集群交互需要用到的端口
- 25672:25672 # 集群交互需要用到的端口
执行
docker-compose up -d
RabbitMQ实现join操作
# 需要四个命令完成join操作
# 让rabbitmq2 join rabbitmq1,需要进入到rabbitmq2的容器内部,去执行下述命令 docker exec -it name /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
设置镜像模式
在指定的RabbitMQ服务中设置好镜像策略即可
name:随便写
Pattern:^ # 代表所有
Apply to:Exchanges and queues
Priority:不需要写
Definition: ha-mode=all
ha-sync-mode=automatic