package com.ht.bigdata.service
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.data.{DataStoreFinder, DataUtilities, Query}
import org.geotools.factory.CommonFactoryFinder
import org.geotools.feature.simple.SimpleFeatureBuilder
import org.geotools.filter.text.ecql.ECQL
import org.geotools.geojson.feature.FeatureJSON
import org.locationtech.geomesa.hbase.data.HBaseDataStore
import org.locationtech.geomesa.spark.GeoMesaSpark
import org.locationtech.jts.geom.Geometry
import org.opengis.feature.simple.SimpleFeature
import scala.collection.JavaConversions._
import org.springframework.stereotype.{Component}
/**
 * Spring-managed service that reads every feature of the first schema found in
 * a GeoMesa HBase catalog through Spark and prints each feature to stdout.
 *
 * The class is `Serializable` because `addOne` is passed to `RDD.map`, so the
 * enclosing instance travels with the task closure.
 */
@Component
class SparkUnionService extends java.io.Serializable {

  /** GeoMesa HBase connection parameters: ZooKeeper quorum and catalog table. */
  val params: Map[String, String] = Map(
    "hbase.zookeepers" -> "192.168.6.129",
    "hbase.catalog" -> "building_1")

  /**
   * Number of features printed so far by `addOne`.
   *
   * NOTE(review): `addOne` runs inside a Spark task, so the increments are
   * presumably applied to the copy of this object that the task sees rather
   * than to the driver-side instance. If the driver needs the total, a Spark
   * accumulator is the supported mechanism -- confirm before relying on `n`.
   */
  var n: Int = 0

  /**
   * Prints a running counter, up to the first four attribute values, the
   * feature type and the GeoJSON encoding of the feature, then returns the
   * feature unchanged.
   *
   * @param x feature to print
   * @return the same feature instance, untouched
   */
  def addOne(x: SimpleFeature): SimpleFeature = {
    val featureJson = new FeatureJSON
    n += 1
    println(n) // export count
    // Print at most the first four attributes; indexing past the schema's
    // attribute count would throw for narrower feature types.
    (0 until math.min(4, x.getAttributeCount)).foreach(i => println(x.getAttribute(i)))
    println(x.getType) // export type
    println(featureJson.toString(x)) // export geojson
    println("+++++++++++++++++++++++++++++++++++")
    x
  }

  /**
   * Opens the HBase data store described by `params`, reads all features of
   * its first feature type into a Spark RDD, prints each one via `addOne`, and
   * releases the Spark context and the data store even when a step fails.
   *
   * @return a fixed marker string once the job has completed
   * @throws IllegalStateException if no HBase data store can be created from
   *                               `params` or the catalog has no feature types
   */
  def runSparkDemo: String = {
    val ds = openDataStore()
    try {
      val typeName = ds.getTypeNames().headOption.getOrElse(
        throw new IllegalStateException(s"No feature types found for parameters: $params"))
      // No filter is supplied, so the query selects every feature of the type.
      val query = new Query(typeName)
      println("---------------------------------------------------------")
      println(typeName)
      println("---------------------------------------------------------")
      println(query.getTypeName)
      println("---------------------------------------------------------")

      // set SparkContext
      val conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("hrfhdk")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      val sc = SparkContext.getOrCreate(conf)
      try {
        // create RDD with a geospatial query using GeoMesa functions
        val spatialRDDProvider = GeoMesaSpark(params)
        val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, params, query)
        // collect() forces evaluation so that addOne runs for every feature;
        // the collected array itself is not needed.
        resultRDD.map(addOne).collect()
      } finally {
        sc.stop()
      }
      "11111111111111111111111111111111111"
    } finally {
      ds.dispose()
    }
  }

  /** Looks up the data store for `params`, failing fast when it is missing or not HBase-backed. */
  private def openDataStore(): HBaseDataStore =
    DataStoreFinder.getDataStore(params) match {
      case hbase: HBaseDataStore => hbase
      case other =>
        // `other` is null when no data store factory accepts the parameters.
        throw new IllegalStateException(
          s"Expected an HBaseDataStore for parameters $params but got: $other")
    }
}
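// Copyright notice: this is an original article by weixin_42034217, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.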