RabbitMQ

官网

https://www.rabbitmq.com/
https://www.rabbitmq.com/tutorials/amqp-concepts.html

RabbitMQ介绍

RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

Erlang:Erlang是一种通用的面向并发的编程语言,Erlang充分发挥CPU的性能,延迟特别低,相比其他的MQ(Kafka,RocketMQ)延迟是最低的。

RabbitMQ支持多种语言通讯:Java,Python…………都有响应的API

RabbitMQ支持海量的插件去实现一些特殊功能(延迟交换机),RabbitMQ自带了一款图形化界面,操作异常的简单。

RabbitMQ应用场景

  • 排队算法:使用消息队列特性
  • 秒杀活动:使用消息队列特性
  • 消息分发:使用消息异步特性
  • 异步处理:使用消息异步特性
  • 数据同步:使用消息异步特性
  • 处理耗时任务:使用消息异步特性
  • 流量削峰

AMQP是什么

AMQP(Advanced Message Queuing protocol,高级消息队列协议)是进程之间传递异步消息的网络协议。

AMQP工作过程

中间的白框就是AMQP

image.png

工作过程:发布者(publisher) 发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到的消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者由消费者按照需求自行获取。

RabbitMQ安装

docker-compose.yml

编写docker-compose.yml文件

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:3.8.5
    container_name: rabbitmq
    restart: always
    volumes:
      - /usr/local/xqm/rabbitmq/data/:/var/lib/rabbitmq/
    ports:
      - 5672:5672
      # 图形化界面的端口
      - 15672:15672

安装docker-compose

安装的版本是1.27.4,可自行去查找其他版本

curl -L https://get.daocloud.io/docker/compose/releases/download/1.27.4/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

chmod +x /usr/local/bin/docker-compose
docker-compose --version

运行

docker-compose up -d

测试

[root@localhost rabbitmq]# curl localhost:5672
AMQP 
表示正常运行curl localhost:5672

启动图形化界面

默认RabbitMQ的图形化界面是关闭的,需要自行打开图形化这个插件

docker exec -it rabbitmq /bin/bash
# 进入/opt/rabbitmq/sbin/ 启动插件
root@721622f2a085:/opt/rabbitmq/sbin# ./rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@721622f2a085:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@721622f2a085...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.

访问图形化界面

访问15672端口:默认的用户名和密码均为:`guest`
192.168.49.131:15672

RabbitMQ架构

image.png

  • publisher:消息的生产者,也是一个向交换机发布消息的客户端应用程序
  • consumer:消息的消费者,也是一个从消息队列中取得消息的客户端应用程序
  • message:消息。由消息头消息体组成。消息头由一系列可选属性组成。包括:routing-key(路由)、priority(优先权)、delivery-mode(消息是否持久性存储)等等
  • exchange:交换机,一共有四种:direct(发布订阅完全匹配)、fanout(广播)、topic(主题,规则匹配)、headers(基本不用)
  • routes/routeing-key:路由,rabbitMQ通过路由规则来决定将exchange中消息投放到哪个队列中,可以是一对一投放,一对多投放,多对多投放。通过不同的exchange类型来实现
  • queue:消息队列
  • connection:连接,通rabbitMQ服务器和服务创建TCP连接
  • channel:频道/信道。
    • 是TCP里面的虚拟链接。比如:电缆相当于TCP连接,channel相当光纤束。
    • 一个connection中可以创建多个channel
    • TCP一旦打开,就会创建AMQP信道
    • 无论是发布消息、接收消息、订阅队列,都是通过channel来完成的
  • binding:绑定,用于消息队列和交换器之间关联关系。一个绑定就是一个routes将exchange和queue连接起来的路由规则
  • virtual-host:虚拟主机,可以有多个exchange,表示一批交换机。
    • RabbitMQ默认的虚拟主机是/
    • 每个vHost本质都是一个mini版本的rabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制
  • Borker:表示消息队列服务器实体

RabbitMQ的通讯方式

七种通讯方式

RabbitMQ一共有七种通讯方式。

通讯方式官网:https://www.rabbitmq.com/getstarted.html

