一、本节介绍
1.1 主要内容
1.初始MQ:核心概念、组件
2.RabbitMQ的安装、启动
3.RabbitMQ管理后台
看到整体消息的流量,交换机,队列,对用户添加、删除、配置权限
4.案例实战
简单发消息--->多个消费者
5.交换机的工作模式
6.Spring Boot整合RabbitMQ
二、初识RabbitMQ
2.1 RabbitMQ官网
1.
RabbitMQ的安装是比较复杂的,因为它是用Erlang语言编写的,还需要该语言的环境。
但是,本课程采用另外一种简单清晰的方式。
2.
下面是官网的一些教程:
2.2 MQ(消息队列)
1.有三个基本概念:
2.消息队列的作用:
方便系统解耦、流量削峰、异步调用。
3.消息队列的特性:
2.3 RabbitMQ的特点:5个
2.4 RabbitMQ的核心概念、核心组件(图)
其中:
- 虚拟主机:隔离服务。因为同一个RabbitMQ消息队列,可能会同时给多个服务进行使用。如果想服务与服务之间隔离开、不想相互干扰,那么就可以使用虚拟主机进行隔离。
三、RabbitMQ的安装
3.1 Linux下安装
要安装的RabbitMQ的版本是3.8.2;
不需要单独安装Erlang环境(饿狼)
1 环境配置
前提:在一个新建的阿里云服务器的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。
ssh root@121.199.31.220
echo "export LC_ALL=en_US.UTF-8" >> /etc/profile
---------------编码方式设置为UTF-8,支持中文
source /etc/profile
------------------------------------------------------------------- 让上面的设置生效
2 安装
第一步:执行
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
------------配置RabbitMQ:用于下载一个脚本,以找到RabbitMQ源地址
第二步,执行:
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
-------------配置erlang
第三步:
sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
-------------------将RabbitMQ及其依赖,安装
3 启动
启动RabbitMQ:
systemctl start rabbitmq-server
看看端口有没有起来,查看状态:
rabbitmqctl status
要检查RabbitMQ服务器的状态,请运行:
systemctl status rabbitmq-server
3.2 Mac OS和Windows下的安装
用的少,见文档《安装RabbitMQ》。
因为RabbitMQ服务端代码是使用并发式语言Erlang编写的,所以,安装Rabbit MQ的前提是安装Erlang。>
以下是自己尝试安装windows的RabbitMQ。尝试失败,果然像老师所说的,有很多的坑:
还是使用云服务器的Linux系统吧。
1.安装Erlang
下载安装包,一路next;
电脑上环境变量;
cmd命令行,表示安装Erlang成功:
2.安装RabbitMQ
一路next,RabbitMQ安装好后;
再去安装RabbitMQ-Plugins插件:打开命令行cd,输入RabbitMQ的sbin目录:
rabbitmq-plugins enable rabbitmq_management
打开命令行命令行,进入RabbitMQ的安装目录: sbin
输入 rabbitmqctl status , 如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常。
3.3 RabbitMQ常用命令
开启web管理界面
rabbitmq-plugins enable rabbitmq_management
停止RabbitMQ
rabbitmqctl stop
设置开机启动
systemctl enable rabbitmq-server
启动RabbitMQ
systemctl start rabbitmq-server
看看端口有没有起来,查看状态
rabbitmqctl status
要检查RabbitMQ服务器的状态,请运行:
systemctl status rabbitmq-server
四、RabbitMQ的管理后台
管理后台是亮点之一。
4.1 开启web形式的管理后台
rabbitmq-plugins enable rabbitmq_management
4.2 添加admin用户
因为默认情况下,用户只有guest,但是guest在远端是访问不上去的,所以要新添加一个admin用户。
rabbitmqctl add_user admin password
------------------------------ 添加用户
rabbitmqctl set_user_tags admin administrator
------------- 将其设置为管理员
4.3 登录管理后台
在阿里云控制台上,配置阿里云安全组,打开15672端口(RabbitMQ的默认端口)
浏览器中,输入121.199.31.220:15672/
:
用admin,密码password即可登录
成功登录:
让admin管理员,拥有与guest一样的权限:
五、实战案例1:基础运用,只1条消息、1个消费者
使用Java客户端,来连接到RabbitMQ,然后进行消息的发送、接收。
第一次接触新东西,要循序渐进
5.1 目标
5.2 引入依赖
1.引入两个依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.imooc</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!--1.rabbitmq依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--2. 用于记录日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.29</version>
</dependency>
</dependencies>
</project>
5.3 前提准备
(1)
编写发送类Send
:
虽然该类很简单,但是能学到很多rabbitmq
的通用流程:
// 1.创建连接工厂
// 2.设置RabbitMQ的地址
// 3.建立连接
// 4.获得信道
// 5.声明队列
// 6.发布消息
// 7.关闭连接
(2)
在阿里云控制台上,配置网络与安全组,开放5672端口:
(3)
(4)
/
是最常用的virtual host,因为没有必要区分过多的虚拟主机。那么就需要给/
的virtual host加上admin用户的权限:
5.4 编写发送类Send
package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Hello World 的发送类:连接到RabbitMQ的服务端,然后发送一条消息,最后退出。
*/
public class Send {
private final static String QUEUE_NAME = "hello"; // 用类常量,来体现队列的名字
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置RabbitMQ的地址
factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云的实例
factory.setUsername("admin");
factory.setPassword("password");
// 3.建立连接
Connection connection = factory.newConnection();
// 4.获得信道
Channel channel = connection.createChannel();
// 5.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 6.发布消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送了消息:" + message);
// 7.关闭连接
channel.close();
connection.close();
}
}
其中:
- 关于
5.声明队列
: channel.queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) var1:消息队列的名字 var2:这个队列需要持久吗?持久就是如果服务器重启了,该队列仍然存在。一般不需要持久存在。 var3:是否独有。该队列是否是仅能给这个连接使用? var4:是有需要自动删除?如果队列没有使用,就会自动删除。 var5: Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; Params: queue – the name of the queue durable – true if we are declaring a durable queue (the queue will survive a server restart) exclusive – true if we are declaring an exclusive queue (restricted to this connection) autoDelete – true if we are declaring an autodelete queue (server will delete it when no longer in use) arguments – other properties (construction arguments) for the queue
publish
发布,公布,放出(let out)
结果为:
发送了消息:Hello World!
5.5 编写接收类
Recv
package helloworld; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接收消息,并打印,持续运行(即持续的在消费消息) */ public class Recv { private final static String QUEUE_NAME = "hello"; // 用类常量,来体现队列的名字 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 6.接收消息,并消费 channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { // 队列名;自动确认消息;对消息的处理; @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("已收到消息:" + message); } }); } }
Consume
消费,消耗(use up)
结果为:
因为前面的发送类,发送的消息,存储在队列中;
所以接受类在启动之后,可以拿到这个消息
已收到消息:Hello World!
如果发送类改变了消息内容,接受类因为一直在运行,就会马上接收到最新的信息:
以上,就完成了对
RabbitMQ
的基础运用。5.6
RabbitMQ
控制台的变化1.
在
RabbitMQ
的控制台上:刚才发送消息的流量:
2.
java程序的操作,都会在
RabbitMQ
控制台中有所体现。六、实战案例2:10条信息、多消费者
如果消息很多,一个消费者处理不过来,能否让多个消费者一起承担呢?
当然可以。
6.1 编写发送类
NewTask
package workqueues; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 任务会很耗时,因为有多个任务 */ public class NewTask { private final static String TASK_QUEUE_NAME = "task_queue"; // 用类常量,来体现队列的名字 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明队列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 6.发布消息(模拟消息很多的情况) for (int i = 0; i < 10; i++) { String message; message = i + "..."; channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); } // 7.关闭连接 channel.close(); connection.close(); } }
其中:
如果自己电脑已安装了本地客户端,也可以连接它:
factory.setHost("localhost"); // 即默认用的是guest用户进行登录,此时就不用用户名、密码了
对于发送方,它只管发送消息,不管后续。至于谁接收、什么时候接收,这是
RabbitMQ
消息队列的事情。所以,发送类,在发送完消息后,就会直接退出。
结果为:
发送了消息:0... 发送了消息:1... 发送了消息:2... 发送了消息:3... 发送了消息:4... 发送了消息:5... 发送了消息:6... 发送了消息:7... 发送了消息:8... 发送了消息:9...
6.2 编写接收类
Worker
package workqueues; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消费者,用于接收前面的批量消息 */ public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; // 用类常量,来体现队列的名字 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明队列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 6.接收消息,并消费 System.out.println("开始接收消息"); channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("已收到消息:" + message); try { doWork(message); } finally { System.out.println("已完成消息处理"); } } }); } // 类方法:根据字符是否有英文句号(点),进行消息区分。被上面调用。 private static void doWork(String task) { char[] chars = task.toCharArray(); // 将字符串形式的task,转换为字符数组 for (char ch : chars) { if (ch == '.') { // 如果消息中,有一个点,就现成休眠1秒,模拟处理消息的过程 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
结果为:
开始接收消息 已收到消息:0... 已完成消息处理 已收到消息:1... 已完成消息处理 已收到消息:2... 已完成消息处理 已收到消息:3... 已完成消息处理 已收到消息:4... 已完成消息处理 已收到消息:5... 已完成消息处理 已收到消息:6... 已完成消息处理 已收到消息:7... 已完成消息处理 已收到消息:8... 已完成消息处理 已收到消息:9... 已完成消息处理
以上,虽然是消息已处理完成,但是只有一个worker的原因,速度很慢,大约有30秒。
所以,下一步,要加入更多的worker,用于消息的处理。
6.3 创建3个消费者,并配置并行运行
1.
如果继续运行,会提示不允许并行运行:
因为当前worker只有一个实例,就算是重启,还是原来那一个工人,把活儿重新再干一遍,还是花30秒。
所以,不重启,接下来需要一定的配置:
应用即可。此时就能并行运行了:
multiply
乘法;众多的(many)
我曾遇到的问题:
我的IDEA没有allow parallel run:
我的IDEA:
老师的IDEA:
原因:
因为老师是旧的IDEA,我是新的IDEA。
解决:
结果:
两个消费者,能成功的并行运行了:
2.
再增加一个消费者,至一共三个消费者。看下任务的分配方式:
worker1
已收到消息:0... 已完成消息处理 已收到消息:3... 已完成消息处理 已收到消息:6... 已完成消息处理 已收到消息:9... 已完成消息处理
worker2:
已收到消息:1... 已完成消息处理 已收到消息:4... 已完成消息处理 已收到消息:7... 已完成消息处理
worker3:
开始接收消息 已收到消息:2... 已完成消息处理 已收到消息:5... 已完成消息处理 已收到消息:8... 已完成消息处理
由上可知:
任务的分配方式是平均分配。即按照任务的数量,平均分。6.4 优化:当每个消息压力不同时,通过信息确认实现公平分配
但是,上面按照任务数量的方式来分配调度任务,一定公平吗?
不公平。因为实际工作中,每个消息的压力很可能是不一样的。
1.按数量平均分配时的不公平情况:
(1)
发送类
NewTask
/** * 任务会很耗时,因为有多个任务 */ public class NewTask { private final static String TASK_QUEUE_NAME = "task_queue"; // 用类常量,来体现队列的名字 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明队列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 6.发布消息 for (int i = 0; i < 10; i++) { String message; if (i % 2 == 0) { message = i + "..."; }else { // message = i; message = String.valueOf(i); // 让整型,转换为字符串 } channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); } // 7.关闭连接 channel.close(); connection.close(); } }
结果为:
模拟出了不同压力的消息:
发送了消息:0... 发送了消息:1 发送了消息:2... 发送了消息:3 发送了消息:4... 发送了消息:5 发送了消息:6... 发送了消息:7 发送了消息:8... 发送了消息:9
(2)
ps:要先启动接受类的实例,再运行发送类中发送信息的方法。
接收类:
worker1:
开始接收消息 已收到消息:0... 已完成消息处理 已收到消息:2... // 实际上,执行到这里时,worker1早就已经执行完了 :) 已完成消息处理 已收到消息:4... 已完成消息处理 已收到消息:6... 已完成消息处理 已收到消息:8... 已完成消息处理
worker2:
开始接收消息 已收到消息:1 已完成消息处理 已收到消息:3 已完成消息处理 已收到消息:5 已完成消息处理 已收到消息:7 已完成消息处理 已收到消息:9 已完成消息处理
很明显,这是分配任务不公平的情况:
- 按数量平均分配:你干奇数的活儿,他干偶数的活儿。
先把任务分配完,你们工人再去干活儿
重活儿,全让人家worker1自己干,你worker2早就干完了自己的轻活儿,然后在那里闲着?
2.按压力来分配时的公平情况:
(1)
(2)
接收类
Worker
:
优化点有以下三处/** * 消费者,用于接收前面的批量消息 */ public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; // 用类常量,来体现队列的名字 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明队列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 6.接收消息,并消费 System.out.println("开始接收消息"); channel.basicQos(1); // 1.在没有处理完自己的1个任务之前,是不会接收下一个任务的 channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) { // 2.自动确认:false @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("已收到消息:" + message); try { doWork(message); } finally { System.out.println("已完成消息处理"); channel.basicAck(envelope.getDeliveryTag(), false); // 3.在自己的活儿干完之后,要手动确认消息,即跟MQ说一声“我活儿干完了!” } } }); } // 类方法:根据字符是否有英文句号(点),进行消息区分。被上面调用。 private static void doWork(String task) { char[] chars = task.toCharArray(); // 将字符串形式的task,转换为字符数组 for (char ch : chars) { if (ch == '.') { // 如果消息中,有一个点,就现成休眠1秒,模拟处理消息的过程 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
ack
acknowledge=confirm先开启两个worker实例,再启动发送类:结果为:
worker1:
开始接收消息 已收到消息:0... 已完成消息处理 已收到消息:3 已完成消息处理 已收到消息:4... 已完成消息处理 已收到消息:7 已完成消息处理 已收到消息:8... 已完成消息处理
worker2:
开始接收消息 已收到消息:1 已完成消息处理 已收到消息:2... 已完成消息处理 已收到消息:5 已完成消息处理 已收到消息:6... 已完成消息处理 已收到消息:9 已完成消息处理
很明显,这是分配任务公平的情况,是按照压力调度的。
- 在没有处理完自己的1个任务之前,是不会接收下一个任务的;
worker在谁先把自己的活儿干完,就要手动确认消息,即跟MQ说一声“我活儿干完了!”
七、交换机的类型
7.1 四种类型
重点是前三种,第四种几乎不用。
fanout
=fan out。扇出。
7.2 fanout类型:扇形广播的群发
fanout类型,是一个基础的交换机类型。
1.图示
- 只要是与该交换机相关联(绑定)的queue,一律都会受到消息。
每个队列受到的消息内容是一样的,即有很多副本。
目标:搭建日志记录系统:
- 程序1:发出日志信息
程序2:负责接收日志消息 。接收器1:接收,存到磁盘;接收器2:接收,打印到控制台上
2.编写发送类
EmitLog
package fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发送日志信息 */ public class EmitLog { private static final String EXCHANGE_NAME = "logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 6.把消息发送到交换机上 String message = "info:hello world!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); // 7.关闭连接 channel.close(); connection.close(); } }
emit
=launch。发射,发出
结果为:
发送了消息:info:hello world!
3.编写接受类
ReceiveLogs
:1个接收者日志的特点:
- 只对当前的消息感兴趣,像以前发生的日志没必要再记录。
时效性强。如果线上日志量很大,可能保存不超过3天就清理了。因为日志记录太多浪费磁盘,且本身也没有太大价值。
基于以上日志的特点,队列使用新的空队列,让
RabbitMQ
自动生成队列的名字。即,非持久的、能自动删除的队列。当接收者不存在时,该临时队列也会自动的删除。即,这样搞就可以在同一个类中多次启动它,每一次队列的名字都是不一样的。
package fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 6.声明一个临时队列(非持久的、能自动删除的队列) String queueName = channel.queueDeclare().getQueue(); // 7.将上述临时队列绑定到交换机上 channel.queueBind(queueName, EXCHANGE_NAME, ""); // 8.接收消息 System.out.println("开始接收消息"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); // 将字符转换为字符串 System.out.println("收到消息:" + message); } }; // 9.进行消息的消费 channel.basicConsume(queueName, true, consumer); } }
其中:
接收,是跟队列绑定的,不跟交换机绑定,这是一种解耦的关系。
先启动接受类,让其持续运行。然后,启动发送类:
发送类已成功发送消息:
发送了消息:info:hello world!
接受类也已成功的接收消息:
开始接收消息 收到消息:info:hello world!
4.如果有多个接收者
设置,允许并行计算
先启动接受类,让其持续运行。然后,启动发送类:
两个接受类,同时收到了消息:
7.3 direct类型:灵活选择接收的信息
1.图示
场景:有时候不需要订阅所有的信息。对于发送者:需要灵活的去区分不同类型的信息,究竟应该发送到哪一个队列中。对于接收者:只你想接收哪个消息,就去绑定该消息对应的队列。
比如,对于日志系统,不需要把所有的日志都存储到磁盘上个,只需要在磁盘上存储那些错误的日志error。
选择信息的关键:RoutingKey
- 设置RoutingKey,用于对哪些信息感兴趣。对于不是RoutingKey的信息,会直接丢弃,不会转发到任何的队列中。
允许相同的RoutingKey,绑定到多个队列中。
在实际生产中,是这样的图示:
目标:优化日志记录系统:
- 程序1:发出日志信息
程序2:负责接收日志消息。接收器1:只接收error信息;接收器2:info、error、warning的信息
2.编写发送类
EmitLogDirect
package direct; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 用direct类型的交换机,来发送信息 */ public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个direct类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // (1) // 6.把消息发送到交换机上 String message = "info:hello world!"; channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));// (2) System.out.println("发送了消息," +"等级为info,信息内容:"+ message); // (3) message = "warning:hello world!"; channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8")); System.out.println("发送了消息," +"等级为warning,信息内容:"+ message); message = "error:hello world!"; channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8")); System.out.println("发送了消息," +"等级为error,信息内容:"+ message); // 7.关闭连接 channel.close(); connection.close(); } }
老师:
重点不在于敲每一行的代码,而在于理解、提升自己的能力。
解决:
记得更改交换机的名儿结果为:
发送了消息,等级为info,信息内容:info:hello world! 发送了消息,等级为warning,信息内容:warning:hello world! 发送了消息,等级为error,信息内容:error:hello world!
3.编写两个接受类
第一个接收类
ReceiveLogsDirect1
package direct; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接收3个不同等级的日志:根据自己需求,通过对应的绑定,灵活控制接收的信息 */ public class ReceiveLogsDirect1 { private static final String EXCHANGE_NAME = "direct_logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 6.声明一个随机的临时队列(非持久的、能自动删除的队列) String queueName = channel.queueDeclare().getQueue(); // 7.将3个临时队列绑定到1个交换机上 channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); // 8.接收消息 System.out.println("开始接收消息"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); // 将字符转换为字符串 System.out.println("收到消息:" + message); } }; // 9.进行消息的消费 channel.basicConsume(queueName, true, consumer); } }
第二个接收类
ReceiveLogsDirect2
package direct; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 只接收1个不同等级的日志 */ public class ReceiveLogsDirect2 { private static final String EXCHANGE_NAME = "direct_logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 6.声明一个随机的临时队列(非持久的、能自动删除的队列) String queueName = channel.queueDeclare().getQueue(); // 7.将1个临时队列绑定到1个交换机上 channel.queueBind(queueName, EXCHANGE_NAME, "error"); // 8.接收消息 System.out.println("开始接收消息"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); // 将字符转换为字符串 System.out.println("收到消息:" + message); } }; // 9.进行消息的消费 channel.basicConsume(queueName, true, consumer); } }
结果为:
接受类1:
收到消息:info:hello world! 收到消息:warning:hello world! 收到消息:error:hello world!
接受类2:
收到消息:error:hello world!
以上,通过direct类型的交换机,实现了根据消息类型不同,灵活接收自己所需要的信息。
7.4 topic类型:对于复杂条件,能模糊匹配
1.direct类型的交换机的不足
跟direct类型的交换机,是一种升级关系。
direct类型的交换机的不足:
不能根据一些复杂的条件,进行路由。
比如:有时候不仅要根据消息的严重性进行接收,还要根据消息的源头。
即,目标:优化日志记录系统:
- 程序1:发出日志信息
程序2:负责接收日志消息。接收器1:只接收来自用户模块的error信息;接收器2:只接收来自用户、商品模块的,info的信息
2.模糊匹配
两种符号:
- 注意是单词,不是字母
星号:1个单词,且不能为空
图示:
3.编写发送类
EmitLogTopic
发送了9个消息,它们内容相同,但是路由键却是各不相同的;
package topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 用topic类型的交换机,来发送信息 */ public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个direct类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // (1) // 6.把消息发送到交换机上 String message = "Animal World"; // 7.建立String类型的数组,模拟需要发出很多类型的消息:9个路由键 String[] routingKeys = new String[9]; routingKeys[0] = "quick.orange.rabbit"; routingKeys[1] = "lazy.orange.elephant"; routingKeys[2] = "quick.orange.fox"; routingKeys[3] = "lazy.brown.fox"; routingKeys[4] = "lazy.pink.fox"; routingKeys[5] = "quick.brown.fox"; routingKeys[6] = "orange"; routingKeys[7] = "quick.orange.male.rabbit"; routingKeys[8] = "lazy.orange.male.rabbit"; for (int i = 0; i < routingKeys.length; i++) { channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, message.getBytes("UTF-8")); System.out.println("发送了:" + message + " routingKey:" + routingKeys[i]); } // 7.关闭连接 channel.close(); connection.close(); } }
4.编写两个接受类
第一个接收类
ReceiveLogsTopic1
package topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 特定路由键 */ public class ReceiveLogsTopic1 { private static final String EXCHANGE_NAME = "topic_logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个topic类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 6.声明一个随机的临时队列(非持久的、能自动删除的队列) String queueName = channel.queueDeclare().getQueue(); // 7.声明一个路由键 String routingKey = "*.orange.*"; // 7.5 将上述的临时队列、路由键,绑定到1个交换机上 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); // 8.接收消息 System.out.println("开始接收消息"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); // 将字符转换为字符串 System.out.println("收到消息:" + message + " routingKey:" + envelope.getRoutingKey()); } }; // 9.进行消息的消费 channel.basicConsume(queueName, true, consumer); } }
第二个接收类
ReceiveLogsTopic2
package topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 特定路由键 */ public class ReceiveLogsTopic2 { private static final String EXCHANGE_NAME = "topic_logs"; // 给交换机起个名儿 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ的地址、连接用户 factory.setHost("121.199.31.220"); // 填入ip地址,即之前安装了MQ的阿里云服务器的实例 factory.setUsername("admin"); factory.setPassword("password"); // 3.建立连接 Connection connection = factory.newConnection(); // 4.获得信道 Channel channel = connection.createChannel(); // 5.声明定义一个topic类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 6.声明一个随机的临时队列(非持久的、能自动删除的队列) String queueName = channel.queueDeclare().getQueue(); // 7.声明一个路由键 String routingKey = "*.*.rabbit"; // 7.5 将上述的临时队列、路由键,绑定到1个交换机上 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); String routingKey2 = "lazy.#"; channel.queueBind(queueName, EXCHANGE_NAME, routingKey2); // 8.接收消息 System.out.println("开始接收消息"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); // 将字符转换为字符串 System.out.println("收到消息:" + message + " routingKey:" + envelope.getRoutingKey()); } }; // 9.进行消息的消费 channel.basicConsume(queueName, true, consumer); } }
在两个接受类都启动运行的前提下,启动发送类:
发送类:
预判下
发送了:Animal World routingKey:quick.orange.rabbit // 接受类1: // 接受类2: 发送了:Animal World routingKey:lazy.orange.elephant // 接受类1: // 接受类2: 发送了:Animal World routingKey:quick.orange.fox // 接受类1: 发送了:Animal World routingKey:lazy.brown.fox // 接受类2: 发送了:Animal World routingKey:lazy.pink.rabbit // 接受类2:虽然接受类2的两个路由键都满足,但只会显示1次 发送了:Animal World routingKey:quick.brown.fox 发送了:Animal World routingKey:orange 发送了:Animal World routingKey:quick.orange.male.rabbit // 不满足接受类2,因为路由键显示只能是三个单词 发送了:Animal World routingKey:lazy.orange.male.rabbit // 不满足接受类1,因为*只能代表1个单词 // 接受类2:
接受类1:
其路由键为
String routingKey = "*.orange.*";
收到消息:Animal World routingKey:quick.orange.rabbit 收到消息:Animal World routingKey:lazy.orange.elephant 收到消息:Animal World routingKey:quick.orange.fox
接受类2:
String routingKey = "*.*.rabbit";
String routingKey2 = "lazy.#";
(此处的井号#不会限制单词的数量)收到消息:Animal World routingKey:quick.orange.rabbit 收到消息:Animal World routingKey:lazy.orange.elephant 收到消息:Animal World routingKey:lazy.brown.fox 收到消息:Animal World routingKey:lazy.pink.rabbit 收到消息:Animal World routingKey:lazy.orange.male.rabbit
上述9个例子,包含了几乎所有的情况。
八、SpringBoot整合RabbitMQ
8.1 创建SpringBoot项目
1.消费者项目
创建SpringBoot项目
spring-boot-rabbitmq-consumer
:我曾遇到的问题:
忘记了选择类型为maven:最后项目一创建时就会爆红出问题
配置路径:
2.生产者项目
同理,创建项目
spring-boot-rabbitmq-producer
。8.2 引入依赖
在以上两个springboot项目的配置文件
pom.xml
中,均做出以下4点的修改和依赖补充:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <!--1.将springboot的版本改为2.2.1.RELEASE--> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.imooc</groupId> <artifactId>spring-boot-rabbitmq-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rabbitmq-consumer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>17</java.version> </properties> <!--2.阿里云Maven镜像--> <repositories> <repository> <id>aliyun</id> <name>aliyun</name> <url>https://maven.aliyun.com/repository/public</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--3.1 引入rabbitmq相关的依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.2.1.RELEASE</version> <!--4.这里加上版本信息,否则会出错--> </plugin> </plugins> </build> </project>
8.3 发送者:编写
application.properties
配置文件由于使用到了
rabbitmq
,同时因为后续要启动生产者(发送类)连接阿里云的远程云服务器,所以要配置下各自的端口号:在
application.properties
配置文件中,进行程序的配置:server.port=8080 spring.application.name=producer spring.rabbitmq.addresses=121.199.31.220:5672 spring.rabbitmq.username=admin spring.rabbitmq.password=password spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000
8.4 发送者:编写Rabbitmq的配置类
新建一个Java类,用于配置
tabbitmq
:
声明有哪些交换机、哪些队列;下面生成的对象,均放在了Spring IoC容器中进行管理:
package com.imooc.springbootrabbitmqproducer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Rabbitmq的配置类 */ @Configuration public class TopicRabbitmqConfig { // 定义2个队列 @Bean public Queue queue1(){ return new Queue("queue1"); // 队列名儿,同方法名,这样spring会自动匹配 } @Bean public Queue queue2(){ return new Queue("queue2"); // 队列名儿,同方法名,这样spring会自动匹配 } // 定义1个交换机 @Bean TopicExchange exchange(){ return new TopicExchange("bootExchange"); } // 第一次绑定:将上述的队列1,通过自己的路由键,绑定到交换机上 @Bean Binding bingExchangeMessage1(Queue queue1, TopicExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with("dog.red"); } // 第二次绑定:将上述的队列2,通过自己的路由键,绑定到交换机上 @Bean Binding bingExchangeMessage2(Queue queue2, TopicExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with("dog.#"); } }
8.5 发送者:编写发送类
MsgSender
编写发送消息的方法
package com.imooc.springbootrabbitmqproducer; import org.springframework.amqp.core.AmqpTemplate; import javax.annotation.Resource; /** * 发送类:用于发送消息 */ public class MsgSender { @Resource private AmqpTemplate rabbitmqTemplate; // 依赖注入 public void send1(){ String message = "This is message 1, routingKey is dog.red"; System.out.println("发送了:" + message); // 发送消息 this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.red", message); } public void send2(){ String message = "This is message 2, routingKey is dog.black"; System.out.println("发送了:" + message); // 发送消息 this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.black", message); } }
8.6 发送者:编写测试类
为了发送消息,需要编写一个测试类:
package com.imooc.springbootrabbitmqproducer; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest class SpringBootRabbitmqProducerApplicationTests { @Resource MsgSender msgSender; // 依赖注入 @Test public void send1(){ msgSender.send1(); } @Test public void send2(){ msgSender.send2(); } }
以上,发送者,就已编写完成。
接下来,开始编写接收者。
springboot中要想使用rabbitmq来接收消息,其实是非常方便的。
8.7 接收者:编写
application.properties
配置文件由于使用到了
rabbitmq
,同时因为后续要启动消费者(接受类)来连接阿里云的远程服务器,所以要配置下各自的端口号:在
application.properties
配置文件中,进行程序的配置:server.port=8081 spring.application.name=consumer spring.rabbitmq.addresses=121.199.31.220:5672 spring.rabbitmq.username=admin spring.rabbitmq.password=password spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000
8.8 接收者:编写两个接受类
接受类1:
package com.imooc.springbootrabbitmqconsumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消费者1 */ @Component @RabbitListener(queues = "queue1") // 用注解的形式,指定队列为queue1(queue1就是生产者那边定义的队列) public class Receiver1 { @RabbitHandler // 受到消息后,会怎么处理 public void process(String message) { System.out.println("Receiver1:" + message); } }
接受类2:
package com.imooc.springbootrabbitmqconsumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消费者2 */ @Component @RabbitListener(queues = "queue2") // 用注解的形式,指定队列为queue1(queue1就是生产者那边定义的队列) public class Receiver2 { @RabbitHandler // 受到消息后,会怎么处理 public void process(String message) { System.out.println("Receiver2:" + message); } }
8.9 启动
在保证阿里云服务器的RabbitMQ成功运行的前提下:
控制台结果为 :
发送了:This is message 1, routingKey is dog.red 发送了:This is message 2, routingKey is dog.black
2.再启动接收类:
会分别监听两个队列:
控制台结果为 :
Receiver1:This is message 1, routingKey is dog.red Receiver2:This is message 1, routingKey is dog.red Receiver2:This is message 2, routingKey is dog.black
拓展:
我曾遇到的问题:
原因:
JDK的版本不一致导致。
解决:将JDK的版本改为1.8
借鉴:https://blog.csdn.net/qq_42025798/article/details/113917231
综上所述,已成功的通过rabbitmq,在两个独立的项目之间,即生产者(发送类)、消费者(接受类)之间,建立连接,对消息进行发送并接收。
也实现了SpringBoot整合RabbitMQ。
不管是Java客户端,还是SpringBoot,都能很好的运用了RabbitMQ。
后续如果想更深入的了解RabbitMQ
,可以参考官网如下内容:
Comments | NOTHING