阿帕奇集群 linux,apache-kafka集群部署

前提:zk

什么是kafka

百度百科

Kafka是一个分布式的、分区的、多复本的日志提交服务。它通过一种独一无二的设计提供了一个消息系统的功能。

目标是为处理实时数据提供一个统一、高通量、低等待的平台。

kafka是一个分布式流媒体平台。

安装配置

主机列表

hostname

ip

master

192.168.3.58

slave1

192.168.3.54

slave2

192.168.3.31

下载kafka

cd /data

wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压

tar axf kafka_2.11-1.0.0.tgz

创建日志存放目录

mkdir -p /data/kafka_2.11-1.0.0/logs

设置环境变量(每个节点都改)

vim /etc/profile

#kafka

export KAFKA_HOME=/data/kafka_2.11-1.0.0

export PATH=$PATH:${KAFKA_HOME}/bin

source /etc/profile

修改配置文件

cd kafka_2.11-1.0.0/config/

修改zookeeper.properties(用zookeeper的配置文件zoo.conf直接覆盖)

grep -v "^#" zookeeper.properties

dataDir=/data/zookeeper/data

dataLogDir=/data/zookeeper/logs

clientPort=2181

tickTime=2000

initLimit=10

syncLimit=5

server.1=master:2887:3887

server.2=slave1:2887:3887

server.3=slave2:2887:3887

vim server.properties

#ID号,集群中唯一,一般使用本机ip最后一位

broker.id=58

#"允许删除topic,默认为false"

delete.topic.enable=true

#"服务器监控地址,如果没有设置会通过java.net.InetAddress.getCanonicalHostName()获取,端口默认为9092"

listeners=PLAINTEXT://master:9092

#“网络连接进程数,适量上调”

num.network.threads=3

#"io连接数,适量上调 "

num.io.threads=8

#"socket发送缓存大小 "

socket.send.buffer.bytes=102400

#"socket接受缓存大小"

socket.receive.buffer.bytes=102400

#"socket应答缓存大小 "

socket.request.max.bytes=104857600

#"日志文件存放路径"

log.dirs=/data/kafka_2.11-1.0.0/logs

#"复制数量,允许更大的并行消费,不能大于节点数据"

num.partitions=2

#“每个目录线程数”

num.recovery.threads.per.data.dir=1

#"topic元数据复制数,建议大于1"

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=3

#"清理日志方法"

log.cleanup.policy = delete

#"日志保留时长"

log.retention.hours=30

#"日志最大值"

log.segment.bytes=1073741824

#“检查日志时间”

log.retention.check.interval.ms=300000

#"zookeeper连接"

zookeeper.connect=master:2181,slave1:2181,slave2:2181

#"zookeeper连接超时"

zookeeper.connection.timeout.ms=6000

#"同步延迟"

group.initial.rebalance.delay.ms=0

将文件scp到其他两个机器上

scp -r /data/kafka_2.11-1.0.0 slave1:/data

scp -r /data/kafka_2.11-1.0.0 slave2:/data

slave1和slave2修改 server.properties

修改其中的

broker.id

listeners

如slave1

broker.id=54

listeners=PLAINTEXT://slave1:9092

启动kafka(每个节点)(会占用窗口)

kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties

后台启动kafka(生产环境使用)

nohup kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties 2&> /data/kafka_2.11-1.0.0/logs/kafka.log &

关闭kafka

kafka-server-stop.sh

创建topic

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 3 --replication-factor 3 --topic test

查看topic

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

测试

创建producer

kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic producerest

另外一个机器上创建consumer

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic producerest --from-beginning

producer产生消费,consumer接收消费

34039ad549db02ec8011cbb938f9be6b.png

429da83e704be398d14c16a88ebc4719.png

使用ctrl+c退出

参考