【消息队列】RabbitMQ五种消息模式

RabbitMQ

  • RabbitMQ
    • RabbitMQ安装
  • 常见的消息模型
    • 基本消息队列
    • SpringAMQP
    • WorkQueue
    • 消息预取
    • 发布订阅模式
      • Fanout Exchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/

RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  1. 下载镜像,在线拉取
    docker pull rabbitmq
  2. 安装MQ
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
--name mq \   # 队列名称
--hostname mq1 \  #配置主机名
-p 15672:15672 \  # MQ管理端口
-p 5672:5672 \   #MQ消息传输端口
-d \   # 后台运行
rabbitmq

在这里插入图片描述
在这里插入图片描述

交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的

在RabbitMQ中:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

常见的消息模型

  1. 基本消息队列
  2. 工作消息队列

这两种并没有用到交换机,而是直接到达队列

  1. 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种:
    Fanout Exchange:广播
    Direct Exchange:路由
    Topic Exchange:主题

基本消息队列

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息

java模型(消息发布者)

@Test
public void test() throws IOException,TimeoutException{
    //1.建立连接,与消息队列进行连接
    ConnetionFactory factory =new ConnetionFactory();
    //设置连接参数,主机名,端口号,vhost,用户名,密码
    factory.setHost(192.168.75.136);
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("");
    //建立连接
    Connection connection =factory.newConnection();
    //创建通道Channel,就可以向队列发送消息了
    Channel channel =connection.createChannel();
    //创建队列
    String queuename="hlh";
    channel.queueDeclare(queuename,false,false,false,null);
    //发送消息
    String message="hello";
    channel.basicPublish("",queuename,null,message.getBytes());
    //关闭通道和连接
    channel.close();
    connection.close();
}

java模型(消息消费者)

    //1.建立连接,与消息队列进行连接
    ConnetionFactory factory =new ConnetionFactory();
    //设置连接参数,主机名,端口号,vhost,用户名,密码
    factory.setHost(192.168.75.136);
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("");
    //建立连接
    Connection connection =factory.newConnection();
    //创建通道Channel,就可以向队列发送消息了
    Channel channel =connection.createChannel();
    //创建队列
    String queuename="hlh";
    channel.queueDeclare(queuename,false,false,false,null);
    //订阅消息
    channel.basicConsume(queuename,true,new DefaultConsumer(channel){
    @Override
    //处理消息的代码,绑定函数,有了消息才执行
    public void handleDelivery(String consumerTag,Envelope envelope,
                               AMQP.BasicProperties properties,byte[] body)throws IOException{
                  //处理消息
                  String message=new String(body);             
               }
    })

注意:上边生产者消费者都创建了队列:

这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复

SpringAMQP

  • AMQP

是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求

  • Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现

  • 特征:
  1. 监听器容器,用于异步处理入站消息
  2. 用于发送和接收消息的RabbitTemplate
  3. Rabbitadmin用于自动声明队列,交换和绑定
  • 使用:
  1. 引入spring-amqp的依赖
    在这里插入图片描述
    在yml中配置mq连接信息:
spring: 
   rabbitmq:
     host: 192.168.75.136 #主机名
     port: 5672 #端口
     virtual-host: / #虚拟主机
     username: itcast #用户名
     password:   #密码
  1. 在生产者服务中利用RabbitTemplate发送消息到hlh.queue这个队列
public class springamqptest{
   @Autowired
   private RabbitTemplate rabbittemplate;
   @Test
   public void test(){
     String queuename="hlh.queue";
     String message="hello";
     rabbittemplate.convertAndSend(queuename,message);
   }
}
  1. 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
@Component
public class SpringrabbitListener {
   @RabbitListener(queues="hlh.queue")
   public void listenSimple(String msg) throws InterruptedException{
     //消费逻辑代码
   }
}

注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

WorkQueue

Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积

一个消息队列绑定多个消费者

假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:

@Component
public class SpringrabbitListener {
   @RabbitListener(queues="hlh.queue")
   public void listenSimple(String msg) throws InterruptedException{
     //消费逻辑代码
   }
      @RabbitListener(queues="hlh.queue")
   public void listenSimple2(String msg) throws InterruptedException{
     //消费逻辑代码
   }
}

