RabbitMQ

为什么需要消息中间件

​ 中间件屏蔽掉了底层操作系统的复杂性,使程序开发人员面对的是一个简单统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必在为程序在不同的系统上移植而重复工作,大大减轻技术上的负担。中间件带给应用系统的不止是开发的便捷,开发周期的缩短,减轻系统的维 运营和管理的工作量。

使用场景

​ 当我们使用微服务架构 时,模块之间可能语言不一样,互通时就需要用到MQ技术,使之串联
TCP协议只是底层的协议 当我们的程序TCP协议满足不了的时候 我们会在TCP协议之上构建自己的协议(MQTT)

使用场景

  1. 跨系统的消息传递

  2. 高并发的流量晓锋

  3. 数据的分布和异步处理

    1. 串行执行(只有上一个执行完后才会继续下一个)所以是阻塞的

    2. 串行执行就是接力赛,并行执行是一起跑

  4. 大数据分析与传递

  5. 分布式事务

    消息的分发策略

ActiveMQ

RabbitMQ

Kafka

RocketMQ

发布订阅

轮询发布

×

公平分发

×

×

重发

×

消息拉取

×

掌握的五种模式

1.最简单那的模式
一个生产一个消费img
红色的是队列 P是生产,C是消费

2.工作模式(Work queues)
派单模式用的较多|img

3.发布订阅模式(Publish/Suscribe) img
X 是交换机

4.Routing
是在发布订阅的基础上加了路由key img

5.主题模式(Topic)
增加了可以模糊匹配的路由key img
*号代表是一个 #号代表是多个

什么是分布式系统

通俗的说 就是一个请求有服务器端的多个服务协同完成

和单体架构不同的是,单体架构是一个JVM调度线程(Tomcat线程池)分配线程Thread来处理直到释放,而分布式系统是:一个请求由多个系统协同完成,JVM和环境可能是独立的

什么是消息中间件

1.利用可靠的消息传递机制进行系统和系统之间的通讯

2.通过提供消息传递和信息的派对机制,它可以在分布式系统环境下扩展进程间的通讯

消息中间件就是负责数据的传递,存储和分发消费三个部分,但是数据的存储和分发的过程中必须遵循某种约定的规范,底层操作系统为我们提供了TCP/IP UDP协议,或者自己构建,但是自己构建必须是在这些协议基础之上构建

网络协议的三要素

1.语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
2.语义。语义是解释控制信息每个部分的意义,它规定了 需要发出何种控制信息, 以及完成的动作与做出什么样的响应。
3.时序。时序是对事件发生的顺序的详细说明。

为什么消息中间件不直接使用http协议

http请求是比较复杂的,包含cookie,数据的加密解密,状态码,响应码等附加功能,但是对于一个消息而言,不需要那么复杂,也没有这个必要,他其实就是消息的传递,存储和分发操作,一定要追求的是高性能,尽量简洁,快速。

大部分情况下http是短链接,在实际的交互过程中,一个请求到响应会很快结束中断,中断后就不会进行持久化,就会造成数据的丢失

Linux安装RabbitMQ

注意RabbitMQ和Erlang的对应版本

RabbitMQ version

Minimum required Erlang/OTP

Maximum supported Erlang/OTP

Notes

3.8.16

23.2

24.x

Erlang/OTP 24 support announcement Erlang 24 was released on May 12, 2021 Some community plugins and tools may be incompatible with Erlang 24

3.8.15 3.8.14 3.8.13 3.8.12 3.8.11 3.8.10 3.8.9

22.3

23.x

Erlang/OTP 23 compatibility notes Erlang 23.x is recommended Erlang 22.x dropped support for HiPE

3.8.8 3.8.7 3.8.6 3.8.5 3.8.4

21.3

23.x

Erlang/OTP 23 compatibility notes Erlang 22.x or 23.x is recommended Erlang 22.x dropped support for HiPE

#创建目录
mkdir /usr/rabbitmq
#将rpm包放进
erlang-23.2.6-1.el7.x86_64.rpm  rabbitmq-server-3.8.16-1.el7.noarch.rpm
#解压
rpm -Uvh erlang-23.2.6-1.el7.x86_64.rpm	
yum install erlang -y 
yum install -y socat
rpm -Uvh rabbitmq-server-3.8.16-1.el7.noarch.rpm
yum install  rabbitmq -y 
#开机启动
systemctl enable rabbitmq-server 

#授权账号
#新增用户
rabbitmqctl add_user admin admin 
#授权 授予超级管理员权限
rabbitmqctl set_user_tags admin administrator
#为用户添加资源权限
rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*" 
#用户级别
administrator	登录控制台,查看所有信息,对rabbitmq管理
monitoring 		监控,登录控制台,查看所有信息
policymaker		策略定制,登录控制台,指定策略
management		普通的管理员,登录控制台

#命令总结
rabbitmqctl add_user 账号  密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Nwepassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_user 查看用户列表
rabbitmqctl.bat set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色
rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*" 

安装和授权完毕

通过服务器的IP地址和端口号访问

http://118.178.57.158:15672

image-20210606222045950

Docker安装RabbitMQ

# 创建并运行容器	
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 25672:25672 -p 61613:61613 -p 1881:1883 rabbitmq:management

springBoot整合rabbitMQ(Fanout模式)

案例:image-20210620154237324

1.创建springboot工程

导入依赖

 	 <!--rabbitmq stater  依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

2.配置文件

# 服务端口
server:
  port: 8080
# rabbitmq 配置
spring:
  rabbitmq:
    password: admin		
    username: admin
    virtual-host: /
    host: 118.178.57.158
    port: 5672
