Spark Streaming writing to HDFS
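
The following job consumes messages from Kafka with Spark Streaming's direct stream and writes them to HDFS. To sidestep the HDFS small-files problem, a custom TextOutputFormat rolls one output file per hour and appends to it when it already exists, instead of creating a new file for every micro-batch.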

import java.io.DataOutputStream
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.hadoop.mapred.{JobConf, RecordWriter, TextOutputFormat}
import org.apache.hadoop.util.{Progressable, ReflectionUtils}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object writeToHdfs {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("writeToHdfs")
      // shut the streaming job down gracefully so in-flight data is not lost
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      // .setMaster("local[*]") // uncomment for local testing

    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(10)) // one micro-batch every 10 seconds


    kafkaToHdfs(ssc, "/test")


    ssc.start()
    ssc.awaitTermination()
    sc.stop()

  }

  def kafkaToHdfs(ssc: StreamingContext, savePath: String): Unit = {

    val topics: Set[String] = Set("msg1", "msg2", "msg3")
    val groupId = "group_id_01"
    val brokerList = "localhost:9092"
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokerList,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "true",     // commit offsets automatically
      "auto.commit.interval.ms" -> "100", // commit interval in milliseconds
      "session.timeout.ms" -> "30000"     // consumer liveness timeout
    )
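    // caution: with enable.auto.commit=true, offsets are committed on a timer,
    // independent of whether the batch has reached HDFS, so a crash can drop or
    // replay records; committing manually through the stream's CanCommitOffsets
    // handle after each successful write is the stricter alternative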

    val kafkaDStream: DStream[(String, String)] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    ).map(m => (m.key(), m.value())) // map each ConsumerRecord to a (key, value) pair

    kafkaDStream
      .foreachRDD((rdd: RDD[(String, String)]) => {
        val date: Date = new Date()
        val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        val dir = simpleDateFormat.format(date)
        rdd.partitionBy(new HashPartitioner(1)) // collapse to one partition to limit the number of output files
          .saveAsHadoopFile(savePath + "/" + dir + "/", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
      })
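
    // resulting layout: <savePath>/<yyyy-MM-dd>/sink_<yyyy-MM-dd-HH> -- one
    // directory per day, one appended file per hour (see RDDMultipleTextOutputFormat)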


  }

  class RDDMultipleTextOutputFormat extends TextOutputFormat[Any, Any] {

    // one timestamp per output-format instance; file names roll by the hour
    val currentTime: Date = new Date()
    val formatter = new SimpleDateFormat("yyyy-MM-dd-HH")
    val dateString: String = formatter.format(currentTime)

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.FileOutputFormat


    override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String, progress: Progressable): RecordWriter[Any, Any] = {
      val isCompressed: Boolean = FileOutputFormat.getCompressOutput(job)
      val keyValueSeparator: String = job.get("mapreduce.output.textoutputformat.separator", "\t")
      // one file per hour, written in append mode, which avoids the HDFS small-files problem;
      // the framework-supplied file name is ignored so every batch targets the same hourly file
      val iname = "sink_" + dateString
      if (!isCompressed) {
        val file: Path = FileOutputFormat.getTaskOutputPath(job, iname)
        val fs: FileSystem = file.getFileSystem(job)
        val newFile: Path = new Path(FileOutputFormat.getOutputPath(job), iname)
        // append if the committed file already exists, otherwise create it in the task's output path
        val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
          fs.append(newFile)
        } else {
          fs.create(file, progress)
        }
        new TextOutputFormat.LineRecordWriter[Any, Any](fileOut, keyValueSeparator)
      } else {
        val codecClass: Class[_ <: CompressionCodec] = FileOutputFormat.getOutputCompressorClass(job, classOf[GzipCodec])
        val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, job)
        val file: Path = FileOutputFormat.getTaskOutputPath(job, iname + codec.getDefaultExtension)
        val fs: FileSystem = file.getFileSystem(job)
        val newFile: Path = new Path(FileOutputFormat.getOutputPath(job), iname + codec.getDefaultExtension)

        // note: each append starts a new gzip member within the same file, so
        // downstream readers must handle concatenated gzip streams
        val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
          fs.append(newFile)
        } else {
          fs.create(file, progress)
        }
        new TextOutputFormat.LineRecordWriter[Any, Any](new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator)
      }
    }
  }
}
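
For a quick smoke test, something along these lines can drive the same pipeline locally. This is a minimal sketch, not part of the original job: the local[*] master, the /tmp/test output path, and the one-minute timeout are all assumptions for testing, and it still expects a Kafka broker on localhost:9092 with the topics above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object localSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("writeToHdfsLocal").setMaster("local[*]")
    val ssc = new StreamingContext(new SparkContext(conf), Seconds(10))
    writeToHdfs.kafkaToHdfs(ssc, "/tmp/test") // hypothetical output path
    ssc.start()
    // run for one minute, then shut down gracefully so the in-flight batch finishes
    ssc.awaitTerminationOrTimeout(60 * 1000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

One caveat when testing locally: the append branch in getRecordWriter needs a filesystem that supports fs.append, which HDFS does but Hadoop's default checksummed local filesystem does not, so the hourly append behavior is best verified against a real HDFS.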
