RabbitMQ
RabbitMQ
为什么需要消息中间件
中间件屏蔽掉了底层操作系统的复杂性,使程序开发人员面对的是一个简单统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必在为程序在不同的系统上移植而重复工作,大大减轻技术上的负担。中间件带给应用系统的不止是开发的便捷,开发周期的缩短,减轻系统的维 运营和管理的工作量。
使用场景
当我们使用微服务架构 时,模块之间可能语言不一样,互通时就需要用到MQ技术,使之串联
TCP协议只是底层的协议 当我们的程序TCP协议满足不了的时候 我们会在TCP协议之上构建自己的协议(MQTT)
跨系统的消息传递
高并发的流量晓锋
数据的分布和异步处理
串行执行(只有上一个执行完后才会继续下一个)所以是阻塞的
串行执行就是接力赛,并行执行是一起跑
大数据分析与传递
分布式事务
消息的分发策略
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
发布订阅 | √ | √ | √ | √ |
轮询发布 | √ | √ | √ | × |
公平分发 | × | √ | √ | × |
重发 | √ | √ | × | √ |
消息拉取 | × | √ | √ | √ |
掌握的五种模式
1.最简单那的模式
一个生产一个消费
红色的是队列 P是生产,C是消费
2.工作模式(Work queues)
派单模式用的较多|
3.发布订阅模式(Publish/Suscribe)
X 是交换机
4.Routing
是在发布订阅的基础上加了路由key
5.主题模式(Topic)
增加了可以模糊匹配的路由key
*号代表是一个 #号代表是多个
什么是分布式系统
通俗的说 就是一个请求有服务器端的多个服务协同完成
和单体架构不同的是,单体架构是一个JVM调度线程(Tomcat线程池)分配线程Thread来处理直到释放,而分布式系统是:一个请求由多个系统协同完成,JVM和环境可能是独立的
什么是消息中间件
1.利用可靠的消息传递机制进行系统和系统之间的通讯
2.通过提供消息传递和信息的派对机制,它可以在分布式系统环境下扩展进程间的通讯
消息中间件就是负责数据的传递,存储和分发消费三个部分,但是数据的存储和分发的过程中必须遵循某种约定的规范,底层操作系统为我们提供了TCP/IP UDP协议,或者自己构建,但是自己构建必须是在这些协议基础之上构建
网络协议的三要素
1.语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
2.语义。语义是解释控制信息每个部分的意义,它规定了 需要发出何种控制信息, 以及完成的动作与做出什么样的响应。
3.时序。时序是对事件发生的顺序的详细说明。
为什么消息中间件不直接使用http协议
http请求是比较复杂的,包含cookie,数据的加密解密,状态码,响应码等附加功能,但是对于一个消息而言,不需要那么复杂,也没有这个必要,他其实就是消息的传递,存储和分发操作,一定要追求的是高性能,尽量简洁,快速。
大部分情况下http是短链接,在实际的交互过程中,一个请求到响应会很快结束中断,中断后就不会进行持久化,就会造成数据的丢失
Linux安装RabbitMQ
注意RabbitMQ和Erlang的对应版本
Minimum required Erlang/OTP | Maximum supported Erlang/OTP | Notes | |
---|---|---|---|
3.8.16 | 23.2 | 24.x | Erlang/OTP 24 support announcement Erlang 24 was released on May 12, 2021 Some community plugins and tools may be incompatible with Erlang 24 |
3.8.15 3.8.14 3.8.13 3.8.12 3.8.11 3.8.10 3.8.9 | 22.3 | 23.x | Erlang/OTP 23 compatibility notes Erlang 23.x is recommended Erlang 22.x dropped support for HiPE |
3.8.8 3.8.7 3.8.6 3.8.5 3.8.4 | 21.3 | 23.x | Erlang/OTP 23 compatibility notes Erlang 22.x or 23.x is recommended Erlang 22.x dropped support for HiPE |
#创建目录
mkdir /usr/rabbitmq
#将rpm包放进
erlang-23.2.6-1.el7.x86_64.rpm rabbitmq-server-3.8.16-1.el7.noarch.rpm
#解压
rpm -Uvh erlang-23.2.6-1.el7.x86_64.rpm
yum install erlang -y
yum install -y socat
rpm -Uvh rabbitmq-server-3.8.16-1.el7.noarch.rpm
yum install rabbitmq -y
#开机启动
systemctl enable rabbitmq-server
#授权账号
#新增用户
rabbitmqctl add_user admin admin
#授权 授予超级管理员权限
rabbitmqctl set_user_tags admin administrator
#为用户添加资源权限
rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
#用户级别
administrator 登录控制台,查看所有信息,对rabbitmq管理
monitoring 监控,登录控制台,查看所有信息
policymaker 策略定制,登录控制台,指定策略
management 普通的管理员,登录控制台
#命令总结
rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Nwepassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_user 查看用户列表
rabbitmqctl.bat set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色
rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*"
安装和授权完毕
通过服务器的IP地址和端口号访问
Docker安装RabbitMQ
# 创建并运行容器
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 25672:25672 -p 61613:61613 -p 1881:1883 rabbitmq:management
springBoot整合rabbitMQ(Fanout模式)
案例:
1.创建springboot工程
导入依赖
<!--rabbitmq stater 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.配置文件
# 服务端口
server:
port: 8080
# rabbitmq 配置
spring:
rabbitmq:
password: admin
username: admin
virtual-host: /
host: 118.178.57.158
port: 5672
# 如果是本机的MQ 那么账号密码和主机是默认的就OK
3.创建订单服务
package org.ph.springbootorderrabbitmaproducer.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* <p>
* 订单服务
* </p>
*
* @author lph
* @since 2021 -06 -20 15:45
*/
@Service
public class OrdersService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @param userid 用户ID
* @param productId 订单ID
* @param num 数量
*/
public void makeOrder(String userid, String productId, int num) {
/**
* 业务流程
* 1.根据商品ID查询库存是否
* 2.保存订单
* 3.通过MQ来分发消息
*/
String orderUUID = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderUUID);
String exchangeName="fanout_order_exchange";
String rountingKey="";
//参数1. 交换机 2.路由key/queue对列名称 3.消息内容
rabbitTemplate.convertAndSend(exchangeName,rountingKey,orderUUID);
/**
* 此时我们已经链接MQ, 并且创建了派发订单消息
* 问题:
* 交换机怎么声明成fanout模式。
* 队列又怎么和交换机绑定关系
* 解决:
* 配置类
*/
}
}
4.配置类
package org.ph.springbootorderrabbitmaproducer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 15:57
*/
@Configuration
public class RabbitMqConfiguration {
/**
* 1.声明注册fanout模式的交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
//交换机名称
return new FanoutExchange("fanout_order_exchange", true, false);
}
/**
* 2.声明队列 sms.fanout.queue email.fanout.queue duanxin.fanout.queue
*/
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue duanxinQueue() {
return new Queue("duanxin.fanout.queue", true);
}
/**
* 3.完成绑定关系(队列和交换机完成绑定关系)
*
* @return
*/
@Bean
public Binding smsBinding() {
//将sms队列绑定到fanout交换机上
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
//将email队列绑定到fanout交换机上
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding duanxinBinding() {
//将duanxin队列绑定到fanout交换机上
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
}
5.消费者
创建springboot项目
复制yml文件过去,创建消费
6.短信消费
package org.ph.springbootorderrabbitmqconsumer.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 17:31
*/
@Service
@RabbitListener(queues = "duanxin.fanout.queue")
public class FanoutDuanXinConsumer {
@RabbitHandler
public void reviceMessage(String message) {
System.out.println("短信 fanout》》接收到信息:->" + message);
}
}
7.Email消费
package org.ph.springbootorderrabbitmqconsumer.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 17:31
*/
@Service
@RabbitListener(queues = "email.fanout.queue")
public class FanoutEmailConsumer {
@RabbitHandler
public void reviceMessage(String message) {
System.out.println("Email fanout》》接收到信息:->" + message);
}
}
8.SMS消费
package org.ph.springbootorderrabbitmqconsumer.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 17:31
*/
@Service
@RabbitListener(queues = "sms.fanout.queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void reviceMessage(String message) {
System.out.println("SMS fanout》》接收到信息:->" + message);
}
}
Direct模式
创建生产者
1.DirectRabbitMqConfiguration
package org.ph.direct.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 15:57
*/
@Configuration
public class DirectRabbitMqConfiguration {
/**
* 1.声明注册fanout模式的交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
//交换机名称
return new DirectExchange("direct_order_exchange", true, false);
}
/**
* 2.声明队列 sms.direct.queue email.direct.queue duanxin.direct.queue
*/
@Bean
public Queue smsQueue() {
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.direct.queue", true);
}
@Bean
public Queue duanxinQueue() {
return new Queue("duanxin.direct.queue", true);
}
/**
* 3.完成绑定关系(队列和交换机完成绑定关系)
*
* @return
*/
@Bean
public Binding smsBinding() {
//将sms队列绑定到fanout交换机上 比fanout多路由key
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding() {
//将email队列绑定到fanout交换机上
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding duanxinBinding() {
//将duanxin队列绑定到fanout交换机上
return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with("duanxin");
}
}
2.OrdersService
package org.ph.direct.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* <p>
* 订单服务
* </p>
*
* @author lph
* @since 2021 -06 -20 15:45
*/
@Service
public class OrdersService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @param userid 用户ID
* @param productId 订单ID
* @param num 数量
*/
public void makeOrder(String userid, String productId, int num) {
/**
* 业务流程
* 1.根据商品ID查询库存是否
* 2.保存订单
* 3.通过MQ来分发消息
*/
String orderUUID = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderUUID);
String exchangeName="direct_order_exchange";
//参数1. 交换机 2.路由key/queue对列名称 3.消息内容
rabbitTemplate.convertAndSend(exchangeName,"email",orderUUID);
rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderUUID);
/**
* 此时我们已经链接MQ, 并且创建了派发订单消息
* 问题:
* 交换机怎么声明成fanout模式。
* 队列又怎么和交换机绑定关系
* 解决:
* 配置类
*/
}
}
创建消费者
1.EmailConsumer
package org.ph.directconsumer.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 17:31
*/
@Service
@RabbitListener(queues = "email.direct.queue")
public class DirectEmailConsumer {
@RabbitHandler
public void reviceMessage(String message) {
System.out.println("Email direct》》接收到信息:->" + message);
}
}
2.SMSConsumer
package org.ph.directconsumer.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 17:31
*/
@Service
@RabbitListener(queues = "sms.direct.queue")
public class DirectSmsConsumer {
@RabbitHandler
public void reviceMessage(String message) {
System.out.println("SMS direct》》接收到信息:->" + message);
}
}
3.duanxinConumer
package org.ph.directconsumer.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* <p>
*
* </p>
*
* @author lph
* @since 2021 -06 -20 17:31
*/
@Service
@RabbitListener(queues = "duanxin.direct.queue")
public class DirectDuanXinConsumer {
@RabbitHandler
public void reviceMessage(String message) {
System.out.println("短信 direct》》接收到信息:->" + message);
}
}
Direct会比Fanout多key
所以在绑定交换机的时候需要指定key
@Bean
public Binding duanxinBinding() {
//将duanxin队列绑定到fanout交换机上
return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with(“duanxin”);
}