# 交换机 DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")有4种
hello world:
	一个生产者-默认交换机-RoutingKey-queue-一个消费者
work queues:
	一个生产者-默认交换机-RoutingKey-queue-多个消费者(采用轮询的机制来消费一个队列中的数据)
publish/subscribe: 交换机能够分发消息到不同的队列,routingKey无所谓,随便填,发送一条消息,那么所有的队列全部有
	一个生产者-exchange(fanout) -RoutingKey-queue-consumer
				   -RoutingKey-queue-consumer
routing: 通过routingKey分发给不同的队列
	一个生产者-exchange(direct) -RoutingKey(orange)-queue-consumer
							  -RoutingKey(apple)-queue-consumer
topics: 通过routingKey分发给不同的队列, *代表的是占位符,#代表的是通配符
	一个生产者-exchange(topic) -RoutingKey(*.orange.*)-queue-consumer
							  -RoutingKey(*.*.apple)-queue-consumer
							   -RoutingKey(lazy.#)-queue-consumer

连接IDEA

  • 导入依赖:amqp-client,junit
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

创建连接工具类

// 连接工具类
package com.xqm.mq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class RabbitMQConnectionUtil {

    // ip
    public static final String RABBITMQ_HOST="192.168.49.131";
    // port
    public static final int RABBITMQ_PORT=5672;
    // username
    public static final String RABBITMQ_USERNAME="guest";
    // password
    public static final String RABBITMQ_PASSWORD="guest";
    // virtual host
    public static final String RABBITMQ_VIRTUAL_HOST="/";


    /**
     * 构建rabbitmq的连接对象
     * @return
     */
    public static Connection getConnection() throws Exception {
        // 1.创建connection工场
        ConnectionFactory factory=new ConnectionFactory();
        // 2.设置RabbitMQ连接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
        // 3.返回连接对象
        return factory.newConnection();
    }
}

第一种:Hello world

// 一个生产者 一个消费者 使用的是默认交换机

生产者–默认交换机-RoutingKey-queue-消费者

  • 生产者
// 生产者
package com.xqm.mq.test.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

public class Publisher {

    public static final String QUEUE_NAME="hello";

    @Test
    public void publish() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4.发布消息 默认交换机就是""
        String message="hello world";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息已发送");

        // 进行阻塞
        System.in.read();

    }
}
  • 消费者
// 消费者
package com.xqm.mq.test.helloworld;

import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer {

    @Test
    public void consume() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        // 必须和生产者构建的队列一样,否则就会报错
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        // 4.监听消息 消费消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            // 重写handleDelivery
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取的消息:"+new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,true,defaultConsumer);

        // 进行阻塞
        System.in.read();
    }
}

第二种:Work queues

  • 生产者
// 生产者,生产10条消息
package com.xqm.mq.test.workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

public class Publisher {

    public static final String QUEUE_NAME="work";

    @Test
    public void publish() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4.发布消息 默认交换机就是"" 发送10个消息
        for (int i = 0; i < 10; i++) {
            String message="hello world"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("消息已发送");

        // 进行阻塞
        System.in.read();

    }
}
  • 消费者
// 两个消费者

package com.xqm.mq.test.workqueues;

import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Consumer {

    @Test
    public void consume1() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        // 必须和生产者构建的队列一样,否则就会报错
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        // 3.5 设置消息的流控  加上关闭自动ack 开启手动应答ack 就能做到消费快的消费多,不再是轮询负载
        channel.basicQos(1);

        // 4.监听消息 消费消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            // 重写handleDelivery
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号获取的消息:"+new String(body,"UTF-8"));
                //  手动进行ack的应答  void basicAck(long deliveryTag, boolean multiple)   (消息标识,是否是批量操作)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 关闭自动ack
        channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);

        // 进行阻塞
        System.in.read();
    }


    @Test
    public void consume2() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        // 必须和生产者构建的队列一样,否则就会报错
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);


        // 3.5 设置消息的流控  加上关闭自动ack 开启手动应答ack 就能做到消费快的消费多,不再是轮询负载
        channel.basicQos(1);

        // 4.监听消息 消费消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            // 重写handleDelivery
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号获取的消息:"+new String(body,"UTF-8"));
                //  手动进行ack的应答  void basicAck(long deliveryTag, boolean multiple)   (消息标识,是否是批量操作)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 关闭自动ack
        channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);

        // 进行阻塞
        System.in.read();
    }
}