通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除

消息预取

指的每个消费者每次取多少条消息:
可以通过配置进行配置:

spring:
   rabbitmq:
      host: 192.168.75.136
      port: 5672
      virtual-host: /
      username: itcast
      password: 
      listener:
         simple:
           prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息

发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
在这里插入图片描述

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

交换机的作用:

  1. 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  2. 不能缓存消息,路由失败,消息丢失
  3. FanoutExchange的会将消息路由到每个绑定的队列

SpringAMQP提过了声明交换机,队列,绑定关系的API:
在这里插入图片描述

Fanout Exchange

Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机

@Configuration
public class FanoutConfig{
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
       return new FanoutExchange("itcast.fanout");
    }
    //声明一个队列
    @Bean
    public Queue fanoutQueue1(){
       return new Queue("fanout.queue1");
    }
    // 绑定队列跟交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
       return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
}

此时的生产者如何发送消息:

public void test(){
  //给出交换机名称
  String exchangeName="itcast.fanout";
  String message="hello";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName,"",message);
}

监听者如何收到消息

@RabbitListener(queues="fanout.queue1")
public void listener(String msg){
   //处理得到的消息
}

DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

一个队列可以指定多个Key

我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
public void Listener(String msg){
   //进行消息的处理
}

在生产者生产时:

