Reading and writing HBase 2.1.0 from PySpark 2.3.0 on CDH 6.2.0
1. Using Hive
First create the HBase table and insert some data:
create 'books', 'info', 'analytics'
put 'books', 'In Search of Lost Time', 'info:author', 'Marcel Proust'
put 'books', 'In Search of Lost Time', 'info:year', '1922'
put 'books', 'In Search of Lost Time', 'analytics:views', '3298'
put 'books', 'Godel, Escher, Bach', 'info:author', 'Douglas Hofstadter'
put 'books', 'Godel, Escher, Bach', 'info:year', '1979'
put 'books', 'Godel, Escher, Bach', 'analytics:views', '820'
Then create the Hive table:
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`books_ext` (
`title` string,
`author` string,
`year` int,
`views` double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.columns.mapping'=':key,info:author,info:year,analytics:views'
)
TBLPROPERTIES (
'hbase.mapred.output.outputtable'='books',
'hbase.table.name'='books'
);
The mapping in hbase.columns.mapping is positional: ':key' binds to the first Hive column (title), then info:author to author, info:year to year, and analytics:views to views. Download hive-hbase-handler-2.1.1-cdh6.2.0.jar and put it into /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/jars.
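If you would rather not touch the parcel directory, the same jar can also be attached per application through the spark.jars property (a minimal sketch, not what this post uses; the path is wherever you downloaded the jar):

spark = SparkSession.builder \
    .config("spark.jars", "/path/to/hive-hbase-handler-2.1.1-cdh6.2.0.jar") \
    .enableHiveSupport() \
    .getOrCreate()

The full read example: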
#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-19
"""
from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Hive2Hbase") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")
    '''
    When scanning, tuning the scanner-cache parameters on the session can help
    performance, e.g.:
    spark.sql('SET hbase.scan.cache=10000')
    spark.sql('SET hbase.client.scanner.cache=10000')
    '''
    '''
    When querying books_ext from Hive directly, EXPLAIN shows that Hive optimizes
    simple predicates into the proper HBase operations:
    where title = 'something' becomes an HBase get;
    where title >= 'something' and title < 'something' is correctly translated
    into a range scan.
    This is not the case with the BETWEEN operator, for example.
    While the Hive HBaseStorageHandler understands and correctly translates simple
    query predicates, the Spark engine is not as smart: the query immediately turns
    into a full table scan that is then filtered inside Spark, so Hive never gets
    the chance to optimize and push down the filter. DataFrame.explain() shows the
    Spark physical plan.
    The latter point means that accessing HBase from Spark through Hive is only a
    good option when you operate on the whole table anyway, e.g. full table scans.
    '''
    spark.sql("select * from default.books_ext where title='In Search of Lost Time'").show(truncate=False)
    spark.sql("select * from default.books_ext where title='In Search of Lost Time'").explain()
    # spark.table("default.books_ext").show(truncate=False)
    # Snapshotting to a Parquet file first and reading that back is faster
    # spark.sql("create table default.books_ext_parquet stored as parquet as select * from default.books_ext")
    spark.sql("select * from default.books_ext").write.parquet("%s/books_ext_parquet" % warehouse_location, "overwrite")
    # spark.sql("select * from default.books_ext").write.format("parquet").mode("overwrite").save("%s/books_ext_parquet" % warehouse_location)
    spark.read.parquet("%s/books_ext_parquet" % warehouse_location).show()
    spark.stop()
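Once the snapshot is in Parquet, ordinary predicate pushdown applies again, so repeated analytical queries avoid full HBase scans. A small follow-up sketch (run before spark.stop(); assumes the snapshot written above):

    books = spark.read.parquet("%s/books_ext_parquet" % warehouse_location)
    # explain() should now list the predicate under PushedFilters in the Parquet scan
    books.where(books["year"] > 1950).explain()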
2. Using Hortonworks' open-source framework SHC: building from source
Preparation:
- The SHC source package; pick the release matching your Spark version.
  Download: https://github.com/hortonworks-spark/shc/releases
  I used v1.1.3-2.3-SystemTest, which targets Spark 2.3+.
- Open the v1.1.3-2.3-SystemTest folder in IDEA, edit the pom.xml files as below, and let Maven pull the dependencies.
1. v1.1.3-2.3-SystemTest/pom.xml — adjust the coordinates and versions as needed:

<groupId>com.hortonworks</groupId>
<artifactId>shc</artifactId>
<version>spark-2.3.0-hbase-2.1.0</version>
<packaging>pom</packaging>
<name>HBase Spark Connector Project Parent POM</name>
<url>https://github.com/hortonworks-spark/shc</url>

<spark.version>2.3.1</spark.version>
<hbase.version>2.1.0-cdh6.2.0</hbase.version>
<phoenix.version>5.0.0-alpha-HBase-2.0</phoenix.version>
<test_classpath_file>${project.build.directory}/spark-test-classpath.txt</test_classpath_file>
<java.version>1.8</java.version>
<scala.version>2.11.12</scala.version>

<repository>
  <id>cloudera-repo</id>
  <name>Internal Repository</name>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
  <releases>
    <enabled>true</enabled>
  </releases>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>

2. v1.1.3-2.3-SystemTest/core/pom.xml:

<parent>
  <groupId>com.hortonworks</groupId>
  <artifactId>shc</artifactId>
  <version>spark-2.3.0-hbase-2.1.0</version>
  <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>shc-core</artifactId>
<version>spark-2.3.0-hbase-2.1.0</version>
<packaging>jar</packaging>
<name>HBase Spark Connector Project Core</name>

<!-- add the repositories -->
<repository>
  <id>nexus-aliyun</id>
  <name>Nexus aliyun</name>
  <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  <releases>
    <enabled>true</enabled>
  </releases>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>
<repository>
  <id>cloudera-repo</id>
  <name>Internal Repository</name>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
  <releases>
    <enabled>true</enabled>
  </releases>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>

<!-- add the json4s dependency, otherwise you get a runtime error:
     java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue; -->
<dependency>
  <groupId>org.json4s</groupId>
  <artifactId>json4s-jackson_2.11</artifactId>
  <version>3.3.0</version>
</dependency>

3. v1.1.3-2.3-SystemTest/examples/pom.xml:

<parent>
  <groupId>com.hortonworks</groupId>
  <artifactId>shc</artifactId>
  <version>spark-2.3.0-hbase-2.1.0</version>
  <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>shc-examples</artifactId>
<version>spark-2.3.0-hbase-2.1.0</version>
<packaging>jar</packaging>
<name>HBase Spark Connector Project Examples</name>

Run mvn clean install in the v1.1.3-2.3-SystemTest directory. After a successful build, shc-core-spark-2.3.0-hbase-2.1.0.jar is produced in shc-1.1.3-2.3-SystemTest/core/target; copy it into /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/jars. If you have no time to build it yourself, you can download a pre-built shc-core-spark-2.3.0-hbase-2.1.0.jar instead.
- Writing the code
Set the environment variables in IDEA under Edit Configurations:
PYTHONUNBUFFERED=1;HBASE_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hbase;SPARK_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark;PYTHONPATH=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python
#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-19 17:40
"""
from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Spark-shc-Hbase") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")
    dep = "org.apache.spark.sql.execution.datasources.hbase"
    # The catalog maps the HBase table onto DataFrame columns: "rowkey" names the
    # key, and each column entry gives its column family ("rowkey" for the key
    # itself), its HBase qualifier, and its type. The split/join trick just strips
    # the whitespace out of the JSON string.
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"student"},
        "rowkey":"key",
        "columns":{
            "row_key":{"cf":"rowkey", "col":"key", "type":"string"},
            "id":{"cf":"info", "col":"id", "type":"string"},
            "name":{"cf":"info", "col":"name", "type":"string"}
        }
    }""".split())
    catalog_output = ''.join("""{
        "table":{"namespace":"default", "name":"student_tmp"},
        "rowkey":"key",
        "columns":{
            "row_key":{"cf":"rowkey", "col":"key", "type":"string"},
            "id":{"cf":"info", "col":"id", "type":"string"},
            "name":{"cf":"info", "col":"name", "type":"string"}
        }
    }""".split())
    df_input = spark.read \
        .format(dep) \
        .options(catalog=catalog) \
        .load()
    df_input.where(df_input["row_key"] == "xx").show()
    # Write to HBase; newTable tells SHC to create the target table if needed,
    # with that many regions
    df_input.write \
        .format(dep) \
        .options(catalog=catalog_output, newTable="3") \
        .save()
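To verify the round trip, the same catalog_output can be used to read back the table that was just written (a quick sketch):

    spark.read.format(dep).options(catalog=catalog_output).load().show()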
3. Spark-HBase Connector
#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-22 14:31
"""
from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Spark-on-Hbase") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")
    data_source_format = 'org.apache.hadoop.hbase.spark'
    df_input = spark.read.format(data_source_format) \
        .option('hbase.table', 'student') \
        .option('hbase.config.resources', 'file:///etc/hbase/conf/hbase-site.xml') \
        .option('hbase.columns.mapping',
                'rowkey STRING :key, '
                'id STRING info:id, '
                'name STRING info:name') \
        .option('hbase.use.hbase.context', False) \
        .option('hbase-push.down.column.filter', False) \
        .load()
    df_input.show()
    # Filtering a DataFrame read through the Spark-HBase Connector throws an
    # error that I have not managed to solve yet:
    # : java.lang.NoSuchMethodError: org.apache.hadoop.hbase.util.ByteStringer.wrap([B)Lcom/google/protobuf/ByteString;
    df_input.where(df_input["rowkey"] == "xx").show()
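    # A hedged workaround sketch (assumption: the NoSuchMethodError comes from the
    # connector building an HBase filter from the predicate): materialize the scan
    # into Spark's cache first, so the later filter runs on the cached copy instead
    # of being handed to the connector. Untested here.
    # cached = df_input.cache()
    # cached.count()  # forces the full, unfiltered scan
    # cached.where(cached["rowkey"] == "xx").show()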
    df_input.write.format(data_source_format) \
        .option('hbase.table', 'student_tmp') \
        .option('hbase.config.resources', 'file:///etc/hbase/conf/hbase-site.xml') \
        .option('hbase.columns.mapping',
                'rowkey STRING :key, '
                'id STRING info:id, '
                'name STRING info:name') \
        .option('hbase.use.hbase.context', False) \
        .option('hbase-push.down.column.filter', False) \
        .save()
4. newAPIHadoopRDD / saveAsNewAPIHadoopDataset
#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-22 14:41
"""
from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Spark-APIHadoopRDD-Hbase") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")
    hosts = "192.168.5.128,192.168.5.129,192.168.5.130"
    input_table = "student"
    input_conf = {"hbase.zookeeper.quorum": hosts,
                  "hbase.mapreduce.inputtable": input_table}
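    # (Optional, an assumption worth testing rather than a documented recipe)
    # TableInputFormat also honors scan-narrowing keys such as
    # "hbase.mapreduce.scan.row.start", "hbase.mapreduce.scan.row.stop" and
    # "hbase.mapreduce.scan.column.family", which can be added to input_conf
    # to limit what the RDD reads instead of scanning the whole table.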
    # These converters live in the spark-examples jar (see the note at the end)
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
    hbase_rdd = spark.sparkContext.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=input_conf)
    hbase_rdd.cache()
    output = hbase_rdd.collect()
    for (k, v) in output:
        print(k, v)
    '''
    Convert the RDD to a DataFrame.
    References:
    1. https://blog.csdn.net/hchzhao_1985/article/details/82717949?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task
    2. https://blog.csdn.net/u011412768/article/details/93404921
    '''
    import json

    def call_transfor(y1):
        # Each element of y1 is one HBase cell as a JSON string; fold them into
        # a {qualifier: value} dict for the row
        y2 = [json.loads(i) for i in y1]
        fdc = {}
        for i in y2:
            colname = i['qualifier']
            value = i['value']
            fdc[colname] = value
        return fdc

    def rdd_to_df(hbase_rdd):
        # The value string holds one JSON document per cell, separated by newlines
        fdc_split = hbase_rdd.map(lambda x: (x[0], x[1].split('\n')))
        # [('xuan', ['{"qualifier" : "city", "timestamp" : "1584196930408", "columnFamily" : "address", "row" : "xuan", "type" : "Put", "value" : "dongguang"}', '{"qualifier" : "contry", "timestamp" : "1584196930408", "columnFamily" : "address", "row" : "xuan", "type" : "Put", "value" : "china"}', '{"qualifier" : "province", "timestamp" : "1584196930408", "columnFamily" : "address", "row" : "xuan", "type" : "Put", "value" : "guangdong"}', '{"qualifier" : "age", "timestamp" : "1584196930408", "columnFamily" : "info", "row" : "xuan", "type" : "Put", "value" : "28"}', '{"qualifier" : "birthday", "timestamp" : "1584196930408", "columnFamily" : "info", "row" : "xuan", "type" : "Put", "value" : "1992-10-18"}', '{"qualifier" : "company", "timestamp" : "1584196930408", "columnFamily" : "info", "row" : "xuan", "type" : "Put", "value" : "aliba"}'])]
        # print(fdc_split.collect())
        # Extract the column names and values
        fdc_cols = fdc_split.map(lambda x: (x[0], call_transfor(x[1])))
        # [('xuan', {'city': 'dongguang', 'contry': 'china', 'province': 'guangdong', 'age': '28', 'birthday': '1992-10-18', 'company': 'aliba'})]
        # print(fdc_cols.collect())
        # print(fdc_cols.map(lambda x: [i for i in x[1]]).take(1)[0])
        colnames = ['row_key'] + fdc_cols.map(lambda x: [i for i in x[1]]).take(1)[0]
        fdc_dataframe = fdc_cols.map(lambda x: [x[0]] + [x[1][i] for i in x[1]]).toDF(colnames)
        # [['xuan', 'dongguang', 'china', 'guangdong', '28', '1992-10-08', 'aliba']]
        return fdc_dataframe

    fdc_data = rdd_to_df(hbase_rdd)
    fdc_data.show()
    fdc_data.cache()
    fdc_data.createOrReplaceTempView("student")
    output_table = "student_tmp"
    output_conf = {"hbase.zookeeper.quorum": hosts,
                   "hbase.mapreduce.outputtable": output_table,
                   "mapreduce.output.format.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
                   "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                   "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
                   "mapreduce.output.fileoutputformat.outputdir": "/tmp/hbase_output/"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # Use the stack function to pivot the columns into rows, producing
    # ( rowkey , [ row key , column family , column name , value ] )
    hbase_split_df = spark.sql(
        """
        select stack(2, `id`, `name`) as (`col`) from (
            select concat(row_key, ",info,id,", id) id,
                   concat(row_key, ",info,name,", name) name
            from student
        ) t
        """
    )
    hbase_split_df.show(truncate=False)
    hbase_split_rdd = hbase_split_df.rdd.map(lambda x: (x["col"])).map(lambda x: (x.split(',')[0], x.split(',')))
    print(hbase_split_rdd.collect())
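    # For the sample 'student' rows this prints pairs shaped like (illustrative
    # values, not real output):
    # [('xuan', ['xuan', 'info', 'id', '001']), ('xuan', ['xuan', 'info', 'name', 'xuan'])]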
    # saveAsNewAPIHadoopDataset fails when writing to HBase:
    # with spark-examples_2.11-1.6.0-typesafe-001 downloaded from
    # https://mvnrepository.com/artifact/org.apache.spark/spark-examples?repo=typesafe-maven-releases
    # it throws java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.add([B[B[B)Lorg/apache/hadoop/hbase/client/Put;
    # You need to download the Spark source and rebuild the examples jar yourself; see:
    # 1. https://download.csdn.net/download/sinat_37992109/11491308
    # 2. https://blog.csdn.net/sinat_37992109/article/details/98735188
    hbase_split_rdd.saveAsNewAPIHadoopDataset(conf=output_conf, keyConverter=keyConv, valueConverter=valueConv)
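Once the write succeeds, it can be verified by pointing the same newAPIHadoopRDD read at the output table, reusing the read converters from earlier (a quick sketch):

    verify_conf = {"hbase.zookeeper.quorum": hosts,
                   "hbase.mapreduce.inputtable": output_table}
    verify_rdd = spark.sparkContext.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
        valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
        conf=verify_conf)
    print(verify_rdd.collect())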
References:
https://diogoalexandrefranco.github.io/interacting-with-hbase-from-pyspark/#spark-hbase-connector
https://zhuanlan.zhihu.com/p/70092198
Copyright notice: this is an original article by qq_16755563, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.