第三种:Publisher/Subscribe

一个生产者-exchange(fanout) -RoutingKey-queue-consumer
-RoutingKey-queue-consumer

DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”)有4种

生产者:自行构建Exchange并绑定指定队列(fanout类型交换机)

消费者:和hello world消费者一样

采用fanout广播的形式,一个生产者生产的消息能够广播到多个队列种

  • 生产者
// 生产者
package com.xqm.mq.test.publishSubscribe;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Publisher {

    public static final String EXCHANGE_NAME="pubsub";
    public static final String QUEUE_NAME1="pubsub-one";
    public static final String QUEUE_NAME2="pubsub-two";

    @Test
    public void publisher() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建channel
        Channel channel = connection.createChannel();
        // 3.构建交换机
        // Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 4.构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
        // 5.绑定交换机和队列  使用的fanout类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
        // 6.发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe".getBytes(StandardCharsets.UTF_8));
        System.out.println("消息成功发送");
        System.in.read();
    }
}
  • 消费者

也是一个队列对应一个消费者,所以和Hello World的消费者一样

第四种:Routing

Routing模式采用的exchange是direct,根据不同的Routing-Key来绑定不同的Queue,只要Routing-Key相同,那么exchange就会发送同一个消息到这些队列

在绑定exchange和queue时,指定好routingKey,同时绑定queue的时候,也需要绑定routingKey,指定routingKey的queue不存在时,消息就会丢失

  • 生产者
// 生产者

package com.xqm.mq.test.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

public class Publish {

    public static final String EXCHANGE_NAME="routing";
    public static final String QUEUE_NAME1="routing-one";
    public static final String QUEUE_NAME2="routing-two";

    @Test
    public void publisher() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建channel
        Channel channel = connection.createChannel();
        // 3.构建交换机
        // Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 4.构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
        // 5.绑定交换机和队列  使用的fanout类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"white");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"black");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"green");
        // 6.发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"white",null,"白色".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME,"black",null,"黑色".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME,"green",null,"绿色".getBytes(StandardCharsets.UTF_8));
        System.out.println("消息成功发送");
    }
}

第五种:Topics

基本都用Topics模式,通过routing-key,通过通配符匹配的方式,只要能够匹配上,一个交换机能够发送消息到不同的队列。

和Routing模式不同在于,Routing必须要完全匹配,而Topics通过占位符/通配符方式来匹配

  • 生产者
// 生产者
package com.xqm.mq.test.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

public class Publish {
    public static final String EXCHANGE_NAME="topic";
    public static final String QUEUE_NAME1="topic-one";
    public static final String QUEUE_NAME2="topic-two";

    @Test
    public void publisher() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建channel
        Channel channel = connection.createChannel();
        // 3.构建交换机
        // Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 4.构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
        // 5.绑定交换机和队列  topic类型的交换机,和队列绑定时,需要以aaa.bbb.ccc方式编写
        // 其中*表示占位符,#相当于通配符,如果发送aaa.orange.rabbit,那么匹配到*.orange.*和*.*.rabbit两个队列
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");
        // 6.发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog",null,"懒狗".getBytes(StandardCharsets.UTF_8));
        System.out.println("消息成功发送");
    }
}

第六种:RPC

使用的不是很多,了解

client server

因为两个服务在交互时,可以尽量做到client和Server的解耦,通过RabbitMQ进行解耦操作

需要让client发送消息时,携带两个属性:

replyTo:告知server将消息放到哪个队列中
corrlationId:告知Server发送相应消息时,需要携带的唯一标识来告知client响应的信息

  • 生产者
// publisher

package com.xqm.mq.test.rpc;

