The code is as follows:
package steaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamReadKafka extends App {
  // Run locally with as many worker threads as cores
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkkafka")
  // Micro-batch interval of 5 seconds
  val streamingContext = new StreamingContext(sparkConf, Seconds(5))

  // Receiver-based stream (0-8 API): ZooKeeper quorum, consumer group id,
  // and a map of topic -> number of receiver threads
  val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
    streamingContext,
    "192.168.106.107:2181",
    "testa",
    Map("kb07demo" -> 1)
  )

  // Each Kafka record arrives as a (key, value) pair; split the value into words
  val wordStream: DStream[String] = kafkaDStream.flatMap(x => x._2.split(" "))
  wordStream.print()

  streamingContext.start()
  streamingContext.awaitTermination()
}
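Before pointing the job at a live broker, the per-record transformation can be sanity-checked on a plain collection, with no Spark running. A minimal script-style sketch (the sample records are made up for illustration):

```scala
// Simulate the (key, value) pairs a Kafka batch would deliver
val records = Seq(("k1", "hello spark streaming"), ("k2", "hello kafka"))

// Same logic as kafkaDStream.flatMap(x => x._2.split(" "))
val words = records.flatMap(x => x._2.split(" "))
assert(words == Seq("hello", "spark", "streaming", "hello", "kafka"))

// The usual next step on the DStream would be map + reduceByKey;
// on a plain collection the equivalent count is:
val counts = words.groupBy(identity).map { case (w, ws) => (w, ws.size) }
assert(counts("hello") == 2)
println(counts)
```

Once the split and count behave as expected here, the same lambdas can be dropped into the DStream pipeline unchanged.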
Additions to pom.xml:
Be careful with the order of the dependencies; some of the kafka dependencies need to come after the spark ones. (My configuration is not entirely correct either; it works for my own learning use. The following is a partial list of the essentials.) Note that the receiver-based `createStream` used above comes from the `spark-streaming-kafka-0-8` artifact; the `0-10` artifact only provides the direct-stream API.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
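Since the pom also pulls in spark-streaming-kafka-0-10, the same job could instead be written against the direct-stream API in `org.apache.spark.streaming.kafka010`, which connects to the Kafka brokers directly rather than going through ZooKeeper. A hedged sketch, assuming a broker listening on port 9092 of the same host used above:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectReadKafka extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("sparkkafka-direct")
  val ssc = new StreamingContext(conf, Seconds(5))

  // The 0-10 API talks to the brokers, not ZooKeeper
  // ("192.168.106.107:9092" is an assumed broker address)
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "192.168.106.107:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "testa",
    "auto.offset.reset" -> "latest"
  )

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Seq("kb07demo"), kafkaParams)
  )

  // ConsumerRecord exposes value directly instead of a (key, value) tuple
  stream.flatMap(record => record.value.split(" ")).print()

  ssc.start()
  ssc.awaitTermination()
}
```

Note that the 0-8 and 0-10 artifacts use different packages (`...streaming.kafka` vs `...streaming.kafka010`), so both can sit on the classpath, but each API only works with its own artifact.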
Copyright notice: this is an original article by sunbin11220904, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.