java spark读写hdfs文件_Spark读取HDFS中的Zip文件

1. 任务背景

近日有个项目任务,要求读取压缩在Zip中的百科HTML文件,经分析发现,提供的Zip文件有如下特点(=>指代对应解决方案):

(1) 压缩为分卷文件 => 只需将解压缩在同一目录中的一个分卷zip即可解压缩出整个文件

(2) 压缩文件中又包含不同的两个文件夹,且各包含n个小zip文件,小zip文件中包含目录及对应的HTML文本文件

采用第一方案:依次解压缩各小zip文件,存放在一个目录中,然后上传到HDFS中

存在问题:每个小zip都包含上万个小文件,按照第一方案解压缩,耗费的时间太太太多了

(3) 解析的zip存在多文件的情况

(4) 数据总量共计50W

2. 优化方案

直接上传小zip文件,然后让Spark直接从zip文件中读取HTML文本,再使用jsoup解析,并存储至elasticsearch中。

实现过程中有一处需要注意! => 解析zip会遍历的ZipEntry,会识别文件夹和文件夹下的文件,即文件夹和文件在ZipEntry中被当成同类对象来对待。

例1:本地解析zip压缩文件demo

2b65ef29a5872cc0e4771c25889edd04.gif

6a087676c59fa8b19d76e6bb55a32902.gif

importjava.io.{BufferedInputStream, BufferedReader, FileInputStream, InputStreamReader}importjava.util.zip.{ZipFile, ZipInputStream}importnet.sf.json.JSONObjectimportorg.jsoup.Jsoupimportscala.collection.mutable

object Test {

def testZip(): Unit={

val baseDir= "part2/"val path= s"$baseDir\\06.zip"val zf= newZipFile(path)

val in= new BufferedInputStream(newFileInputStream(path))

val zin= newZipInputStream(in)

var zipEn=zin.getNextEntry

var count= 0

try{while (zipEn != null) {if (!zipEn.isDirectory) {

val buff= new BufferedReader(newInputStreamReader(zf.getInputStream(zipEn)))

val sb= newStringBuilder()

var line=buff.readLine()while (line != null) {

count= count + 1

if(line.nonEmpty) {

sb.append(line.trim)

}

line=buff.readLine()

}

val id= zipEn.getName.substring(zipEn.getName.indexOf("/") + 1, zipEn.getName.indexOf("."))

val doc=Jsoup.parse(sb.toString())

val title= doc.select(".lemmaWgt-lemmaTitle-title h1").text()

val sb1= newmutable.StringBuilder()

val eles= doc.select(".para")for (i

sb1.append(eles.get(i).text().trim).append("\t")

}

val json= newJSONObject()

json.put("id", id)

json.put("title", title)

json.put("content", sb1.toString().trim)

println(json)

buff.close()

}

zipEn=zin.getNextEntry

}

zin.closeEntry()

}catch{case _ =>}

println(count)

}

}

View Code

例2:Spark读取HDFS中的含有多文件的zip文件

2b65ef29a5872cc0e4771c25889edd04.gif

6a087676c59fa8b19d76e6bb55a32902.gif

def parseBaike(): Unit ={

val baseDir= "/work/ws/temp/baike"val sc= new SparkContext(new SparkConf().setAppName("parseBaike"))

val rdd= sc.binaryFiles(s"$baseDir/data/*.zip", 40)

.flatMap{case (zipFilePath: String, content: PortableDataStream) =>{

val zis= newZipInputStream(content.open())

Stream.continually(zis.getNextEntry)

.takeWhile(_!= null)

.flatMap(zipEn=>{if(zipEn.isDirectory) Noneelse{//基于文件名获取百科词条的id信息

val id = zipEn.getName.substring(zipEn.getName.indexOf("/")+1, zipEn.getName.indexOf("."))

val html= scala.io.Source.fromInputStream(zis, "UTF-8").getLines.mkString("")if(html.nonEmpty){

val doc=Jsoup.parse(html)//解析百科中的词条名称

val title = doc.select(".lemmaWgt-lemmaTitle-title h1").text()//获取词条HTML中的全部正文内容

val sb = newmutable.StringBuilder()

val eles= doc.select(".para")for(i

sb.append(eles.get(i).text().trim).append("\t")

}if(title.trim.nonEmpty &&sb.toString.trim.nonEmpty){

val json= newJSONObject()

json.put("id", id)

json.put("title", title)

json.put("content", sb.toString().trim)

Some(json)

}elseNone

}elseNone

}

})

}

}

rdd.cache()

rdd.saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/json"))

rdd.foreach(f=>{//保存在Es中

ESHelper.saveToEs("baike", "baike", f, "id")

})

rdd.unpersist()

sc.stop()

}

View Code

注意:如上代码仅供参考,并隐去了部分业务相关代码,如HDFS和Es工具类,如若需要,可留言沟通交流!

3. 参考

原文:https://www.cnblogs.com/mengrennwpu/p/10859584.html


版权声明:本文为weixin_42484858原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。