import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class Publisher {

    public static final String QUEUE_PUBLISHER="rpc_publisher";
    public static final String QUEUE_CONSUMER="rpc_consumer";

    @Test
    public void publish() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
        // 4.发布消息 默认交换机就是""
        String message="hello RPC";
        String uuid = UUID.randomUUID().toString();
        AMQP.BasicProperties properties=new AMQP.BasicProperties()
                .builder()
                .replyTo(QUEUE_CONSUMER)  // 需要监听的队列
                .correlationId(uuid)        // 唯一标识
                .build();
        // 将properties放进去
        channel.basicPublish("",QUEUE_PUBLISHER,properties,message.getBytes(StandardCharsets.UTF_8));
        // 将自动ack关闭 生产者监听是否是给它的响应信息
        channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String correlationId = properties.getCorrelationId();
                if (correlationId !=null && correlationId.equalsIgnoreCase(uuid)){
                    System.out.println("接收到服务端的响应:"+new String(body,"UTF-8"));
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
        System.out.println("消息已发送");

        // 进行阻塞
        System.in.read();

    }
}
  • 消费者
// consumer
package com.xqm.mq.test.rpc;

import com.rabbitmq.client.*;
import com.xqm.mq.test.helloworld.Publisher;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer {

    public static final String QUEUE_PUBLISHER="rpc_publisher";
    public static final String QUEUE_CONSUMER="rpc_consumer";

    @Test
    public void consume() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建队列  (队列名,是否持久化(重启是否删除),是否是独占队列(其他连接不能占有),是否自动删除(如果长时间没使用的时候),其他参数(在图形化界面的其他参数位置))
        // 必须和生产者构建的队列一样,否则就会报错
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
        // 4.监听消息 消费消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            // 重写handleDelivery
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取的消息:"+new String(body,"UTF-8"));
                String response="获取到client发出的请求,这里是响应的信息";
                String replyQueueName = properties.getReplyTo();
                String uuid = properties.getCorrelationId();
                AMQP.BasicProperties prop=new AMQP.BasicProperties()
                        .builder()
                        .correlationId(uuid)
                        .build();
                channel.basicPublish("",replyQueueName,prop,response.getBytes(StandardCharsets.UTF_8));
                // 手动响应ack消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听publish
        channel.basicConsume(QUEUE_PUBLISHER,false,defaultConsumer);

        // 进行阻塞
        System.in.read();
    }
}

第七种:Publisher Confirms

这是用来保证消息从Publisher到Exchange的机制,下面有详细讲解。

Exchange的第四种模式:Headers

headers就是一个基于key-value的方式,让Exchange和Queue绑定的到一起的一种规则

相比Topic形式,可以采用的类型更丰富。

如果x-match=all,那么传过来的消息必须全部匹配,也就是name和age都相等
如果x-match=any,那么只要满足一个条件就行

image.png

  • 生产者
package com.xqm.mq.test.headers;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class Publisher {

    public static final String HEADERS_EXCHANGE = "header-exchange";
    public static final String HEADERS_QUEUE = "header-queue";


    @Test
    public void publish() throws Exception {

        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建Channel
        Channel channel = connection.createChannel();
        // 3.构建交换机和队列,基于header的方式绑定
        channel.exchangeDeclare(HEADERS_EXCHANGE, BuiltinExchangeType.HEADERS);
        channel.queueDeclare(HEADERS_QUEUE, true, false, false, null);
        Map<String, Object> map = new HashMap<>();
        map.put("x-match", "all");
        map.put("name", "jack");
        map.put("age","12");
        channel.queueBind(HEADERS_QUEUE, HEADERS_EXCHANGE, "", map);

        // 发送消息
        String msg="headers测试消息";
        Map<String, Object> headers=new HashMap<>();
        // 必须携带这个消息,否则消息就会发送不到queue
        headers.put("name", "jack");
        headers.put("age","12");
        AMQP.BasicProperties properties=new AMQP.BasicProperties()
                .builder()
                .headers(headers)
                .build();
        // 这里和routingKey没关系了
        channel.basicPublish(HEADERS_EXCHANGE,"",properties,msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("发送消息成功"+headers);


    }
}

SpringBoot整合RabbitMQ

依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml

server:
  port: 8081
spring:
  rabbitmq:
    host: 192.168.49.131
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 开启手动ack,默认是自动auto
        acknowledge-mode: manual
        # 消费者每次拿10条消息进行消费
        prefetch: 10
        # 多少个线程并发执行
#        concurrency: 2

配置类

// 配置类

package com.xqm.rabbitmq.config;


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE="boot-exchange";
    public static final String QUEUE="boot-queue";
    public static final String ROUTING_KEY="*.black.*";


    /**
     * 声明交换机
     * @return
     */
    @Bean
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE).build();
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 绑定交换机和队列
     * @return
     */
    @Bean
    public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }
}