public void test(){
  //给出交换机名称
  String exchangeName="itcast.fanout";
  String message="hello";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以.分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
同样也是使用 @RabbitListener进行声明

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
public void Listener(String msg){
   //进行消息的处理
}

生产者生产消息:

public void test(){
  //给出交换机名称
  String exchangeName="itcast.fanout";
  String message="hello";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化

  1. 引入jackson的依赖
    在这里插入图片描述
  2. 声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){
   return new Jackson2JsonMessageConverter();
}

这样发送的消息就会使用自定义的转换类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/586020.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java技术栈快速复习04_javaweb基础总结

javaweb概述 JDBC JDBC(Java DataBase Connectivity,Java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问。简单说就是用Java语言来操作数据库。 jdbc原理 早期SUN公司的天才们想编写一套可以连接…

C++ ─── 内存管理

1 . C / C内存分布 我们先看下面的一段代码和相关问题 int globalVar 1;static int staticGlobalVar 1;void Test(){static int staticVar 1;int localVar 1;int num1[10] {1, 2, 3, 4};char char2[] "abcd";char* pChar3 "abcd";int* ptr1 (int…

Postgresql源码(127)投影ExecProject的表达式执行分析

无论是投影还是别的计算,表达式执行的入口和计算逻辑都是统一的,这里已投影为分析表达式执行的流程。 1 投影函数 用例 create table t1(i int primary key, j int, k int); insert into t1 select i, i % 10, i % 100 from generate_series(1,1000000…

JeeSite框架安装部署

下载JeeSite框架。 依次执行两个sql文件。 如果是mysql8.0,则create_user.sql需要改成下面的内容: -- 打开 my.ini 给 [mysqld] 增加如下配置: -- sql_modeONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREAT…

YOLOv8核心原理深度解析

YOLOv8源码地址: https://github.com/ultralytics/ultralytics 一、简介: 根据官方描述,Yolov8是一个SOTA模型,它建立在Yolo系列历史版本的基础上,并引入了新的功能和改进点,以进一步提升性能和灵活性,使其成为实现目标检测、图像分割、姿态估计等任务的最佳选择。其具体…

代码随想录——双指针与滑动窗口(四)

一.1423. 可获得的最大点数 题目详情 解题思路 这里我们每次只能取最左或最右边的卡牌,第一反应其实是使用双指针,通过局部贪心来解决,但是如果两边相等的话用局部贪心无法来判断到底取哪一边,那我们不妨换一个思路: 我们首先任…

DICOM 测试工具

一个DICOM测试工具。 引用了 fo-dicom 。fo-dicom 算是比较好用的,我的另外一个项目也是用了它。 using System; using System.Collections.Generic; using System.Data; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; …

Go语言map

map 概念 在Go语言中,map 是一种内建的数据结构,它提供了一种关联式的存储机制,允许你以键值对的形式存储数据。每个键都是唯一的,并且与一个值相关联。你可以通过键来查找、添加、更新和删除值,这类似于其他编程语言…

Spring Boot的热部署工具“AND”Swagger测试工具

Spring Boot的热部署&Swagger测试页面的使用 热部署指的是在项目无需重启的情况下,只需要刷新页面,即可获得已经修改的样式或功能。要注意该工具一般用于开发环境,在生产环境中最好不要添加这个工具。 对于无需重启便可刷新这么方便的工…

小剧场短剧影视小程序源码_后端PHP

项目运行截图 源码贡献 https://githubs.xyz/boot?app42 部署说明 linux/win任选 PHP版本:7.3/7.2(测试时我用的7.2要安装sg扩展 ) 批量替换域名http://video.owoii.com更换为你的 批量替换域名http://120.79.77.163:1更换为你的 这两个…

交通运输智慧监管平台---强化物流安全与效率的新举措

一、建设背景 随着社会对于交通安全和环境保护的要求不断提高,对卡车运输的监管和合规性要求也逐渐加强。为了满足快速发展的物流需求,提高供应链协同和可追溯性、解决安全问题、提高运输效率和降低成本,我们利用现代技术和信息化手段着力建设…

Spark SQL编程初级实践

参考链接 Spark编程: Spark SQL基本操作 2020.11.01_df.agg("age"->"avg")-CSDN博客 RDD编程初级实践-CSDN博客 Spark和Hadoop的安装-CSDN博客 1. Spark SQL基本操作 { "id":1 , "name":" Ella" , "age":…

数字电路-5路呼叫显示和8路抢答器

本内容涉及两个电路,分别为5路呼叫显示电路和8路抢答器电路,包含Multisim仿真原文件,为掌握FPGA做个铺垫。紫色文字是超链接,点击自动跳转至相关博文。持续更新,原创不易! 目录: 一、5路呼叫显…

每日OJ题_DFS爆搜深搜回溯剪枝②_力扣526. 优美的排列

目录 力扣526. 优美的排列 解析代码 力扣526. 优美的排列 526. 优美的排列 难度 中等 假设有从 1 到 n 的 n 个整数。用这些整数构造一个数组 perm(下标从 1 开始),只要满足下述条件 之一 ,该数组就是一个 优美的排列 &#…

nginx缓存清理

背景 昨天打开我的gpt镜像网站,意外发现静态图片资源全都无法获取了 CoCo-AI 一番排查下来,发现是引用的cdn链接失效了 且cdn源是属于七牛云的,且不再维护,于是果断切换到cloudflare export function getEmojiUrl(unified: str…

JavaScript中的Object方法、Array方法、String方法

个人主页:学习前端的小z 个人专栏:JavaScript 精粹 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结,欢迎大家在评论区交流讨论! 文章目录 🔥Object方法🌞1 Object.is()🌞2 Object.…

区块链 | 由外部实体导致的 NFT 安全问题

🦊原文: Understanding Security Issues in the NFT Ecosystem 🦊警告: 本文只记录了原文的第 6 节。 1 问题描述 NFT 所指向的数字资产(图片、视频等)必须是可以访问的,这样 NFT 才具有意义…

iA Writer for Mac:简洁强大的写作软件

在追求高效写作的今天,iA Writer for Mac凭借其简洁而强大的功能,成为了许多作家、记者和学生的首选工具。这款专为Mac用户打造的写作软件,以其独特的设计理念和实用功能,助你轻松打造高质量的文章。 iA Writer for Mac v7.1.2中文…

数据挖掘之基于Lightgbm等多模型消融实验的信用欺诈检测实现

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 在当前的金融环境中,信用欺诈行为日益增多,给金融机构和消费者带来了巨大的损…

ThingsBoard PE专业版解决方案技术文档——温度湿度

1、项目总览 2、设备接入 3、设备告警 3.1 高温告警 创建一个Flag作为标杆,作为开启告警的开关。 3.2 低湿度告警 创建一个Flag作为标杆,作为开启告警的开关。 4、部件仪表 4.1 Entities table 部件预览: 标题样式: {"…
最新文章