Software versions used:
Spark 2.3.0
IDEA 2019.1
kafka_2.11-0.10.2.2
spark-streaming-kafka-0-10_2.11-2.3.0
The plan: a crawler running on Windows 7 writes JSON data into a local folder, which is shared over the network. CentOS 7 mounts the share to access the JSON file, and Kafka's built-in connect-file-source is then used to watch that file:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
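For reference, connect-file-source.properties only needs to name the watched file and the target topic. A minimal sketch, assuming the mounted share is at /mnt/share/data.json (the path is a placeholder; the topic matches the one consumed below):

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/mnt/share/data.json
topic=streaming_kafka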
The data flowing through Kafka can be inspected with the console consumer:
./bin/kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic streaming_kafka --from-beginning
It arrives in the following format:
{"schema":{"type":"string","optional":false},"payload":"{\"like_count\": 832, \"view_count\": 37210, \"user_name\": \" ֪ʶ \", \"play_url\": \"http://jsmov2.a.yximgs.com/upic/2019/04/12/19/A0MNc3NjIxXzJfMw==_b_B12594561fec10c99ab12c417bfbc8b7d.mp4?tag=1-1555243582-h-0-mznoh8fetl-6e60d4850f55979f\", \"description\": \" ٻ С֪ʶ \\n# л Ҫ \", \"cover\": \"http://ali2.a.yximgs.com/uhead/AB/2019/02/18/01/BjYxXzJfaGQ1NTZfNzg3_s.jpg\", \"video_id\": 5229242128224334952, \"comment_count\": 178, \"download_url\": \"http://txmov2-fallback.a.yximgs.com/upic/2019/04/12/19/BNDQxMjYxXzEyMTQ0ODc3NjIxXzJfMw==_b_B12594561fec10c99ab12c417bfbc8b7d.mp4?tag=1-1555243582-h-1-jf11d5efbf-43abbeebb510c74a\", \"user_id\": 900441261, \"video_create_time\": \"2019-04-12 19:42:54\", \"user_sex\": \"M\"}"}
Kafka then feeds the data to Spark Streaming, which consumes and processes the JSON. KafkaUtils.createDirectStream turns the topic directly into a DStream of records:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("kafka-spark-demo")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "streamingkafka1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("streaming_kafka")
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
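Because enable.auto.commit is set to false, offsets are never committed unless the application commits them; since auto.offset.reset is "earliest", a restarted job will replay the topic from the beginning. If that matters, the 0-10 integration can commit offsets back to Kafka after each batch. A minimal sketch, with the actual processing elided:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // commitAsync must be called on the original DStream returned by createDirectStream
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}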
Calling stream.print() on the resulting stream shows records in the following format:
ConsumerRecord(topic = streaming_kafka, partition = 0, offset = 0, CreateTime = 1555243610459, checksum = 554481609, serialized key size = 30, serialized value size = 399, key = {"schema":null,"payload":null}, value = {"schema":{"type":"string","optional":false},"payload":" {\"comment_count\": 32, \"download_url\": \"http://txmov2-fallback.a.yximgs.com/upic/2019/04/10/16/jM3MjFfNjUzMTM5NzYxXzEyMDk0MDk3MjE5XzFfMw==_b_Ba131d3edd32f313334226eda9f72d446.mp4?tag=1-1555243542-h-1-unll5jdoau-eaabcc96eaf99f34\", \"user_id\": 653139761, \"video_create_time\": \"2019-04-10 16:37:23\", \"user_sex\": \"M\"}"})
Applying stream.map(record => record.value()) yields data in this format:
{"schema":{"type":"string","optional":false},"payload":" {\"comment_count\": 32, \"download_url\": \"http://txmov2-fallback.a.yximgs.com/upic/2019/04/10/16/BAxNjM3MjFfNjUzMTM5NzYxXzEyMDk0MDk3MjE5XzFfMw==_b_Ba131d3edd32f313334226eda9f72d446.mp4?tag=1-1555243542-h-1-unll5jdoau-eaabcc96eaf99f34\", \"user_id\": 653139761, \"video_create_time\": \"2019-04-10 16:37:23\", \"user_sex\": \"M\"}"}
The payload in the data above is the JSON crawled on Windows 7, and there are two problems:
1. The record has become nested JSON, so the payload field would have to be parsed out of it.
2. The payload value is wrapped in double quotes ("") and its inner quotes are escaped with backslashes (\).
Examining connect-standalone.properties and connect-file-source.properties under Kafka's config/ directory shows that the schema envelope is added by the JSON converters. To strip it, set key.converter.schemas.enable and value.converter.schemas.enable to false in connect-standalone.properties (reference: https://www.cnblogs.com/listenfwind/p/8610487.html).
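A minimal sketch of the relevant converter lines, assuming the JsonConverter defaults that ship with Kafka (only the two schemas.enable flags change):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false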
After restarting Connect with these settings, stream.map(record => record.value()) yields data in this format:
" {\"comment_count\": 32, \"download_url\": \"http://txmov2-fallback.a.yximgs.com/upic/2019/04/10/16/MjFfNjUzMTM5NzYxXzEyMDk0MDk3MjE5XzFfMw==_b_Ba131d3edd32f313334226eda9f72d446.mp4?tag=1-1555243542-h-1-unll5jdoau-eaabcc96eaf99f34\", \"user_id\": 653139761, \"video_create_time\": \"2019-04-10 16:37:23\", \"user_sex\": \"M\"}"
Chaining map(str => str.replace("\"{","{")).map(str => str.replace("}\"", "}")).map(str => str.replace("\\\"", "\"")) then strips the leading and trailing double quotes and the inner backslashes, producing well-formed JSON:
{"user_id": 667396227, "video_id": 5253730450234103327, "cover": "http://ali2.a.yximgs.com/uhead/AB/2019/01/31/22/zk2MjI3XzFfaGQyNjFfNzE5_s.jpg", "video_create_time": "2019-04-15 20:42:33", "download_url": "http://txmov2-fallback.a.yximgs.com/upic/2019/04/15/20k2MjI3XzEyMjMzMjA3MjI0XzFfMw==_b_B6f22e69099eaf94d88e839e79dded4cf.mp4?tag=1-1555384085-h-1-qwa8yuf2kp-725c069fddd00afa", "user_sex": "M", "comment_count": 109, "user_name": "橙子耶??_", "view_count": 22218, "like_count": 1714, "play_url": "http://qnmov.a.yximgs.com/upic/2019/04/15/20/zk2MjI3XzEyMjMzMjA3MjI0XzFfMw==_b_B6f22e69099eaf94d88e839e79dded4cf.mp4?tag=1-1555384085-h-0-imjgwczruv-8e58855fe574a205", "description": "多喝热水"}
The JSON is then processed and analyzed with the following code (reference: https://blog.csdn.net/shirukai/article/details/85211951):
import org.apache.spark.sql.SparkSession

val readyData = stream.map(record => record.value())
  .map(str => str.replace("\"{", "{"))
  .map(str => str.replace("}\"", "}"))
  .map(str => str.replace("\\\"", "\""))

readyData.foreachRDD(rdd => {
  val session = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import session.implicits._
  val df = session.read.json(session.createDataset(rdd))
  df.createOrReplaceTempView("sparkjson") // registerTempTable is deprecated since Spark 2.0
  val highView = session.sql("select * from sparkjson where view_count > 500000")
  highView.show()
  highView.write.format("json").mode("append").save("hdfs://master:9000/data/highViewVideo")
})
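One step the snippet leaves implicit: nothing actually runs until the streaming context is started, so the job must end with:

ssc.start()            // begin consuming from Kafka
ssc.awaitTermination() // block until the job is stopped or fails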
Reflections:
1. Why does JSON passed through Kafka's built-in connect-file-source come out wrapped in double quotes and full of backslashes? I read the source of org.apache.kafka.connect.json.JsonConverter but could not follow it. My guess: FileStreamSource emits each line as a plain String, and JsonConverter then serializes that String as a JSON string literal, which is exactly what adds the surrounding quotes and escapes the inner ones.
2. Handling special characters with replace() and replaceAll() is a handy technique (note that replaceAll() treats its pattern as a regex).
Recorded here for future reference.