生产者

package com.xqm.rabbitmq;


import com.xqm.rabbitmq.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class PublisherTest {


    @Autowired
    public RabbitTemplate rabbitTemplate;


    @Test
    public void publish(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
    }

    @Test
    public void publishWithProperties(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProperties", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setCorrelationId("1234");
                return message;
            }
        });
        System.out.println("消息发送成功");
    }

}

消费者

// 配置文件中开启手动ack

spring:
  rabbitmq:
    host: 192.168.49.131
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 开启手动ack,默认是自动auto
        acknowledge-mode: manual
        # 消费者每次拿10条消息进行消费
        prefetch: 10
        # 多少个线程并发执行
#        concurrency: 2

java实现

package com.xqm.rabbitmq;


import com.rabbitmq.client.Channel;
import com.xqm.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;


@Configuration
public class ConsumerListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {

        System.out.println("队列的消息为:"+msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为:"+correlationId);
        // 手动ack应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

结果

队列的消息为:message
唯一标识为:null
队列的消息为:messageWithProperties
唯一标识为:1234

RabbitMQ保证消息可靠性

publisher->exchange

confirm机制,保证生产者到交换机消息的可靠性。

一共有三种,选取了异步回调的方式。

package com.xqm.mq.test.confirms;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Publisher {
    public static final String QUEUE_NAME="confirms";

    @Test
    public void publisher() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建channel
        Channel channel = connection.createChannel();
        // 3.构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4.开启confirms机制
        channel.confirmSelect();
        // 5.设置confirms的异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息没有成功发送到交换机,重试或者保存到数据库");
            }
        });
        // 6.发消息到交换机
        String message="confirms";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息成功发送");
        System.in.read();
    }
}

exchange-(routing)-queue

Return机制,保证消息可以路由到队列

package com.xqm.mq.test.confirms;

import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * return机制保证exchange发送消息到queue,但是需要basicPublish新增参数
 */
public class PublisherReturn {
    public static final String QUEUE_NAME="confirms";

    @Test
    public void publisher() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建channel
        Channel channel = connection.createChannel();
        // 3.构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4.开启confirms机制
        channel.confirmSelect();
        // 5.设置confirms的异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息没有成功发送到交换机,重试或者保存到数据库");
            }
        });
        // 6.设置return回调,确认消息是否路由到queue
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息没有路由到指定队列中执行此方法,做其他的补偿措施");
                System.out.println("消息没有送达到队列中");
            }
        });
        // 7.发消息到交换机
        String message="confirms";
        channel.basicPublish("","11",true,null,message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息成功发送");
        System.in.read();
    }
}
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调

queue

使用DeliveryMode设置消息持久化。

DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。

package com.xqm.mq.test.confirms;

import com.rabbitmq.client.*;
import com.xqm.mq.util.RabbitMQConnectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * 设置消息持久化
 */
public class PublisherDurable {
    public static final String QUEUE_NAME="confirms";

    @Test
    public void publisher() throws Exception {
        // 1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        // 2.构建channel
        Channel channel = connection.createChannel();
        // 3.构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4.开启confirms机制
        channel.confirmSelect();
        // 5.设置confirms的异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息没有成功发送到交换机,重试或者保存到数据库");
            }
        });
        // 6.设置return回调,确认消息是否路由到queue
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息没有路由到指定队列中执行此方法,做其他的补偿措施");
                System.out.println("消息没有送达到队列中");
            }
        });

        // 7.设置消息持久化
        AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .build();

        // 8.发消息到交换机
        String message="confirms";
        channel.basicPublish("","11",true,properties,message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息成功发送");
        System.in.read();
    }
}

