[Big Data] Exporting shp data from HBase with GeoMesa and analyzing its composition

The Spring component below connects to a GeoMesa catalog backed by HBase, reads every SimpleFeature through GeoMesa's Spark integration, and prints each feature's attributes, schema, and GeoJSON so the data's composition can be inspected.

package com.ht.bigdata.service

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.filter.text.ecql.ECQL
import org.geotools.geojson.feature.FeatureJSON
import org.locationtech.geomesa.hbase.data.HBaseDataStore
import org.locationtech.geomesa.spark.GeoMesaSpark
import org.opengis.feature.simple.SimpleFeature
import org.springframework.stereotype.Component

import scala.collection.JavaConversions._


@Component
class SparkUnionService extends java.io.Serializable {

  // Connection parameters for the GeoMesa HBase data store:
  // the ZooKeeper quorum and the catalog (table) holding the features.
  val params = Map(
    "hbase.zookeepers" -> "192.168.6.129",
    "hbase.catalog" -> "building_1")
  // An ECQL filter could narrow the query, e.g.:
  //   val filter = ECQL.toFilter("bbox(geom, -180, -90, 180, 90)")
  // see geomesa-tools/conf/sfts/gdelt/reference.conf

  // Running feature counter. Note: this mutable field only behaves as expected
  // with a local master; on a cluster each executor would mutate its own copy.
  var n = 0
  // Print each feature's attributes, schema, and GeoJSON, then pass it through.
  def addOne(x: SimpleFeature): SimpleFeature = {
    val feajson = new FeatureJSON

    n = n + 1
    println(n)                    // running feature count
    println(x.getAttribute(0))    // attribute 1
    println(x.getAttribute(1))    // attribute 2
    println(x.getAttribute(2))    // attribute 3
    println(x.getAttribute(3))    // attribute 4
    println(x.getType)            // feature type (schema)
    println(feajson.toString(x))  // feature as GeoJSON
    println("+++++++++++++++++++++++++++++++++++")
    x
  }
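
  // A generic alternative (a sketch added here, not in the original post) for
  // the "analyze data composition" step: instead of hardcoding getAttribute(0..3),
  // walk the schema's attribute descriptors so any feature type can be inspected.
  // Relies on scala.collection.JavaConversions, already imported above.
  def describe(f: SimpleFeature): Unit = {
    f.getType.getAttributeDescriptors.foreach { d =>
      val name = d.getLocalName
      println(s"$name (${d.getType.getBinding.getSimpleName}) = ${f.getAttribute(name)}")
    }
  }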
  def runSparkDemo: String = {

    val ds = DataStoreFinder.getDataStore(params).asInstanceOf[HBaseDataStore]

    // Query the first (and here, only) feature type in the catalog.
    // A filter could be supplied instead: new Query(typeName, filter)
    val typeName = ds.getTypeNames()(0)
    val query = new Query(typeName)
    println("---------------------------------------------------------")
    println(typeName)
    println("---------------------------------------------------------")

    // Set up the SparkContext (local mode, Kryo serializer as used by
    // GeoMesa's Spark integration).
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hrfhdk")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = SparkContext.getOrCreate(conf)

    // Create an RDD of SimpleFeatures from the GeoMesa query.
    val spatialRDDProvider = GeoMesaSpark(params)
    val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, params, query)

    // collect() pulls every feature back to the driver; fine for a demo,
    // but avoid it for large tables.
    val result = resultRDD.map(addOne).collect()
    ds.dispose()
    sc.stop()
    s"processed ${result.length} features"
  }
}
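
The class above only prints each feature; the shapefile export the title promises never actually happens in this code. Below is a minimal sketch of that step using GeoTools' ShapefileDataStoreFactory. The helper object ShpExport, the parameter map, and the output file are illustrative assumptions, not part of the original post; note also that a shapefile supports a single geometry type per file and truncates attribute names to 10 characters.

import java.io.File

import org.geotools.data.DefaultTransaction
import org.geotools.data.collection.ListFeatureCollection
import org.geotools.data.shapefile.{ShapefileDataStore, ShapefileDataStoreFactory}
import org.geotools.data.simple.SimpleFeatureStore
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

import scala.collection.JavaConversions._

// Hypothetical helper, not part of the original post.
object ShpExport {
  def writeShapefile(sft: SimpleFeatureType, features: Seq[SimpleFeature], out: File): Unit = {
    val factory = new ShapefileDataStoreFactory
    val storeParams = Map[String, java.io.Serializable](
      "url" -> out.toURI.toURL,
      "create spatial index" -> java.lang.Boolean.TRUE)
    val shp = factory.createNewDataStore(storeParams).asInstanceOf[ShapefileDataStore]
    shp.createSchema(sft) // shapefile columns are derived from the feature type
    val tx = new DefaultTransaction("create")
    try {
      val store = shp.getFeatureSource(shp.getTypeNames()(0)).asInstanceOf[SimpleFeatureStore]
      store.setTransaction(tx)
      store.addFeatures(new ListFeatureCollection(sft, features)) // JavaConversions converts the Seq
      tx.commit()
    } finally {
      tx.close()
      shp.dispose()
    }
  }
}

With the schema and the features collected in runSparkDemo, a call such as ShpExport.writeShapefile(ds.getSchema(typeName), result, new File("building_1.shp")) would persist the result (the output path is hypothetical).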