OpenShift 4 之AMQ Streams(1) - 多个Consumer从Partition接收数据

OpenShift 4.x HOL教程汇总
说明:本文已经在OpenShift 4.10 环境中验证

AMQ Streams 是什么 ?

Red Hat AMQ Stream 是红帽基于社区版 Kafka 提供的软件订阅。它提供了所有 Kafka 的功能,同时又可以和红帽其他软件能更好的集成使用。在 OpenShift 中我们使用 AMQ Stream Operator 来构建并维护 AMQ Stream 的容器化运行环境。

安装 AMQ Streams 环境

安装 AMQ Streams Operator

  1. 创建 kafka 项目
$ oc new-project kafka
  1. 使用缺省配制在 kafka 项目中安装 Red Hat Integration - AMQ Streams Operator,成功后可在 Installed Operators 中看到 Red Hat Integration - AMQ Streams ,并且还可执行命令查看 pod 和 api-resources 资源状态。
$ oc get pod -n kafka
NAME                                                   READY   STATUS    RESTARTS   AGE
amq-streams-cluster-operator-v1.4.0-59c7778c88-7bvzx   1/1     Running   0          22s
 
$ oc api-resources --api-group='kafka.strimzi.io'
NAME                 SHORTNAMES   APIGROUP           NAMESPACED   KIND
kafkabridges         kb           kafka.strimzi.io   true         KafkaBridge
kafkaconnectors      kctr         kafka.strimzi.io   true         KafkaConnector
kafkaconnects        kc           kafka.strimzi.io   true         KafkaConnect
kafkaconnects2is     kcs2i        kafka.strimzi.io   true         KafkaConnectS2I
kafkamirrormaker2s   kmm2         kafka.strimzi.io   true         KafkaMirrorMaker2
kafkamirrormakers    kmm          kafka.strimzi.io   true         KafkaMirrorMaker
kafkas               k            kafka.strimzi.io   true         Kafka
kafkatopics          kt           kafka.strimzi.io   true         KafkaTopic
kafkausers           ku           kafka.strimzi.io   true         KafkaUser

创建 Kafka 集群

  1. 在安装好的 Red Hat Integration - AMQ Streams Operator 中创建 Kafka 实例。使用缺省配置创建 Kafka 集群实例,其中含 3 副本 Kafka 实例和 3 副本 Zookeeper 实例。
    在这里插入图片描述
  2. 然后在 “开发者” 视图的 “拓扑” 页面中查看 Kafka 资源的部署状态。
    在这里插入图片描述
  3. 执行命令查看 Kafka 实例的资源状态。
$ oc get kafka  -n kafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   3                        3                     True
 
$ oc get pod -n kafka
NAME                                                   READY   STATUS    RESTARTS   AGE
amq-streams-cluster-operator-v1.4.0-59c7778c88-7bvzx   1/1     Running   0          23m
my-cluster-entity-operator-c4cfc5695-zm5m7             3/3     Running   0          2s
my-cluster-kafka-0                                     1/2     Running   0          61s
my-cluster-kafka-1                                     1/2     Running   0          61s
my-cluster-kafka-2                                     1/2     Running   1          61s
my-cluster-zookeeper-0                                 2/2     Running   0          94s
my-cluster-zookeeper-1                                 2/2     Running   0          94s
my-cluster-zookeeper-2                                 2/2     Running   0          94s

运行Topic应用:Hello World

创建Topic

  1. 在安装好的 Red Hat Integration - AMQ Streams Operator 中创建 KafkaTopic实例。
    在这里插入图片描述
  2. 执行命令查看 my-topic 的状态。
$ oc get kafkatopic my-topic -n kafka
NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
my-topic   my-cluster   10           3                    True

发送接收消息

  1. 根据以下内容在 kafka 项目中创建发送消息的应用 hello-world-producer。
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: hello-world-producer
  name: hello-world-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-world-producer
  template:
    metadata:
      labels:
        app: hello-world-producer
    spec:
      containers:
      - name: hello-world-producer
        image: strimzici/hello-world-producer:latest
        env:
          - name: BOOTSTRAP_SERVERS
            value: my-cluster-kafka-bootstrap:9092
          - name: TOPIC
            value: my-topic
          - name: DELAY_MS
            value: "5000"
          - name: LOG_LEVEL
            value: "INFO"
          - name: MESSAGE_COUNT
            value: "5000"
  1. 执行命令查看 Pod 的运行情况。
$ oc get pod -l app=hello-world-producer -n kafka
NAME                                   READY   STATUS    RESTARTS   AGE
hello-world-producer-f85d9f755-l77sz   1/1     Running   0          50s
  1. 在OpenShift 控制台进入名为 hello-world-producer-f85d9f755-l77sz 的 Pod,然后查看 Logs。
    在这里插入图片描述

  2. 根据以下内容在 kafka 项目中创建发送消息的应用 hello-world-consumer。

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: hello-world-consumer
  name: hello-world-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-world-consumer
  template:
    metadata:
      labels:
        app: hello-world-consumer
    spec:
      containers:
      - name: hello-world-consumer
        image: strimzici/hello-world-consumer:latest
        env:
          - name: BOOTSTRAP_SERVERS
            value: my-cluster-kafka-bootstrap:9092
          - name: TOPIC
            value: my-topic
          - name: GROUP_ID
            value: my-group
          - name: LOG_LEVEL
            value: "INFO"
          - name: MESSAGE_COUNT
            value: "5000"
  1. 执行命令查看 Pod 运行情况。
$ oc get pod -l app=hello-world-consumer -n kafka
NAME                                    READY   STATUS    RESTARTS   AGE
hello-world-consumer-6f9766f94c-l7wcp   1/1     Running   0          60s
  1. 查看名为 hello-world-consumer-6f9766f94c-l7wcp 的 Pod 的输出日志。可以确认测试唯一一个kafka-consumer可以从编号0/1/2的三个partition接收数据。
    在这里插入图片描述
  2. 执行命令,将运行 hello-world-consumer 的 Pod 数增加到2 个。
$ oc scale deployment hello-world-consumer --replicas=2 -n kafka
 
$ oc get pod -l app=hello-world-consumer -n kafka
NAME                                    READY   STATUS    RESTARTS   AGE
hello-world-consumer-6f9766f94c-l7wcp   1/1     Running   0          4m16s
hello-world-consumer-6f9766f94c-ltdfr   1/1     Running   0          6s
  1. 在 OpenShift 控制台确认以上 2 个运行 hello-world-consumer 的 Pod 的日志。根据 partition 编号确认其中 1 个 Pod 只能从 0-4 号 partition 接收数据,另一个 Pod 能从 5-9 号 partition 接收数据。
    在这里插入图片描述
    在这里插入图片描述
  2. 执行命令,将运行 hello-world-consumer 的 Pod 再次数降为 1。然后再次从 Pod 日志确认它可从所有 partition 接收到数据。
$ oc scale deployment hello-world-consumer --replicas=1 -n kafka

参考

https://github.com/shpaz/openshift-data-workshop/tree/main/2-amq-persistent-odf


版权声明:本文为weixin_43902588原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。