consumer

消费者可以正常消费队列中的消息,使用主动ack的模式。

详细的参考WorkQueue模式

springboot实现消息可靠性

confirm

  • yml
spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 新版本
    publisher-confirms: true  # 老版本 
  • 在发送消息时,配置RabbitTemplate
@Test
public void publishWithConfirms() throws IOException {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println("消息已经送达到交换机!!");
            }else{
                System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");
            }
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
    System.out.println("消息发送成功");

    System.in.read();
}

return

  • yml
spring:
  rabbitmq:
    publisher-returns: true # 开启Return机制
  • 在发送消息时,配置RabbitTemplate
@Test
public void publishWithReturn() throws IOException {
    // 新版本用 setReturnsCallback ,老版本用setReturnCallback
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            String msg = new String(returned.getMessage().getBody());
            System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
    System.out.println("消息发送成功");

    System.in.read();
}

消息持久化

@Test
public void publishWithBasicProperties() throws IOException {
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 设置消息的持久化!
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }
    });
    System.out.println("消息发送成功");
}

死信队列&延迟交换机

概念

消息被消费者拒绝(nack、reject),requence设置为false(这样被拒绝的消息不会再放到队列中了)

image.png

# 死信队列的应用
1.基于死信队列,即使队列消息已满的情况下,消息也不会丢失
2.实现延迟消费的效果:比如下订单,有15分钟的付款时间

springboot实现死信队列

进入死信队列三种方式:

  • 消息被消费者拒绝
  • 消息过期时间到了
  • 设置队列长度,队列已满,消息进不来就会进去死信队列

死信队列配置文件

// 死信队列配置

package com.xqm.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE="normal-exchange";
    public static final String NORMAL_QUEUE="normal-queue";
    public static final String NORMAL_ROUTING_KEY="normal.#";
    public static final String DEAD_EXCHANGE="dead-exchange";
    public static final String DEAD_QUEUE="dead-queue";
    public static final String DEAD_ROUTING_KEY="dead.#";

    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Queue normalQueue(){
        // 队列绑定死信交换机和死信routing
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}

生产者

// 生产者
package com.xqm.rabbitmq;


import com.xqm.rabbitmq.config.DeadLetterConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class DeadPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void  publish(){
        String msg="send deadLetter";
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE,"normal.abc",msg);
    }
}

消费者

消费者通过拒绝消费消息,使消息进入死信队列

// 消费者--通过拒绝消息
package com.xqm.rabbitmq;


import com.rabbitmq.client.Channel;
import com.xqm.rabbitmq.config.DeadLetterConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class DeadConsumer {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void consumer(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接受到的normal队列的消息为:"+msg);
        // reject ack 和下面那种二选一
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        // no ack
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}

消息设置过期时间

消息过期后,也会被丢进死信队列,如果设置过期时间,必须按照时间先后依次进入死信队列。

有两种方式:一是给单个消息设置过期时间;而是给所有消息设置过期时间

单个消息设置过期时间:

// 给消息设置生存时间 DeadPublisher.class
// 有一个问题,如果第一个消息30s延时,第二个消息5s延时,那么第二个消息必须等第一个消息时间到了之后才能轮到它,因此需要延迟交换机

    // 携带生存时间的消息发送者
    @Test
    public void publishExpire(){
        String msg="send letter expire";
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置5秒的过期时间,5s没有消费就会放入到死信队列中,指定单一的消息生存时间
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        });
    }

所有消息设置过期时间:使用ttl给exchange中所有消息设置过期时间

// ttl 给队列中的所有消息设置生存时间 @Configuration   public class DeadLetterConfig {}
    @Bean
    public Queue normalQueue(){
        // 队列绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE).
                deadLetterExchange(DEAD_EXCHANGE).
                deadLetterRoutingKey("dead.abc")
                // 队列中所有消息生存10s
                // .ttl(10000)
                .build();
    }

设置队列长度

当队列已满,消息进不来就会进入死信队列

