First, add the required dependencies to the project's POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <hadoop.version>3.2.0</hadoop.version>
        <spark.version>3.1.1</spark.version>
    </properties>

    <dependencies>
        <!-- Fastjson: Alibaba's open-source JSON parsing library -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.3-HBase-1.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
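Both programs below assume that an HBase table named emp with a single column family info already exists. If it does not, it can be created from the HBase shell (create 'emp','info') or programmatically with the same hbase-client dependency and ZooKeeper settings; the following is only a minimal sketch (the object name CreateEmpTable is illustrative, not part of the original post):

package com.jeff.spark

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateEmpTable {
  def main(args: Array[String]): Unit = {
    // Same ZooKeeper settings as the Spark jobs below
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop100")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val table = TableName.valueOf("emp")
    // Create the table with a single column family "info" if it is missing
    if (!admin.tableExists(table)) {
      admin.createTable(
        TableDescriptorBuilder.newBuilder(table)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
          .build())
    }
    admin.close()
    connection.close()
  }
}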
Reading HBase with a Spark RDD
package com.jeff.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHbase {

  // Build the HBaseConfiguration for the given ZooKeeper quorum and client port
  def getHBaseConfiguration(quorum: String, port: String, tableName: String): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", port)
    conf
  }

  def main(args: Array[String]): Unit = {
    // Create the Spark context
    val sparkconf = new SparkConf().setMaster("local").setAppName("emp")
    val sc = new SparkContext(sparkconf)

    val tableName = "emp"
    val quorum = "hadoop100"
    val port = "2181"

    // HBase connection settings, plus the table to scan
    val conf = getHBaseConfiguration(quorum, port, tableName)
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Load the HBase table as an RDD of (row key, Result) pairs
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]).cache()

    // Extract the columns of the "info" family from each Result into a tuple
    val data = hBaseRDD.map(x => {
      val result = x._2
      val key = Bytes.toString(result.getRow)
      val empno = Bytes.toString(result.getValue("info".getBytes, "empno".getBytes))
      val ename = Bytes.toString(result.getValue("info".getBytes, "ename".getBytes))
      val job = Bytes.toString(result.getValue("info".getBytes, "job".getBytes))
      val mgr = Bytes.toString(result.getValue("info".getBytes, "mgr".getBytes))
      val hiredate = Bytes.toString(result.getValue("info".getBytes, "hiredate".getBytes))
      val sal = Bytes.toString(result.getValue("info".getBytes, "sal".getBytes))
      val comm = Bytes.toString(result.getValue("info".getBytes, "comm".getBytes))
      val deptno = Bytes.toString(result.getValue("info".getBytes, "deptno".getBytes))
      (empno, ename, job, mgr, hiredate, sal, comm, deptno)
    })

    data.foreach(println)
    sc.stop()
  }
}
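The job above scans the entire emp table. If only part of the table is needed, the scan can be narrowed by setting additional TableInputFormat properties on the same conf before newAPIHadoopRDD is called; a small sketch (the column list and row-key range are illustrative values, not from the original post):

// Optional scan restrictions, applied to the same `conf` as above
conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "info")            // scan only the info family
conf.set(TableInputFormat.SCAN_COLUMNS, "info:ename info:sal")   // or an explicit column list
conf.set(TableInputFormat.SCAN_ROW_START, "7500")                // start row key (inclusive)
conf.set(TableInputFormat.SCAN_ROW_STOP, "7900")                 // stop row key (exclusive)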
Writing to HBase with a Spark RDD
package com.jeff.spark

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object SparkToHbaseWrite {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // HBase connection settings
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop100")
    // ZooKeeper client port, 2181 by default
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val tablename = "emp"

    // Initialize the JobConf; TableOutputFormat must come from the
    // org.apache.hadoop.hbase.mapred package for saveAsHadoopDataset!
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // Rows to write to HBase: empno,ename,job,mgr,hiredate,sal,comm,deptno
    val indataRDD: RDD[String] = sc.makeRDD(Array(
      "7369,smith,clerk,7902,1980-12-17,800,null,20",
      "7499,allen,salesman,7698,1981-02-20,1600,300,30",
      "7521,ward,salesman,7698,1981-02-22,1250,500,30",
      "7566,jones,manager,7839,1981-04-02,2975,null,20",
      "7654,martin,salesman,7698,1981-09-28,1250,1400,30",
      "7698,blake,manager,7839,1981-05-01,2850,null,30",
      "7782,clark,manager,7839,1981-06-09,2450,null,10",
      "7788,scott,analyst,7566,1987-07-13,3000,null,20",
      "7839,king,president,null,1981-11-07,5000,null,10",
      "7844,turner,salesman,7698,1981-09-08,1500,0,30",
      "7876,adams,clerk,7788,1987-07-13,1100,null,20",
      "7900,james,clerk,7698,1981-12-03,950,null,30",
      "7902,ford,analyst,7566,1981-12-03,3000,null,20",
      "7934,miller,clerk,7782,1982-01-23,1300,null,10"))

    val rdd = indataRDD
      .map(_.split(','))
      .map { arr =>
        /* One Put object is one row; its constructor takes the row key.
         * Every value written must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
         * Put.addColumn takes three arguments: column family, qualifier, value.
         */
        println(arr(0), arr(1), arr(2), arr(3), arr(4), arr(5), arr(6), arr(7))
        val put = new Put(Bytes.toBytes(arr(0)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("empno"), Bytes.toBytes(arr(0)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(arr(1)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(arr(2)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(arr(3)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(arr(4)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sal"), Bytes.toBytes(arr(5)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("comm"), Bytes.toBytes(arr(6)))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(arr(7)))
        // The element type must be (ImmutableBytesWritable, Put) so that
        // saveAsHadoopDataset can be called on the resulting RDD
        (new ImmutableBytesWritable, put)
      }

    rdd.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
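For reference, the same write can also be expressed with the new MapReduce API, in which case TableOutputFormat comes from org.apache.hadoop.hbase.mapreduce and saveAsNewAPIHadoopDataset replaces the JobConf/saveAsHadoopDataset pair. The fragment below is only a sketch of what would stand in for the JobConf lines inside main (conf, tablename and rdd refer to the values defined there):

// Alternative (new-API) output setup, replacing the JobConf block above
import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}
import org.apache.hadoop.mapreduce.Job

val job = Job.getInstance(conf)
job.getConfiguration.set(NewTableOutputFormat.OUTPUT_TABLE, tablename)
job.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])

// `rdd` is the RDD[(ImmutableBytesWritable, Put)] built above
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)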