import java.io.DataOutputStream
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, TextOutputFormat}
import org.apache.hadoop.util.{Progressable, ReflectionUtils}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
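// Spark Streaming job that consumes several Kafka topics and appends each micro-batch
// to hourly text files on HDFS via the custom RDDMultipleTextOutputFormat defined below.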
object writeToHdfs {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("writeToHdfs")
      // Gracefully stop the Spark Streaming context on shutdown so that in-flight data is not lost
.set("spark.streaming.stopGracefullyOnShutdown", "true")
// .setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
    KafkaToHdfs(ssc, "/test")
ssc.start()
ssc.awaitTermination()
sc.stop()
}
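  // Builds a direct Kafka stream for the given topics and writes every micro-batch to HDFS
  // under savePath: one directory per day, and (via RDDMultipleTextOutputFormat) one file per hour.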
  def KafkaToHdfs(ssc: StreamingContext, savePath: String): Unit = {
    val TOPICS: Set[String] = Set("msg1", "msg2", "msg3")
    val groupid = "group_id_01"
    val BROKER_LIST = "localhost:9092"
val kafkaParams = Map[String, String]("bootstrap.servers" -> BROKER_LIST,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupid,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> "true", //自动commit
"auto.commit.interval.ms" -> "100", //定时commit的周期
"session.timeout.ms" -> "30000", //consumer活性超时时间
"auto.offset.reset" -> "latest"
)
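    // Create a direct stream and map every ConsumerRecord to its (key, value) pair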
    val kafkaDStream: DStream[(String, String)] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](TOPICS, kafkaParams)).map(m => (m.key(), m.value())) // DStream of (key, value) pairs
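    // For every micro-batch, coalesce to a single partition and save it under a date-stamped directory.
    // Note: later batches write into a directory that already exists, which typically requires
    // spark.hadoop.validateOutputSpecs to be set to false for saveAsHadoopFile to proceed.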
kafkaDStream
      .foreachRDD(rdd => {
        val date = new Date()
        val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        val dir = simpleDateFormat.format(date)
        rdd.partitionBy(new HashPartitioner(1)) // reduce to a single partition to limit the number of output files
          .saveAsHadoopFile(savePath + "/sink_" + dir + "/", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
})
}
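  // Old-API (mapred) TextOutputFormat that writes all records into a single file named
  // sink_<yyyy-MM-dd-HH>, appending when the file already exists, so at most one output
  // file per hour is produced instead of one file per batch.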
  class RDDMultipleTextOutputFormat extends TextOutputFormat[Any, Any] {
    private val formatter = new SimpleDateFormat("yyyy-MM-dd-HH")
    private val dateString: String = formatter.format(new Date())
override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String, progress: Progressable): RecordWriter[Any, Any] = {
val isCompressed: Boolean = FileOutputFormat.getCompressOutput(job)
val keyValueSeparator: String = job.get("mapreduce.output.textoutputformat.separator", "\t")
      // One file per hour, opened in append mode, to avoid the HDFS small-files problem
      val iname = "sink_" + dateString
if (!isCompressed) {
val file: Path = FileOutputFormat.getTaskOutputPath(job, iname)
val fs: FileSystem = file.getFileSystem(job)
val newFile : Path = new Path(FileOutputFormat.getOutputPath(job), iname)
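        // `file` lives in the task's temporary output directory (moved into place by the output
        // committer when created), whereas `newFile` is the final output path used for appends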
        // Append to the hourly file if it already exists, otherwise create a new one
        val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
fs.append(newFile)
} else {
fs.create(file, progress)
}
new TextOutputFormat.LineRecordWriter[Any, Any](fileOut, keyValueSeparator)
} else {
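        // Compressed output: wrap the file stream in the configured codec (Gzip by default)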
val codecClass: Class[_ <: CompressionCodec] = FileOutputFormat.getOutputCompressorClass(job, classOf[GzipCodec])
val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, job)
val file: Path = FileOutputFormat.getTaskOutputPath(job, iname + codec.getDefaultExtension)
val fs: FileSystem = file.getFileSystem(job)
val newFile : Path = new Path(FileOutputFormat.getOutputPath(job), iname + codec.getDefaultExtension)
val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
fs.append(newFile)
} else {
fs.create(file, progress)
}
new TextOutputFormat.LineRecordWriter[Any, Any](new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator)
}
}
}
}