mysql广播_广播变量

从 MySQL 读取数据作为广播变量时，虽然设置了 checkpoint，但 kill 掉任务后，重启程序会失败（闭包序列化问题，见下文代码注释）。

// Streaming driver class, wired via dependency injection (@Inject).
// It extends Serializable so Spark can ship it inside closures — but note
// that any non-serializable member (e.g. hbaseConf) captured in a DStream
// closure will still break serialization; see the saveNaDriveTrip variants below.
class GacXs6Offline @Inject()( sparkConf : SparkConfiguration,

mysqlConf : MysqlConfiguration,

hbaseConf : HbaseConfiguration,

sparkContext : EnterpriseSparkContext[SparkContext],

source : NationDStream[(String,NaSourceData)],

naDriveTrip : NaDriveTrip

) extends Serializable {

// Derive the trip DStream from the raw source stream.
// NOTE(review): `exteact` looks like a typo for `extract` — confirm against NaDriveTrip's API.
val naDriveTripDS = naDriveTrip.exteact(source)

// Persist the trips into the HBase table "drive_trip".
// NOTE(review): this passes `naDriveTrip` (the extractor) rather than
// `naDriveTripDS` (the extracted DStream the method signature expects) —
// verify which value is actually intended here.
saveNaDriveTrip("drive_trip", naDriveTrip)

}

// BROKEN: fails with a serialization error because of the use of `hbaseConf`.
// Referencing the outer class's field `hbaseConf` inside the foreachRDD
// closure captures the enclosing instance (`this`); when the closure is
// serialized (notably into the streaming checkpoint), the whole object —
// including its non-serializable configuration members — must be serialized,
// which is why recovery after a kill/restart fails.
def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {

naDriveTrip.foreachRDD(rdd => {

// Fresh Hadoop/HBase configuration for this micro-batch's output job.
val conf = HBaseConfiguration.create()

val jobConf = new JobConf(conf)

// Culprit line: `hbaseConf` is a field of the outer class, so this closure
// drags the whole enclosing instance into serialization.
jobConf.set("hbase.zookeeper.quorum", hbaseConf.hbaseUrl)

jobConf.set("zookeeper.znode.parent", "/hbase")

jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

jobConf.setOutputFormat(classOf[TableOutputFormat])

// Map each (rowKey, tripState) pair to an HBase Put and write the batch
// through the Hadoop OutputFormat API.
rdd.map(x => {

(new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(x._1, x._2))

}).saveAsHadoopDataset(jobConf)

})

}

// WORKS: the closure constructs its own HbaseConfiguration instead of
// reading a field of the enclosing class, so no outer instance is captured
// when the closure is serialized (e.g. into the streaming checkpoint).
def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
  naDriveTrip.foreachRDD { rdd =>
    // Per-batch output job configuration, built entirely from locals.
    val localHbaseConf = new HbaseConfiguration
    val batchJobConf = new JobConf(HBaseConfiguration.create())
    batchJobConf.set("hbase.zookeeper.quorum", localHbaseConf.hbaseUrl)
    batchJobConf.set("zookeeper.znode.parent", "/hbase")
    batchJobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    batchJobConf.setOutputFormat(classOf[TableOutputFormat])

    // Turn every (rowKey, tripState) pair into an HBase Put, then persist
    // the whole batch through the Hadoop OutputFormat API.
    val puts = rdd.map { pair =>
      (new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(pair._1, pair._2))
    }
    puts.saveAsHadoopDataset(batchJobConf)
  }
}

// WORKS: all HBase objects are created per partition on the executor, so the
// closure captures nothing non-serializable from the driver.
// Fixes applied to the original:
//  - the HBase connection and table are now closed in finally blocks (the
//    original leaked one connection per partition per micro-batch);
//  - the `null != list` guard is removed — Iterator.toList never returns null,
//    so only emptiness needs checking.
def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
  naDriveTrip.foreachRDD(rdd => {
    rdd.foreachPartition(partitionRdd => {
      // Build the connection locally on the executor — no driver-side state
      // is captured by this closure.
      val hbaseConf = new HbaseConfiguration
      val hbase = new HbaseUtil(hbaseConf)
      val connection = hbase.getHbaseConn
      try {
        val table = connection.getTable(TableName.valueOf(tableName))
        try {
          // Materialize the partition's Puts; the partition iterator is single-pass.
          val list = partitionRdd.map(data => {
            NaHbaseDao.putNaTripData(data._1, data._2)
          }).toList
          if (list.nonEmpty) {
            NaHbaseDao.saveData(list, table)
          }
        } finally {
          table.close()
        }
      } finally {
        connection.close()
      }
    })
  })
}


版权声明:本文为weixin_36387422原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。