30.5 消息队列 RabbitMQ



一、本节介绍

1.1 主要内容

1.初始MQ:核心概念、组件

2.RabbitMQ的安装、启动

3.RabbitMQ管理后台

看到整体消息的流量,交换机,队列,对用户添加、删除、配置权限

4.案例实战

简单发消息--->多个消费者

5.交换机的工作模式

6.Spring Boot整合RabbitMQ

二、初识RabbitMQ

2.1 RabbitMQ官网

1.

RabbitMQ的安装是比较复杂的,因为它是用Erlang语言编写的,还需要该语言的环境。

但是,本课程采用另外一种简单清晰的方式。

2.

下面是官网的一些教程:

2.2 MQ(消息队列)

1.有三个基本概念:

2.消息队列的作用:

方便系统解耦、流量削峰、异步调用。

3.消息队列的特性:

2.3 RabbitMQ的特点:5个

2.4 RabbitMQ的核心概念、核心组件(图)

其中:

  • 虚拟主机:隔离服务。因为同一个RabbitMQ消息队列,可能会同时给多个服务进行使用。如果想服务与服务之间隔离开、不想相互干扰,那么就可以使用虚拟主机进行隔离。

三、RabbitMQ的安装

3.1 Linux下安装

要安装的RabbitMQ的版本是3.8.2;

不需要单独安装Erlang环境(饿狼)

1 环境配置

前提:在一个新建的阿里云服务器的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。

ssh root@121.199.31.220

echo "export LC_ALL=en_US.UTF-8" >> /etc/profile---------------编码方式设置为UTF-8,支持中文

source /etc/profile------------------------------------------------------------------- 让上面的设置生效

2 安装

第一步:执行

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash------------配置RabbitMQ:用于下载一个脚本,以找到RabbitMQ源地址

第二步,执行:

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash-------------配置erlang

第三步:

sudo yum install rabbitmq-server-3.8.2-1.el7.noarch-------------------将RabbitMQ及其依赖,安装

3 启动

启动RabbitMQ:

systemctl start rabbitmq-server

看看端口有没有起来,查看状态:

rabbitmqctl status

要检查RabbitMQ服务器的状态,请运行:

systemctl status rabbitmq-server

3.2 Mac OS和Windows下的安装

用的少,见文档《安装RabbitMQ》。

因为RabbitMQ服务端代码是使用并发式语言Erlang编写的,所以,安装Rabbit MQ的前提是安装Erlang。>

以下是自己尝试安装windows的RabbitMQ。尝试失败,果然像老师所说的,有很多的坑:

还是使用云服务器的Linux系统吧。

1.安装Erlang

下载安装包,一路next;

电脑上环境变量;

cmd命令行,表示安装Erlang成功:

2.安装RabbitMQ

一路next,RabbitMQ安装好后;

再去安装RabbitMQ-Plugins插件:打开命令行cd,输入RabbitMQ的sbin目录:

rabbitmq-plugins enable rabbitmq_management

打开命令行命令行,进入RabbitMQ的安装目录: sbin

输入 rabbitmqctl status , 如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常。

3.3 RabbitMQ常用命令

开启web管理界面

rabbitmq-plugins enable rabbitmq_management

停止RabbitMQ

rabbitmqctl stop

设置开机启动

systemctl enable rabbitmq-server

启动RabbitMQ

systemctl start rabbitmq-server

看看端口有没有起来,查看状态

rabbitmqctl status

要检查RabbitMQ服务器的状态,请运行:

systemctl status rabbitmq-server

四、RabbitMQ的管理后台

管理后台是亮点之一。

4.1 开启web形式的管理后台

rabbitmq-plugins enable rabbitmq_management

4.2 添加admin用户

因为默认情况下,用户只有guest,但是guest在远端是访问不上去的,所以要新添加一个admin用户。

rabbitmqctl add_user admin password------------------------------ 添加用户

rabbitmqctl set_user_tags admin administrator------------- 将其设置为管理员

4.3 登录管理后台

在阿里云控制台上,配置阿里云安全组,打开15672端口(RabbitMQ的默认端口)

浏览器中,输入121.199.31.220:15672/

用admin,密码password即可登录

成功登录:

让admin管理员,拥有与guest一样的权限:

五、实战案例1:基础运用,只1条消息、1个消费者

使用Java客户端,来连接到RabbitMQ,然后进行消息的发送、接收。

第一次接触新东西,要循序渐进

5.1 目标

5.2 引入依赖

1.引入两个依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.imooc</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--1.rabbitmq依赖 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

        <!--2. 用于记录日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.29</version>
        </dependency>
    </dependencies>

