Using Kafka from Java: producing on an intranet node, consuming from a public-network node, and monitoring with kafka-eagle
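I recently needed to push real-time data to a third-party company through a message queue. The service that produces the messages runs on an intranet server, so Kafka had to be deployed on a separate company server that has a public IP (otherwise the third party could not consume from it). That makes the configuration a little unusual, so I put together a simple single-node demo. I used Kafka 2.4.0; the download page is https://kafka.apache.org/downloads
Just extract the archive: tar -xzf kafka_2.11-2.4.0.tgz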
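Modify the Kafka configuration
Edit server.properties under config: uncomment listeners and advertised.listeners and set them as follows, together with the protocol map and the inter-broker listener name. The INTERNAL listener (port 9092) advertises the intranet address used by the producer, and the EXTERNAL listener (port 9093) advertises the public IP used by the third-party consumer.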
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://192.168.30.128:9092,EXTERNAL://19.129.30.119:9093
inter.broker.listener.name=INTERNAL
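To make the two-listener setup concrete: a broker tells each client to use the advertised address of the listener that the client bootstrapped on, so the intranet producer (port 9092) and the public consumer (port 9093) each get an address they can actually reach. The following is only a sketch of that name-to-address mapping; AdvertisedListeners is a hypothetical helper written for this post, not a Kafka class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Kafka): parses an advertised.listeners value. */
final class AdvertisedListeners {

    /** Returns listener name -> "host:port" in the order the entries were written. */
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            // Each entry looks like NAME://host:port
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener entry: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> advertised = parse(
                "INTERNAL://192.168.30.128:9092,EXTERNAL://19.129.30.119:9093");
        // Clients that bootstrap on 9092 are handed the INTERNAL address,
        // clients that bootstrap on 9093 are handed the EXTERNAL address.
        advertised.forEach((name, address) -> System.out.println(name + " -> " + address));
    }
}
```

If the consumer connects but then times out, the usual suspect is an advertised address that is not reachable from where the client runs.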
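Start ZooKeeper first, then Kafka. When shutting down, do it in the reverse order: stop Kafka first, then ZooKeeper, so that the broker can still reach ZooKeeper while it shuts down.
# Start ZooKeeper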
nohup /soft/kafka_2.11-2.4.0/bin/zookeeper-server-start.sh /soft/kafka_2.11-2.4.0/config/zookeeper.properties > /soft/kafka_2.11-2.4.0/nohup1.out 2>&1 &
# Start Kafka
nohup /soft/kafka_2.11-2.4.0/bin/kafka-server-start.sh /soft/kafka_2.11-2.4.0/config/server.properties > /soft/kafka_2.11-2.4.0/nohup2.out 2>&1 &
Run netstat -tunlp | grep 9092 and check whether a process is listening on the port; if one is, Kafka started successfully.
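netstat only shows that the port is open on the Kafka server itself. To check from the producer or consumer machine that the port is reachable across the network, a plain TCP connect is enough. This is a minimal sketch; PortCheck is a hypothetical helper, and the two-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Run on the intranet producer machine
        System.out.println("INTERNAL 9092 reachable: " + isReachable("192.168.30.128", 9092, 2000));
        // Run on the public-network consumer machine
        System.out.println("EXTERNAL 9093 reachable: " + isReachable("19.129.30.119", 9093, 2000));
    }
}
```

A successful connect only proves that the port is open and not blocked by a firewall; it does not prove that the broker is healthy.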
Producer, deployed on the intranet
- First, add the dependency to the pom file
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
- Define the basic configuration
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class KafkaConfig {

    // Intranet address of the broker (the INTERNAL listener)
    public final static String servers = "192.168.30.128:9092";

    @Bean(destroyMethod = "close")
    public KafkaProducer<String, String> kafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        // Wait until all in-sync replicas have acknowledged each record
        properties.put("acks", "all");
        // Failed sends are not retried
        properties.put("retries", 0);
        // Batch up to 16 KB per partition, waiting at most 1 ms for a batch to fill
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        // 32 MB of memory for records that have not been sent yet
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}
- Then, wherever a message needs to be pushed, inject the producer and send the message to the desired topic by name
@Autowired
private KafkaProducer<String, String> kafkaProducer;
kafkaProducer.send(new ProducerRecord<String, String>("topicName",message));
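The broker address in the configuration class is hard-coded. One possible variation, shown here only as a sketch, is to build the same producer properties from a value supplied at deploy time. The class name ProducerSettings and the environment variable KAFKA_BOOTSTRAP_SERVERS are assumptions made for this example; the property keys and values are the ones used in the configuration class.

```java
import java.util.Properties;

/** Hypothetical helper: builds the producer settings with a configurable broker address. */
final class ProducerSettings {

    /** Assumed environment variable name; pick whatever fits your deployment. */
    static final String ENV_BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
    /** Falls back to the intranet (INTERNAL) listener address. */
    static final String DEFAULT_BOOTSTRAP = "192.168.30.128:9092";

    /** Returns the configured address, or the default when it is null or blank. */
    static String resolveBootstrap(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_BOOTSTRAP;
        }
        return configured.trim();
    }

    /** Same settings as the configuration class; serializers are given by class name. */
    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build(resolveBootstrap(System.getenv(ENV_BOOTSTRAP)));
        System.out.println("bootstrap.servers = " + properties.get("bootstrap.servers"));
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in place of the one built inline.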
Consumer, deployed on the public network
- Add the same dependency
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
- Define the connection settings and consume messages; running the main method prints the messages as they arrive in real time
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

@Log4j2
public class KafkaConsumerTest implements Runnable {

    private static final String GROUPID = "groupB";

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        // Public IP of the Kafka server and the port of the EXTERNAL listener
        props.put("bootstrap.servers", "19.129.30.119:9093");
        props.put("group.id", GROUPID);
        // Commit offsets automatically once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Start from the earliest offset when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        log.info("---------start consuming---------");
        try {
            for (;;) {
                // poll(long) is deprecated since Kafka 2.0. poll(Duration) blocks for up to
                // one second when nothing is available, so no extra sleep is needed.
                ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : msgList) {
                    log.info(messageNo + "=======consumed: key = " + record.key()
                            + ", value = " + record.value() + " offset===" + record.offset());
                    messageNo++;
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerTest test = new KafkaConsumerTest("cv");
        Thread thread = new Thread(test);
        thread.start();
    }
}
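A simple kafka-eagle installation and walkthrough
Download and extract kafka-eagle-bin-2.1.0.tar.gz. The archive has to be extracted twice. It runs on Windows as well.
Fill in the properties in system-config.properties under config. Since I run a single node, the cluster2 and cluster3 entries need to be commented out. After that, just point cluster1 at the ZooKeeper node and set the connection details for the MySQL database.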
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
#,cluster2
cluster1.zk.list=192.168.30.128:2181
#,tdn2:2181,tdn3:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.efak.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16
######################################
# EFAK webui port
######################################
efak.webui.port=8048
######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk
######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
#cluster2.efak.sasl.enable=false
#cluster2.efak.sasl.protocol=SASL_PLAINTEXT
#cluster2.efak.sasl.mechanism=PLAIN
#cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
#cluster2.efak.sasl.client.id=
#cluster2.efak.blacklist.topics=
#cluster2.efak.sasl.cgroup.enable=false
#cluster2.efak.sasl.cgroup.topics=
######################################
# kafka ssl authenticate
######################################
#cluster3.efak.ssl.enable=false
#cluster3.efak.ssl.protocol=SSL
#cluster3.efak.ssl.truststore.location=
#cluster3.efak.ssl.truststore.password=
#cluster3.efak.ssl.keystore.location=
#cluster3.efak.ssl.keystore.password=
#cluster3.efak.ssl.key.password=
#cluster3.efak.ssl.endpoint.identification.algorithm=https
#cluster3.efak.blacklist.topics=
#cluster3.efak.ssl.cgroup.enable=false
#cluster3.efak.ssl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.20.36:3306/kafka-eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=aa123456
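When trimming the file down to a single cluster it is easy to leave an alias in efak.zk.cluster.alias without a matching zk.list entry. The sketch below checks for that. EfakConfigCheck is a hypothetical helper written for this post, not part of kafka-eagle, and it assumes the alias list is comma-separated, as the commented-out ",cluster2" suggests. It also relies on the fact that in a .properties file a "#" starts a comment only at the beginning of a line, so a "#" after a value becomes part of that value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not part of kafka-eagle): sanity-checks the cluster aliases. */
final class EfakConfigCheck {

    /** Parses .properties text the same way java.util.Properties reads a file. */
    static Properties load(String text) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(text));
        return properties;
    }

    /** Returns every alias in efak.zk.cluster.alias that has no "<alias>.zk.list" value. */
    static List<String> missingZkLists(Properties config) {
        List<String> missing = new ArrayList<>();
        String aliases = config.getProperty("efak.zk.cluster.alias", "");
        for (String alias : aliases.split(",")) {
            String name = alias.trim();
            if (name.isEmpty()) {
                continue;
            }
            String zkList = config.getProperty(name + ".zk.list");
            if (zkList == null || zkList.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        Properties config = load(
                "efak.zk.cluster.alias=cluster1\n"
                + "#,cluster2\n"
                + "cluster1.zk.list=192.168.30.128:2181\n");
        System.out.println("aliases without zk.list: " + missingZkLists(config));
    }
}
```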
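Double-click ke.bat under bin, then open localhost:8048/account/signin?/ in a browser. The default account is admin and the default password is 123456.
After logging in you can see the production and consumption activity of each topic.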
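Copyright notice: this is an original article by weixin_42667972, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.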