// maxLength  队列最大长度,如果超过长度就会被送到死信队列   
@Bean
    public Queue normalQueue(){
        // 队列绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE).
                deadLetterExchange(DEAD_EXCHANGE).
                deadLetterRoutingKey("dead.abc")
                // 队列中所有消息生存10s
                // .ttl(10000)\
                // 队列中最大长度,超出长度就会到死信队列
                // .maxLength(1)
                .build();
    }

其他

设置delivery-limit参数,如果一个消息被频繁拒绝,也会被送入到死信队列

延迟交换机(插件)

网址:https://www.rabbitmq.com//community-plugins.html  搜索rabbitmq_delayed_message_exchange
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

下载插件

# 下载插件,放到容器中
[root@localhost xqm]# docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez  rabbitmq:/opt/rabbitmq/plugins

root@721622f2a085:/opt/rabbitmq/plugins# cd ../sbin/
root@721622f2a085:/opt/rabbitmq/sbin# ls
rabbitmq-defaults  rabbitmq-diagnostics  rabbitmq-env  rabbitmq-plugins  rabbitmq-queues  rabbitmq-server  rabbitmq-upgrade  rabbitmqctl
# 开启插件
root@721622f2a085:/opt/rabbitmq/sbin# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

配置类

// 配置类
package com.xqm.rabbitmq.config;


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {

    public static final String DELAY_EXCHANGE="delay_exchange";
    public static final String DELAY_QUEUE="delay_queue";
    public static final String DELAY_ROUTING_KEY="delay.#";



    @Bean
    public Exchange delayExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","topic");
        Exchange exchange=new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
        return exchange;
    }

    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }

    @Bean
    public Binding delayBinding(Queue delayQueue,Exchange delayExchange){
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }
}

生产者

// publisher

package com.xqm.rabbitmq;

import com.xqm.rabbitmq.config.DelayedConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class DelayPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void publish(){
        rabbitTemplate.convertAndSend(DelayedConfig.DELAY_EXCHANGE, "delay.abc", "xxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 30s
                message.getMessageProperties().setDelay(30000);
                return message;
            }
        });
    }


    @Test
    public void publishT(){
        rabbitTemplate.convertAndSend(DelayedConfig.DELAY_EXCHANGE, "delay.abc", "xxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 30s
                message.getMessageProperties().setDelay(5000);
                return message;
            }
        });
    }
}

结论

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

RabbitMQ集群

rabbitMQ镜像模式

docker-compose

rabbitmq1:

version: '3.1'
services:
  rabbitmq1:
    image: rabbitmq:3.8.5-management-alpine
    container_name: rabbitmq1
    hostname: rabbitmq1
    extra_hosts:
      - "rabbitmq1:192.168.11.32"
      - "rabbitmq2:192.168.11.33"
    environment: 
      - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
    ports:
      - 5672:5672
      - 15672:15672
      - 4369:4369
      - 25672:25672

rabbitmq2:

version: '3.1'
services:
  rabbitmq2:
    image: rabbitmq:3.8.5-management-alpine  # alpine占用资源少
    container_name: rabbitmq2
    hostname: rabbitmq2
    extra_hosts:
      - "rabbitmq1:192.168.11.32"
      - "rabbitmq2:192.168.11.33"
    environment: 
      - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS # 后面写什么无所谓,但是cookie必须要一样
    ports:
      - 5672:5672
      - 15672:15672
      - 4369:4369		# 集群交互需要用到的端口
      - 25672:25672     # 集群交互需要用到的端口

执行

docker-compose up -d

RabbitMQ实现join操作

# 需要四个命令完成join操作
# 让rabbitmq2   join  rabbitmq1,需要进入到rabbitmq2的容器内部,去执行下述命令 docker exec -it name /bin/bash

rabbitmqctl stop_app
rabbitmqctl reset 
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app

image.png

设置镜像模式

在指定的RabbitMQ服务中设置好镜像策略即可

image.png

name:随便写
Pattern:^  # 代表所有
Apply to:Exchanges and queues
Priority:不需要写
Definition: ha-mode=all
			ha-sync-mode=automatic