MQ,KAFKA,消息队列

消息队列

什么是消息:A要通知B,发送的东西叫做消息。

什么是队列:先进先出,顺序。

存放消息的队列。

为什么会产生消息队列思想呢?是为了解决什么问题产生的。

比如淘宝服务

如果哪个环节经常导致网络波荡出错误就会报错,导致用户下单失败。

 假设网络传输是100ms,理想情况下。访问的服务越多耗时越长。

传统串行化服务的缺点:

      1.  耦合性强(系统各个模块之间的耦合性)

      2.  系统吞吐量不大,耗时多

优点:系统架构简单,排查错误比较方便

看一下消息队列的形式

 客户端往消息队列放消息,放成功之后就返回成功了。

 优点:解耦,提升性能

缺点:分布式事务解决方案

事务

只要涉及到消息队列,肯定要处理事务问题。

事务:原子性。我所有的动作,要么不执行,要么不全部执行成功。

比如说:张三给我转账100元。可以拆分为两个动作:张三账户减100,我的账户加100。

优秀的程序架构设计需要遵循的守则:低耦合,高内聚。

我们写的这四个服务并不是顺序执行的,是随机执行的。

吞吐量是:系统发送请求给服务器,到服务器返回客户端这是一次吞吐量。

架构设计:

从功能块的访问量来选择。消息队列,redis,zookeeper,锁,jvm,Spring,servlet

cas:就是操作系统里的cas,并发编程就是操作系统里的线程。

操作系统,计算机网络,数据结构,计算机组成原理。

 

 

什么是MQ

(详细讲解查看此博客:

什么是MQ?_清醒温柔的博客-CSDN博客

Message Query(MQ),消息队列中间件,很多初学者认为,MQ通过消息的发送和接受来实现程序的异步和解耦,mq主要用于异步操作,这个不是mq的真正目的,只不过是mq的应用,mq真正的目的是为了通讯。他屏蔽了复杂的通讯协议,像常用的dubbo,http协议都是同步的。

这两种协议很难实现 双端通讯,A调用B,B也可以主动调用A,而且不支持长连接。mq做的就是在这些协议上构建一个简单协议——生产者、消费者模型,mq带给我们的不是底层的通讯协议,而是更高层次的通讯模型。他定义了两个对象:发送数据的叫做生产者,接受消息的叫做消费者,我们可以无视底层的通讯协议,我们可以自己定义生产者消费者。

MQ的两种流派

1.有broker的

broker是什么,可以理解为是一个中转站。生产者将消息发送给他就结束自己的任务了,broker将消息主动推送给消费者(具体的将消息推送到哪个队列,或者说消费者主动请求)

重topic

必须要有topic

kafka:全球消息处理性能最快的一款mq

rocket:阿里内部的一个大神根据kafka的执行原理手写的,性能与kafka差不多,但是功能上比kafka要多,比如说顺序消费。

轻topic

可以没有topic,topic只是一种中转模式

rabbitmq

2.无broker的

zeromq:没有使用broker,是直接使用socket进行通信。

穿插知识点

并行和并发的区别

事务简单介绍

消息队列和普通串行程序相比有哪些优缺点

一、kafka介绍(详解如下链接博客)

https://blog.csdn.net/weixin_64881460/article/details/123974331

1.kafka简单介绍

kafka是一款分布式、支持分区的、多副本,基于zookeeper协调的分布式消息系统。最大的特性就是可以实时处理大量数据来满足需求。

2.kafka使用场景

1,日志收集:可以用kafka收集各种服务的日志 ,通过已统一接口的形式开放给各种消费者。

2,消息系统:解耦生产和消费者,缓存消息。

3,用户活动追踪:kafka可以记录webapp或app用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到kafka,然后订阅者通过订阅这些消息来做监控。

4,运营指标:可以用于监控各种数据。

3.kafka基本概念

kafka是一个分布式的分区的消息,提供消息系统应该具备的功能。

名称解释
broker消息中间件处理节点,一个broker就是一个kafka节点,多个broker构成一个kafka集群。
topickafka根据消息进行分类,发布到kafka的每个消息都有一个对应的topic
producer消息生产(发布)者
consumer消息消费(订阅)者
consumergroup消息订阅集群,一个消息可以被多个consumergroup消费,但是一个consumergroup只有一个consumer可以消费消息。

4.kafka的安装

#下载安装包并解压
tar -xzvf  
#修改配置文件
#默认端口号
#修改日志位置
#zk地址
#启动
./kafka-server-start.sh -daemon  ../config/server.properties

5.java实现消息的生产和消费

引入maven依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>

生产者代码

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
​
/**
 * @author zhencong
 * @title: Kafkapro
 * @projectName kafkapro
 * @description: TODO
 * @date 2022/3/24下午 02:55
 */
public class Kafkapro {
    public static void main(String[] s){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.129.129:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<>("test", msg));
                System.out.println("Sent:" + msg);
            }
            producer.send(new ProducerRecord<>("test", "msg"));
            System.out.println("Sent:" + "msg");
        } catch (Exception e) {
            e.printStackTrace();
​
        } finally {
            producer.close();
        }
​
    }
}
​

消息生产成功

消费者代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
​
import java.util.Arrays;
import java.util.Properties;
/**
 * @author zhencong
 * @title: Kafkacus
 * @projectName kafkacus
 * @description: TODO
 * @date 2022/3/24下午 03:07
 */
public class Kafkacus {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.129.129:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
​
    }
​
}
​消息消费成功

目录

什么是MQ

MQ的两种流派

1.有broker的

2.无broker的

穿插知识点

一、kafka介绍

1.kafka简单介绍

2.kafka使用场景

3.kafka基本概念

4.kafka的安装

5.java实现消息的生产和消费



版权声明:本文为weixin_64881460原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。