Spark Streaming reading from Kafka: a consumer that reads the data sent by a Kafka producer

The code is as follows:

package steaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

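object StreamReadKafka {

  // Use an explicit main method: the Spark docs advise against extending scala.App,
  // whose subclasses "may not work correctly"
  def main(args: Array[String]): Unit = {
    // Run locally on all cores with a 5-second batch interval
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkkafka")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    // Receiver-based stream (spark-streaming-kafka-0-8 API):
    // ZooKeeper quorum, consumer group id, Map(topic -> number of receiver threads)
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
      streamingContext,
      "192.168.106.107:2181",
      "testa",
      Map("kb07demo" -> 1)
    )

    // Each record is a (key, message) pair; split the message into words
    val wordStream: DStream[String] = kafkaDStream.flatMap(x => x._2.split(" "))

    // Print the first 10 elements of every batch to the console
    wordStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}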
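To see what the flatMap step does to a single batch without starting Spark or Kafka, here is a minimal plain-Scala sketch. It assumes each record arrives as a (key, message) pair, as createStream delivers it; the names SplitBatchSketch and splitMessages and the sample records are hypothetical, for illustration only.

```scala
// Plain-Scala stand-in for one micro-batch: no Spark or Kafka needed.
object SplitBatchSketch {

  // Mirrors kafkaDStream.flatMap(x => x._2.split(" ")):
  // ignore the Kafka key and split each message value on single spaces.
  def splitMessages(batch: Seq[(String, String)]): Seq[String] =
    batch.flatMap { case (_, value) => value.split(" ") }

  def main(args: Array[String]): Unit = {
    // Hypothetical records as the receiver might hand them over: (key, value).
    // Keys are null here, as when a producer sends messages without a key.
    val batch: Seq[(String, String)] = Seq(
      (null, "hello spark"),
      (null, "hello kafka streaming")
    )
    splitMessages(batch).foreach(println)
  }
}
```

Because split(" ") splits on every single space, two consecutive spaces produce an empty word ("a  b" gives "a", "", "b"). Splitting on "\\s+" instead collapses runs of whitespace, if empty words are unwanted.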

Added to pom.xml:

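Watch the declaration order: some of the Kafka dependencies must come after the Spark ones, because when two dependencies pull in different versions of the same transitive artifact at the same depth, Maven keeps the one declared first. The code above itself only needs spark-streaming and spark-streaming-kafka-0-8 (the org.apache.spark.streaming.kafka.KafkaUtils class lives in the latter). (My configuration is not complete or fully correct either; it is fine for my own learning. Below are some of the required entries.)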

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
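The ordering note comes from how Maven mediates conflicting versions of the same transitive artifact: the nearest definition in the dependency tree wins, and at equal depth the first declaration wins. Below is a toy Scala model of just that rule, not Maven's actual resolver; MediationSketch, Path and the scala-library versions used are illustrative assumptions, not values read from a real build.

```scala
// Toy model of Maven dependency mediation (not Maven's real resolver).
object MediationSketch {

  // One way an artifact can enter the build:
  // depth     = distance from the pom (1 = declared directly),
  // declIndex = position of the top-level <dependency> that pulls it in.
  final case class Path(artifact: String, version: String, depth: Int, declIndex: Int)

  // Nearest definition wins; at equal depth, the earlier declaration wins.
  def resolve(paths: Seq[Path]): Map[String, String] =
    paths.groupBy(_.artifact).map { case (artifact, ps) =>
      artifact -> ps.minBy(p => (p.depth, p.declIndex)).version
    }

  def main(args: Array[String]): Unit = {
    // Illustrative: a _2.11 artifact and a _2.12 artifact both bring in
    // scala-library transitively at depth 2; the _2.11 one is declared first.
    val sparkFirst = Seq(
      Path("scala-library", "2.11.8", depth = 2, declIndex = 0),
      Path("scala-library", "2.12.8", depth = 2, declIndex = 1)
    )
    println(resolve(sparkFirst)("scala-library")) // prints 2.11.8
  }
}
```

Swapping the two declIndex values flips the result, which is why moving a dependency above or below another can change what ends up on the classpath. Running mvn dependency:tree shows what Maven actually resolved for a real pom.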

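Copyright notice: this is an original article by sunbin11220904, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.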