</project>

5.3 前提准备

(1)

编写发送类Send
虽然该类很简单,但是能学到很多rabbitmq的通用流程:

   // 1.创建连接工厂
   // 2.设置RabbitMQ的地址
   // 3.建立连接
   // 4.获得信道
   // 5.声明队列
   // 6.发布消息
   // 7.关闭连接

(2)

在阿里云控制台上,配置网络与安全组,开放5672端口:

(3)

(4)

/是最常用的virtual host,因为没有必要区分过多的虚拟主机。那么就需要给/的virtual host加上admin用户的权限:

5.4 编写发送类Send

package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *  Hello World 的发送类:连接到RabbitMQ的服务端,然后发送一条消息,最后退出。
 */
public class Send {

    private final static String QUEUE_NAME = "hello";               // 用类常量,来体现队列的名字


    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置RabbitMQ的地址
        factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云的实例
        factory.setUsername("admin");
        factory.setPassword("password");
        // 3.建立连接
        Connection connection = factory.newConnection();
        // 4.获得信道
        Channel channel = connection.createChannel();
        // 5.声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 6.发布消息
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + message);
        // 7.关闭连接
        channel.close();
        connection.close();
    }


}

其中:

  • 关于5.声明队列
  • channel.queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
        var1:消息队列的名字
        var2:这个队列需要持久吗?持久就是如果服务器重启了,该队列仍然存在。一般不需要持久存在。
        var3:是否独有。该队列是否是仅能给这个连接使用?
        var4:是有需要自动删除?如果队列没有使用,就会自动删除。
        var5:
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
    Params:
    queue – the name of the queue
    durable – true if we are declaring a durable queue (the queue will survive a server restart)
    exclusive – true if we are declaring an exclusive queue (restricted to this connection)
    autoDelete – true if we are declaring an autodelete queue (server will delete it when no longer in use)
    arguments – other properties (construction arguments) for the queue

    publish

    发布,公布,放出(let out)

    结果为:

    发送了消息:Hello World!

    5.5 编写接收类Recv

    package helloworld;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  接收消息,并打印,持续运行(即持续的在消费消息)
     */
    public class Recv {
      private final static String QUEUE_NAME = "hello";               // 用类常量,来体现队列的名字
    
      public static void main(String[] args) throws IOException, TimeoutException {
    
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明队列
          channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          // 6.接收消息,并消费
          channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {       // 队列名;自动确认消息;对消息的处理;
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");
                  System.out.println("已收到消息:" + message);
              }
          });
          
      }
    }

    Consume

    消费,消耗(use up)

    结果为:

    因为前面的发送类,发送的消息,存储在队列中;

    所以接受类在启动之后,可以拿到这个消息

    已收到消息:Hello World!

    如果发送类改变了消息内容,接受类因为一直在运行,就会马上接收到最新的信息:

    以上,就完成了对RabbitMQ的基础运用。

    5.6 RabbitMQ控制台的变化

    1.

    RabbitMQ的控制台上:

    刚才发送消息的流量:

    2.

    java程序的操作,都会在RabbitMQ控制台中有所体现。

    六、实战案例2:10条信息、多消费者

    如果消息很多,一个消费者处理不过来,能否让多个消费者一起承担呢?

    当然可以。

    6.1 编写发送类NewTask

    package workqueues;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  任务会很耗时,因为有多个任务
     */
    public class NewTask {
      private final static String TASK_QUEUE_NAME = "task_queue";               // 用类常量,来体现队列的名字
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明队列
          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
          // 6.发布消息(模拟消息很多的情况)
          for (int i = 0; i < 10; i++) {
              String message;
              message = i + "...";
              channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
              System.out.println("发送了消息:" + message);
          }
          // 7.关闭连接
          channel.close();
          connection.close();
      }
    }

    其中:

  • 如果自己电脑已安装了本地客户端,也可以连接它:

            factory.setHost("localhost");            // 即默认用的是guest用户进行登录,此时就不用用户名、密码了

    对于发送方,它只管发送消息,不管后续。至于谁接收、什么时候接收,这是RabbitMQ消息队列的事情。

    所以,发送类,在发送完消息后,就会直接退出。

    结果为:

    发送了消息:0...
    发送了消息:1...
    发送了消息:2...
    发送了消息:3...
    发送了消息:4...
    发送了消息:5...
    发送了消息:6...
    发送了消息:7...
    发送了消息:8...
    发送了消息:9...

    6.2 编写接收类Worker

    package workqueues;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  消费者,用于接收前面的批量消息
     */
    public class Worker {
      private final static String TASK_QUEUE_NAME = "task_queue";               // 用类常量,来体现队列的名字
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明队列
          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
          // 6.接收消息,并消费
          System.out.println("开始接收消息");
          channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");
                  System.out.println("已收到消息:" + message);
                  try {
                      doWork(message);
                  } finally {
                      System.out.println("已完成消息处理");
                  }
              }
          });
      }
      // 类方法:根据字符是否有英文句号(点),进行消息区分。被上面调用。
      private static void doWork(String task) {
          char[] chars = task.toCharArray();          // 将字符串形式的task,转换为字符数组
          for (char ch : chars) {
              if (ch == '.') {                        // 如果消息中,有一个点,就现成休眠1秒,模拟处理消息的过程
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
    
      }
    }

    结果为:

    开始接收消息
    已收到消息:0...
    已完成消息处理
    已收到消息:1...
    已完成消息处理
    已收到消息:2...
    已完成消息处理
    已收到消息:3...
    已完成消息处理
    已收到消息:4...
    已完成消息处理
    已收到消息:5...
    已完成消息处理
    已收到消息:6...
    已完成消息处理
    已收到消息:7...
    已完成消息处理
    已收到消息:8...
    已完成消息处理
    已收到消息:9...
    已完成消息处理

    以上,虽然是消息已处理完成,但是只有一个worker的原因,速度很慢,大约有30秒。

    所以,下一步,要加入更多的worker,用于消息的处理。

    6.3 创建3个消费者,并配置并行运行

    1.

    如果继续运行,会提示不允许并行运行:

    因为当前worker只有一个实例,就算是重启,还是原来那一个工人,把活儿重新再干一遍,还是花30秒。

    所以,不重启,接下来需要一定的配置:

    应用即可。此时就能并行运行了:

    multiply

    乘法;众多的(many)

    我曾遇到的问题:

    我的IDEA没有allow parallel run:

    我的IDEA:

    老师的IDEA:

    原因:

    因为老师是旧的IDEA,我是新的IDEA。

    解决:

    结果:

    两个消费者,能成功的并行运行了:

    2.

    再增加一个消费者,至一共三个消费者。看下任务的分配方式:

    worker1

    已收到消息:0...
    已完成消息处理
    已收到消息:3...
    已完成消息处理
    已收到消息:6...
    已完成消息处理
    已收到消息:9...
    已完成消息处理

    worker2:

    已收到消息:1...
    已完成消息处理
    已收到消息:4...
    已完成消息处理
    已收到消息:7...
    已完成消息处理

    worker3:

    开始接收消息
    已收到消息:2...
    已完成消息处理
    已收到消息:5...
    已完成消息处理
    已收到消息:8...
    已完成消息处理

    由上可知:
    任务的分配方式是平均分配。即按照任务的数量,平均分。

    6.4 优化:当每个消息压力不同时,通过信息确认实现公平分配

    但是,上面按照任务数量的方式来分配调度任务,一定公平吗?

    不公平。因为实际工作中,每个消息的压力很可能是不一样的。

    1.按数量平均分配时的不公平情况:

    (1)

    发送类NewTask

    /**
     *  任务会很耗时,因为有多个任务
     */
    public class NewTask {
      private final static String TASK_QUEUE_NAME = "task_queue";               // 用类常量,来体现队列的名字
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明队列
          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
          // 6.发布消息
          for (int i = 0; i < 10; i++) {
              String message;
              if (i % 2 == 0) {
                  message = i + "...";
              }else {
    //                message = i;
                  message = String.valueOf(i);            // 让整型,转换为字符串
              }
              channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
              System.out.println("发送了消息:" + message);
          }
          // 7.关闭连接
          channel.close();
          connection.close();
      }
    }

    结果为:

    模拟出了不同压力的消息:

    发送了消息:0...
    发送了消息:1
    发送了消息:2...
    发送了消息:3
    发送了消息:4...
    发送了消息:5
    发送了消息:6...
    发送了消息:7
    发送了消息:8...
    发送了消息:9

    (2)

    ps:要先启动接受类的实例,再运行发送类中发送信息的方法。

    接收类:

    worker1:

    开始接收消息
    已收到消息:0...
    已完成消息处理
    已收到消息:2...            // 实际上,执行到这里时,worker1早就已经执行完了 :)
    已完成消息处理
    已收到消息:4...
    已完成消息处理
    已收到消息:6...
    已完成消息处理
    已收到消息:8...
    已完成消息处理

    worker2:

    开始接收消息
    已收到消息:1
    已完成消息处理
    已收到消息:3
    已完成消息处理
    已收到消息:5
    已完成消息处理
    已收到消息:7
    已完成消息处理
    已收到消息:9
    已完成消息处理

    很明显,这是分配任务不公平的情况:

  • 按数量平均分配:你干奇数的活儿,他干偶数的活儿。
  • 先把任务分配完,你们工人再去干活儿

    重活儿,全让人家worker1自己干,你worker2早就干完了自己的轻活儿,然后在那里闲着?

    2.按压力来分配时的公平情况:

    (1)

    (2)

    接收类Worker
    优化点有以下三处

    /**
     *  消费者,用于接收前面的批量消息
     */
    public class Worker {
    
      private final static String TASK_QUEUE_NAME = "task_queue";               // 用类常量,来体现队列的名字
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明队列
          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
          // 6.接收消息,并消费
          System.out.println("开始接收消息");
          channel.basicQos(1);            // 1.在没有处理完自己的1个任务之前,是不会接收下一个任务的
          channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {   // 2.自动确认:false
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");
                  System.out.println("已收到消息:" + message);
                  try {
                      doWork(message);
                  } finally {
                      System.out.println("已完成消息处理");
                      channel.basicAck(envelope.getDeliveryTag(), false);     // 3.在自己的活儿干完之后,要手动确认消息,即跟MQ说一声“我活儿干完了!”
                  }
              }
          });
      }
      // 类方法:根据字符是否有英文句号(点),进行消息区分。被上面调用。
      private static void doWork(String task) {
          char[] chars = task.toCharArray();          // 将字符串形式的task,转换为字符数组
          for (char ch : chars) {
              if (ch == '.') {                        // 如果消息中,有一个点,就现成休眠1秒,模拟处理消息的过程
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
    }
    ack
    acknowledge=confirm

    先开启两个worker实例,再启动发送类:结果为:

    worker1:

    开始接收消息
    已收到消息:0...
    已完成消息处理
    已收到消息:3
    已完成消息处理
    已收到消息:4...
    已完成消息处理
    已收到消息:7
    已完成消息处理
    已收到消息:8...
    已完成消息处理

    worker2:

    开始接收消息
    已收到消息:1
    已完成消息处理
    已收到消息:2...
    已完成消息处理
    已收到消息:5
    已完成消息处理
    已收到消息:6...
    已完成消息处理
    已收到消息:9
    已完成消息处理

    很明显,这是分配任务公平的情况,是按照压力调度的。

  • 在没有处理完自己的1个任务之前,是不会接收下一个任务的;
  • worker在谁先把自己的活儿干完,就要手动确认消息,即跟MQ说一声“我活儿干完了!”

    七、交换机的类型

    7.1 四种类型

    重点是前三种,第四种几乎不用。

    fanout

    =fan out。扇出。

    7.2 fanout类型:扇形广播的群发

    fanout类型,是一个基础的交换机类型。

    1.图示

  • 只要是与该交换机相关联(绑定)的queue,一律都会受到消息。
  • 每个队列受到的消息内容是一样的,即有很多副本。

    目标:搭建日志记录系统:

  • 程序1:发出日志信息
  • 程序2:负责接收日志消息 。接收器1:接收,存到磁盘;接收器2:接收,打印到控制台上

    2.编写发送类EmitLog

    package fanout;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  发送日志信息
     */
    public class EmitLog {
    
      private static final String EXCHANGE_NAME = "logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
          // 6.把消息发送到交换机上
          String message = "info:hello world!";
          channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
          System.out.println("发送了消息:" + message);
          // 7.关闭连接
          channel.close();
          connection.close();
      }
    }

    emit

    =launch。发射,发出

    结果为:

    发送了消息:info:hello world!

    3.编写接受类ReceiveLogs:1个接收者

    日志的特点:

  • 只对当前的消息感兴趣,像以前发生的日志没必要再记录。
  • 时效性强。如果线上日志量很大,可能保存不超过3天就清理了。因为日志记录太多浪费磁盘,且本身也没有太大价值。

    基于以上日志的特点,队列使用新的空队列,让RabbitMQ自动生成队列的名字。即,非持久的、能自动删除的队列。

    当接收者不存在时,该临时队列也会自动的删除。即,这样搞就可以在同一个类中多次启动它,每一次队列的名字都是不一样的。

    package fanout;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    public class ReceiveLogs {
      private static final String EXCHANGE_NAME = "logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
          // 6.声明一个临时队列(非持久的、能自动删除的队列)
          String queueName = channel.queueDeclare().getQueue();
          // 7.将上述临时队列绑定到交换机上
          channel.queueBind(queueName, EXCHANGE_NAME, "");
          // 8.接收消息
          System.out.println("开始接收消息");
          Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");         // 将字符转换为字符串
                  System.out.println("收到消息:" + message);
              }
          };
          // 9.进行消息的消费
          channel.basicConsume(queueName, true, consumer);
      }
    }

    其中:

  • 接收,是跟队列绑定的,不跟交换机绑定,这是一种解耦的关系。

    先启动接受类,让其持续运行。然后,启动发送类:

    发送类已成功发送消息:

    发送了消息:info:hello world!

    接受类也已成功的接收消息:

    开始接收消息
    收到消息:info:hello world!

    4.如果有多个接收者

    设置,允许并行计算

    先启动接受类,让其持续运行。然后,启动发送类:

    两个接受类,同时收到了消息:

    7.3 direct类型:灵活选择接收的信息

    1.图示

    场景:有时候不需要订阅所有的信息。对于发送者:需要灵活的去区分不同类型的信息,究竟应该发送到哪一个队列中。对于接收者:只你想接收哪个消息,就去绑定该消息对应的队列。

    比如,对于日志系统,不需要把所有的日志都存储到磁盘上个,只需要在磁盘上存储那些错误的日志error。

    选择信息的关键:RoutingKey

  • 设置RoutingKey,用于对哪些信息感兴趣。对于不是RoutingKey的信息,会直接丢弃,不会转发到任何的队列中。
  • 允许相同的RoutingKey,绑定到多个队列中。

    在实际生产中,是这样的图示:

    目标:优化日志记录系统:

  • 程序1:发出日志信息
  • 程序2:负责接收日志消息。接收器1:只接收error信息;接收器2:info、error、warning的信息

    2.编写发送类EmitLogDirect

    package direct;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  用direct类型的交换机,来发送信息
     */
    public class EmitLogDirect {
    
      private static final String EXCHANGE_NAME = "direct_logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个direct类型的交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);     // (1)
          // 6.把消息发送到交换机上
          String message = "info:hello world!";
          channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));// (2)
          System.out.println("发送了消息," +"等级为info,信息内容:"+ message);         // (3)
    
          message = "warning:hello world!";
          channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
          System.out.println("发送了消息," +"等级为warning,信息内容:"+ message);
    
          message = "error:hello world!";
          channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
          System.out.println("发送了消息," +"等级为error,信息内容:"+ message);
    
          // 7.关闭连接
          channel.close();
          connection.close();
      }
    }

    老师:

    重点不在于敲每一行的代码,而在于理解、提升自己的能力。

    老师和我曾遇到的问题:

    解决:
    记得更改交换机的名儿

    结果为:

    发送了消息,等级为info,信息内容:info:hello world!
    发送了消息,等级为warning,信息内容:warning:hello world!
    发送了消息,等级为error,信息内容:error:hello world!

    3.编写两个接受类

    第一个接收类ReceiveLogsDirect1

    package direct;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  接收3个不同等级的日志:根据自己需求,通过对应的绑定,灵活控制接收的信息
     */
    public class ReceiveLogsDirect1 {
      private static final String EXCHANGE_NAME = "direct_logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
          // 6.声明一个随机的临时队列(非持久的、能自动删除的队列)
          String queueName = channel.queueDeclare().getQueue();
          // 7.将3个临时队列绑定到1个交换机上
          channel.queueBind(queueName, EXCHANGE_NAME, "info");
          channel.queueBind(queueName, EXCHANGE_NAME, "warning");
          channel.queueBind(queueName, EXCHANGE_NAME, "error");
          // 8.接收消息
          System.out.println("开始接收消息");
          Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");         // 将字符转换为字符串
                  System.out.println("收到消息:" + message);
              }
          };
          // 9.进行消息的消费
          channel.basicConsume(queueName, true, consumer);
      }
    }

    第二个接收类ReceiveLogsDirect2

    package direct;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  只接收1个不同等级的日志
     */
    public class ReceiveLogsDirect2 {
      private static final String EXCHANGE_NAME = "direct_logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
          // 6.声明一个随机的临时队列(非持久的、能自动删除的队列)
          String queueName = channel.queueDeclare().getQueue();
          // 7.将1个临时队列绑定到1个交换机上
          channel.queueBind(queueName, EXCHANGE_NAME, "error");
          // 8.接收消息
          System.out.println("开始接收消息");
          Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");         // 将字符转换为字符串
                  System.out.println("收到消息:" + message);
              }
          };
          // 9.进行消息的消费
          channel.basicConsume(queueName, true, consumer);
      }
    }

    结果为:

    接受类1:

    收到消息:info:hello world!
    收到消息:warning:hello world!
    收到消息:error:hello world!

    接受类2:

    收到消息:error:hello world!

    以上,通过direct类型的交换机,实现了根据消息类型不同,灵活接收自己所需要的信息。

    7.4 topic类型:对于复杂条件,能模糊匹配

    1.direct类型的交换机的不足

    跟direct类型的交换机,是一种升级关系。

    direct类型的交换机的不足:

    不能根据一些复杂的条件,进行路由。

    比如:有时候不仅要根据消息的严重性进行接收,还要根据消息的源头。

    即,目标:优化日志记录系统:

  • 程序1:发出日志信息
  • 程序2:负责接收日志消息。接收器1:只接收来自用户模块的error信息;接收器2:只接收来自用户、商品模块的,info的信息

    2.模糊匹配

    两种符号:

  • 注意是单词,不是字母
  • 星号:1个单词,且不能为空

    图示:

    3.编写发送类EmitLogTopic

    发送了9个消息,它们内容相同,但是路由键却是各不相同的;

    package topic;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  用topic类型的交换机,来发送信息
     */
    public class EmitLogTopic {
    
      private static final String EXCHANGE_NAME = "topic_logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个direct类型的交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);     // (1)
          // 6.把消息发送到交换机上
          String message = "Animal World";
          // 7.建立String类型的数组,模拟需要发出很多类型的消息:9个路由键
          String[] routingKeys = new String[9];
          routingKeys[0] = "quick.orange.rabbit";
          routingKeys[1] = "lazy.orange.elephant";
          routingKeys[2] = "quick.orange.fox";
          routingKeys[3] = "lazy.brown.fox";
          routingKeys[4] = "lazy.pink.fox";
          routingKeys[5] = "quick.brown.fox";
          routingKeys[6] = "orange";
          routingKeys[7] = "quick.orange.male.rabbit";
          routingKeys[8] = "lazy.orange.male.rabbit";
    
          for (int i = 0; i < routingKeys.length; i++) {
              channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, message.getBytes("UTF-8"));
              System.out.println("发送了:" + message + " routingKey:" + routingKeys[i]);
          }
    
          // 7.关闭连接
          channel.close();
          connection.close();
      }
    }

    4.编写两个接受类

    第一个接收类ReceiveLogsTopic1

    package topic;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  特定路由键
     */
    public class ReceiveLogsTopic1 {
      private static final String EXCHANGE_NAME = "topic_logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个topic类型的交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
          // 6.声明一个随机的临时队列(非持久的、能自动删除的队列)
          String queueName = channel.queueDeclare().getQueue();
          // 7.声明一个路由键
          String routingKey = "*.orange.*";
          // 7.5 将上述的临时队列、路由键,绑定到1个交换机上
          channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    
          // 8.接收消息
          System.out.println("开始接收消息");
          Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");         // 将字符转换为字符串
                  System.out.println("收到消息:" + message + " routingKey:" + envelope.getRoutingKey());
              }
          };
          // 9.进行消息的消费
          channel.basicConsume(queueName, true, consumer);
      }
    }

    第二个接收类ReceiveLogsTopic2

    package topic;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     *  特定路由键
     */
    public class ReceiveLogsTopic2 {
      private static final String EXCHANGE_NAME = "topic_logs";         // 给交换机起个名儿
    
      public static void main(String[] args) throws IOException, TimeoutException {
          // 1.创建连接工厂
          ConnectionFactory factory = new ConnectionFactory();
          // 2.设置RabbitMQ的地址、连接用户
          factory.setHost("121.199.31.220");              // 填入ip地址,即之前安装了MQ的阿里云服务器的实例
          factory.setUsername("admin");
          factory.setPassword("password");
          // 3.建立连接
          Connection connection = factory.newConnection();
          // 4.获得信道
          Channel channel = connection.createChannel();
          // 5.声明定义一个topic类型的交换机
          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
          // 6.声明一个随机的临时队列(非持久的、能自动删除的队列)
          String queueName = channel.queueDeclare().getQueue();
          // 7.声明一个路由键
          String routingKey = "*.*.rabbit";
          // 7.5 将上述的临时队列、路由键,绑定到1个交换机上
          channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    
          String routingKey2 = "lazy.#";
          channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
    
          // 8.接收消息
          System.out.println("开始接收消息");
          Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");         // 将字符转换为字符串
                  System.out.println("收到消息:" + message + " routingKey:" + envelope.getRoutingKey());
              }
          };
          // 9.进行消息的消费
          channel.basicConsume(queueName, true, consumer);
      }
    }

    在两个接受类都启动运行的前提下,启动发送类:

    发送类:

    预判下

    发送了:Animal World routingKey:quick.orange.rabbit            // 接受类1:           // 接受类2:
    发送了:Animal World routingKey:lazy.orange.elephant        // 接受类1:           // 接受类2:
    发送了:Animal World routingKey:quick.orange.fox            // 接受类1:
    发送了:Animal World routingKey:lazy.brown.fox                                     // 接受类2:
    发送了:Animal World routingKey:lazy.pink.rabbit                                 // 接受类2:虽然接受类2的两个路由键都满足,但只会显示1次
    发送了:Animal World routingKey:quick.brown.fox
    发送了:Animal World routingKey:orange
    发送了:Animal World routingKey:quick.orange.male.rabbit                          // 不满足接受类2,因为路由键显示只能是三个单词
    发送了:Animal World routingKey:lazy.orange.male.rabbit        // 不满足接受类1,因为*只能代表1个单词  // 接受类2:

    接受类1:

    其路由键为String routingKey = "*.orange.*";

    收到消息:Animal World routingKey:quick.orange.rabbit
    收到消息:Animal World routingKey:lazy.orange.elephant
    收到消息:Animal World routingKey:quick.orange.fox

    接受类2:

    String routingKey = "*.*.rabbit";

    String routingKey2 = "lazy.#";(此处的井号#不会限制单词的数量)

    收到消息:Animal World routingKey:quick.orange.rabbit
    收到消息:Animal World routingKey:lazy.orange.elephant
    收到消息:Animal World routingKey:lazy.brown.fox
    收到消息:Animal World routingKey:lazy.pink.rabbit
    收到消息:Animal World routingKey:lazy.orange.male.rabbit

    上述9个例子,包含了几乎所有的情况。

    八、SpringBoot整合RabbitMQ

    8.1 创建SpringBoot项目

    1.消费者项目

    创建SpringBoot项目spring-boot-rabbitmq-consumer

    我曾遇到的问题:

    忘记了选择类型为maven:最后项目一创建时就会爆红出问题

    配置路径:

    2.生产者项目

    同理,创建项目spring-boot-rabbitmq-producer

    8.2 引入依赖

    在以上两个springboot项目的配置文件pom.xml中,均做出以下4点的修改和依赖补充:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.2.1.RELEASE</version>      <!--1.将springboot的版本改为2.2.1.RELEASE-->
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
    
      <groupId>com.imooc</groupId>
      <artifactId>spring-boot-rabbitmq-consumer</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>spring-boot-rabbitmq-consumer</name>
      <description>Demo project for Spring Boot</description>
      <properties>
          <java.version>17</java.version>
      </properties>
    
      <!--2.阿里云Maven镜像-->
      <repositories>
          <repository>
              <id>aliyun</id>
              <name>aliyun</name>
              <url>https://maven.aliyun.com/repository/public</url>
          </repository>
      </repositories>
    
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter</artifactId>
          </dependency>
    
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
    
          <!--3.1 引入rabbitmq相关的依赖-->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
      </dependencies>
    
      <build>
          <plugins>
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
                  <version>2.2.1.RELEASE</version> <!--4.这里加上版本信息,否则会出错-->
              </plugin>
          </plugins>
      </build>
    </project>

    8.3 发送者:编写application.properties配置文件

    由于使用到了rabbitmq,同时因为后续要启动生产者(发送类)连接阿里云的远程云服务器,所以要配置下各自的端口号:

    application.properties配置文件中,进行程序的配置:

    server.port=8080
    spring.application.name=producer
    spring.rabbitmq.addresses=121.199.31.220:5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=password
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000

    8.4 发送者:编写Rabbitmq的配置类

    新建一个Java类,用于配置tabbitmq
    声明有哪些交换机、哪些队列;

    下面生成的对象,均放在了Spring IoC容器中进行管理:

    package com.imooc.springbootrabbitmqproducer;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     *  Rabbitmq的配置类
     */
    @Configuration
    public class TopicRabbitmqConfig {
      // 定义2个队列
      @Bean
      public Queue queue1(){
          return new Queue("queue1");     // 队列名儿,同方法名,这样spring会自动匹配
      }
    
      @Bean
      public Queue queue2(){
          return new Queue("queue2");     // 队列名儿,同方法名,这样spring会自动匹配
      }
    
      // 定义1个交换机
      @Bean
      TopicExchange exchange(){
          return new TopicExchange("bootExchange");
      }
    
      // 第一次绑定:将上述的队列1,通过自己的路由键,绑定到交换机上
      @Bean
      Binding bingExchangeMessage1(Queue queue1, TopicExchange exchange) {
          return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
      }
    
      // 第二次绑定:将上述的队列2,通过自己的路由键,绑定到交换机上
      @Bean
      Binding bingExchangeMessage2(Queue queue2, TopicExchange exchange) {
          return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
      }
    }

    8.5 发送者:编写发送类MsgSender

    编写发送消息的方法

    package com.imooc.springbootrabbitmqproducer;
    import org.springframework.amqp.core.AmqpTemplate;
    import javax.annotation.Resource;
    /**
     *  发送类:用于发送消息
     */
    public class MsgSender {
    
      @Resource
      private AmqpTemplate rabbitmqTemplate;          // 依赖注入
      
      public void send1(){
          String message = "This is message 1, routingKey is dog.red";
          System.out.println("发送了:" + message);
          // 发送消息
          this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.red", message);
      }
    
      public void send2(){
          String message = "This is message 2, routingKey is dog.black";
          System.out.println("发送了:" + message);
          // 发送消息
          this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.black", message);
      }
    }

    8.6 发送者:编写测试类

    为了发送消息,需要编写一个测试类:

    package com.imooc.springbootrabbitmqproducer;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import javax.annotation.Resource;
    @SpringBootTest
    class SpringBootRabbitmqProducerApplicationTests {
    
      @Resource
      MsgSender msgSender;            // 依赖注入
      
      @Test
      public void send1(){
          msgSender.send1();
      }
      
      @Test
      public void send2(){
          msgSender.send2();
      }
    }

    以上,发送者,就已编写完成。

    接下来,开始编写接收者。

    springboot中要想使用rabbitmq来接收消息,其实是非常方便的。

    8.7 接收者:编写application.properties配置文件

    由于使用到了rabbitmq,同时因为后续要启动消费者(接受类)来连接阿里云的远程服务器,所以要配置下各自的端口号:

    application.properties配置文件中,进行程序的配置:

    server.port=8081
    spring.application.name=consumer
    spring.rabbitmq.addresses=121.199.31.220:5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=password
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000

    8.8 接收者:编写两个接受类

    接受类1:

    package com.imooc.springbootrabbitmqconsumer;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    /**
     * 消费者1
     */
    @Component
    @RabbitListener(queues = "queue1")      // 用注解的形式,指定队列为queue1(queue1就是生产者那边定义的队列)
    public class Receiver1 {
    
      @RabbitHandler                     // 受到消息后,会怎么处理
      public void process(String message) {
          System.out.println("Receiver1:" + message);
      }
    }

    接受类2:

    package com.imooc.springbootrabbitmqconsumer;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    /**
     * 消费者2
     */
    @Component
    @RabbitListener(queues = "queue2")      // 用注解的形式,指定队列为queue1(queue1就是生产者那边定义的队列)
    public class Receiver2 {
    
      @RabbitHandler                     // 受到消息后,会怎么处理
      public void process(String message) {
          System.out.println("Receiver2:" + message);
      }
    }

    8.9 启动

    在保证阿里云服务器的RabbitMQ成功运行的前提下:

    1.先启动发送类:

    控制台结果为 :

    发送了:This is message 1, routingKey is dog.red
    发送了:This is message 2, routingKey is dog.black

    2.再启动接收类:

    会分别监听两个队列:

    控制台结果为 :

    Receiver1:This is message 1, routingKey is dog.red
    Receiver2:This is message 1, routingKey is dog.red
    Receiver2:This is message 2, routingKey is dog.black

拓展:

我曾遇到的问题:

原因:
JDK的版本不一致导致。
解决:将JDK的版本改为1.8


借鉴:
https://blog.csdn.net/qq_42025798/article/details/113917231

综上所述,已成功的通过rabbitmq,在两个独立的项目之间,即生产者(发送类)、消费者(接受类)之间,建立连接,对消息进行发送并接收。

也实现了SpringBoot整合RabbitMQ。

不管是Java客户端,还是SpringBoot,都能很好的运用了RabbitMQ。

后续如果想更深入的了解RabbitMQ,可以参考官网如下内容:

声明:Jerry's Blog|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 30.5 消息队列 RabbitMQ


Follow excellence, and success will chase you.