# 如果是本机的MQ 那么账号密码和主机是默认的就OK

3.创建订单服务

package org.ph.springbootorderrabbitmaproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * <p>
 * 订单服务
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 15:45
 */
@Service
public class OrdersService {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * @param userid    用户ID
     * @param productId 订单ID
     * @param num       数量
     */
    public void makeOrder(String userid, String productId, int num) {
        /**
         * 业务流程
         * 1.根据商品ID查询库存是否
         * 2.保存订单
         * 3.通过MQ来分发消息
         */
        String orderUUID = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderUUID);

        String exchangeName="fanout_order_exchange";
        String rountingKey="";
        //参数1. 交换机  2.路由key/queue对列名称   3.消息内容
        rabbitTemplate.convertAndSend(exchangeName,rountingKey,orderUUID);

        /**
         * 此时我们已经链接MQ, 并且创建了派发订单消息
         *  问题:
         *      交换机怎么声明成fanout模式。
         *      队列又怎么和交换机绑定关系
         *  解决:
         *      配置类
         */
    }
}

4.配置类

package org.ph.springbootorderrabbitmaproducer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 15:57
 */
@Configuration
public class RabbitMqConfiguration {
    /**
     * 1.声明注册fanout模式的交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        //交换机名称
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    /**
     * 2.声明队列    sms.fanout.queue    email.fanout.queue  duanxin.fanout.queue
     */
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    @Bean
    public Queue duanxinQueue() {
        return new Queue("duanxin.fanout.queue", true);
    }

    /**
     * 3.完成绑定关系(队列和交换机完成绑定关系)
     *
     * @return
     */
    @Bean
    public Binding smsBinding() {
        //将sms队列绑定到fanout交换机上
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        //将email队列绑定到fanout交换机上
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding duanxinBinding() {
        //将duanxin队列绑定到fanout交换机上
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }
}

5.消费者

创建springboot项目

复制yml文件过去,创建消费

6.短信消费

package org.ph.springbootorderrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 17:31
 */
@Service
@RabbitListener(queues = "duanxin.fanout.queue")
public class FanoutDuanXinConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("短信 fanout》》接收到信息:->" + message);
    }
}

7.Email消费

package org.ph.springbootorderrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 17:31
 */
@Service
@RabbitListener(queues = "email.fanout.queue")
public class FanoutEmailConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("Email fanout》》接收到信息:->" + message);
    }
}

8.SMS消费

package org.ph.springbootorderrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 17:31
 */
@Service
@RabbitListener(queues = "sms.fanout.queue")
public class FanoutSmsConsumer {
    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("SMS fanout》》接收到信息:->" + message);
    }
}

Direct模式

创建生产者

1.DirectRabbitMqConfiguration

package org.ph.direct.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 15:57
 */
@Configuration
public class DirectRabbitMqConfiguration {
    /**
     * 1.声明注册fanout模式的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        //交换机名称
        return new DirectExchange("direct_order_exchange", true, false);
    }

    /**
     * 2.声明队列    sms.direct.queue    email.direct.queue  duanxin.direct.queue
     */
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.direct.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.direct.queue", true);
    }

    @Bean
    public Queue duanxinQueue() {
        return new Queue("duanxin.direct.queue", true);
    }

    /**
     * 3.完成绑定关系(队列和交换机完成绑定关系)
     *
     * @return
     */
    @Bean
    public Binding smsBinding() {
        //将sms队列绑定到fanout交换机上 比fanout多路由key
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        //将email队列绑定到fanout交换机上
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding duanxinBinding() {
        //将duanxin队列绑定到fanout交换机上
        return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with("duanxin");
    }
}

2.OrdersService

package org.ph.direct.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * <p>
 * 订单服务
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 15:45
 */
@Service
public class OrdersService {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * @param userid    用户ID
     * @param productId 订单ID
     * @param num       数量
     */
    public void makeOrder(String userid, String productId, int num) {
        /**
         * 业务流程
         * 1.根据商品ID查询库存是否
         * 2.保存订单
         * 3.通过MQ来分发消息
         */
        String orderUUID = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderUUID);

        String exchangeName="direct_order_exchange";
        //参数1. 交换机  2.路由key/queue对列名称   3.消息内容
        rabbitTemplate.convertAndSend(exchangeName,"email",orderUUID);
        rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderUUID);

        /**
         * 此时我们已经链接MQ, 并且创建了派发订单消息
         *  问题:
         *      交换机怎么声明成fanout模式。
         *      队列又怎么和交换机绑定关系
         *  解决:
         *      配置类
         */
    }
}

创建消费者

1.EmailConsumer

package org.ph.directconsumer.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 17:31
 */
@Service
@RabbitListener(queues = "email.direct.queue")
public class DirectEmailConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("Email direct》》接收到信息:->" + message);
    }
}

2.SMSConsumer

package org.ph.directconsumer.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 17:31
 */
@Service
@RabbitListener(queues = "sms.direct.queue")
public class DirectSmsConsumer {
    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("SMS direct》》接收到信息:->" + message);
    }
}

3.duanxinConumer

package org.ph.directconsumer.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @since 2021 -06 -20 17:31
 */
@Service
@RabbitListener(queues = "duanxin.direct.queue")
public class DirectDuanXinConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("短信 direct》》接收到信息:->" + message);
    }
}

Direct会比Fanout多key

所以在绑定交换机的时候需要指定key

@Bean
public Binding duanxinBinding() {
//将duanxin队列绑定到fanout交换机上
return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with(“duanxin”);
}