Several ways to read and write HBase 2.1.0 from PySpark 2.3.0+ on CDH 6.2.0

1. Using Hive

First, create the HBase table and insert some data:

create 'books', 'info', 'analytics'
put 'books', 'In Search of Lost Time', 'info:author', 'Marcel Proust'
put 'books', 'In Search of Lost Time', 'info:year', '1922'
put 'books', 'In Search of Lost Time', 'analytics:views', '3298'
put 'books', 'Godel, Escher, Bach', 'info:author', 'Douglas Hofstadter'
put 'books', 'Godel, Escher, Bach', 'info:year', '1979'
put 'books', 'Godel, Escher, Bach', 'analytics:views', '820'

Then create the Hive table:

CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`books_ext` (
 `title` string,
 `author` string,
 `year` int,
 `views` double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'    
WITH SERDEPROPERTIES (
 'hbase.columns.mapping'=':key,info:author,info:year,analytics:views'
)
TBLPROPERTIES (
 'hbase.mapred.output.outputtable'='books',
 'hbase.table.name'='books'
);
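The mapping is positional: the n-th entry in hbase.columns.mapping binds to the n-th Hive column, with :key standing for the HBase row key. A quick Python illustration of the pairing (just a sketch, not part of the setup):

```python
# Pair each Hive column with its hbase.columns.mapping entry (positional match).
hive_columns = ["title", "author", "year", "views"]
mapping = ":key,info:author,info:year,analytics:views".split(",")

pairing = dict(zip(hive_columns, mapping))
# 'title' binds to ':key' (the row key); 'views' binds to 'analytics:views'.
print(pairing)
```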

Download hive-hbase-handler-2.1.1-cdh6.2.0.jar and place it in /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/jars.

#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-19 
"""
from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Hive2Hbase") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")
    '''
        When running scans, tuning the session's cache parameters can help performance, e.g.:
            spark.sql('SET hbase.scan.cache=10000')
            spark.sql('SET hbase.client.scanner.cache=10000')
    '''
    '''
        When querying books_ext directly from Hive, EXPLAIN shows that Hive optimizes
        simple predicates into the proper HBase operations:
          - WHERE title = 'something' becomes an HBase get.
          - WHERE title >= 'something' AND title < 'something' is correctly turned into a range scan.
        This is not the case for, e.g., the BETWEEN operator.
        While the Hive HBaseStorageHandler understands and correctly translates simple query
        predicates, the Spark engine is not that smart: the query is immediately turned into a
        full table scan that is then filtered in Spark, so Hive never gets the chance to optimize
        and push down the filter. DataFrame.explain() shows the Spark physical plan.
        The latter point means that accessing HBase from Spark through Hive is a good choice
        mainly when operating on the whole table, e.g. full table scans.
    '''
    spark.sql("select * from default.books_ext where title='In Search of Lost Time'").show(truncate=False)
    spark.sql("select * from default.books_ext where title='In Search of Lost Time'").explain()
    # spark.table("default.books_ext").show(truncate=False)

    # Materializing to a Parquet file first and reading that back is faster
    # spark.sql("create table default.books_ext_parquet stored as parquet as select * from default.books_ext")
    spark.sql("select * from default.books_ext").write.parquet("%s/books_ext_parquet" % warehouse_location, "overwrite")
    # spark.sql("select * from default.books_ext").write.format("parquet").mode("overwrite").save("%s/books_ext_parquet" % warehouse_location)
    spark.read.parquet("%s/books_ext_parquet" % warehouse_location).show()
    spark.stop()

2. Using Hortonworks' open-source shc framework: building from source

Prerequisites:

  • The shc source package; choose the release that matches your Spark version.
    Download: https://github.com/hortonworks-spark/shc/releases
    I used v1.1.3-2.3-SystemTest, which targets Spark 2.3+.
  • Open the v1.1.3-2.3-SystemTest folder in IDEA and edit the pom.xml files to pull in the required dependencies.
    1. v1.1.3-2.3-SystemTest/pom.xml (adjust the versions as needed):
    <groupId>com.hortonworks</groupId>
    <artifactId>shc</artifactId>
    <version>spark-2.3.0-hbase-2.1.0</version>
    <packaging>pom</packaging>
    <name>HBase Spark Connector Project Parent POM</name>
    <url>https://github.com/hortonworks-spark/shc</url>
    
    <spark.version>2.3.1</spark.version>
    <hbase.version>2.1.0-cdh6.2.0</hbase.version>
    <phoenix.version>5.0.0-alpha-HBase-2.0</phoenix.version>
    <test_classpath_file>${project.build.directory}/spark-test-classpath.txt</test_classpath_file>
    <java.version>1.8</java.version>
    <scala.version>2.11.12</scala.version>
    
    <repository>
     <id>cloudera-repo</id>
     <name>Internal Repository</name>
     <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
     <releases>
       <enabled>true</enabled>
     </releases>
     <snapshots>
       <enabled>false</enabled>
     </snapshots>
    </repository>
    
    2. v1.1.3-2.3-SystemTest/core/pom.xml:
    <parent>
     <groupId>com.hortonworks</groupId>
     <artifactId>shc</artifactId>
     <version>spark-2.3.0-hbase-2.1.0</version>
     <relativePath>../pom.xml</relativePath>
    </parent>
    
    <artifactId>shc-core</artifactId>
    <version>spark-2.3.0-hbase-2.1.0</version>
    <packaging>jar</packaging>
    <name>HBase Spark Connector Project Core</name>
    <!-- Add repositories -->
    <repository>
       <id>nexus-aliyun</id>
       <name>Nexus aliyun</name>
       <url>http://maven.aliyun.com/nexus/content/groups/public</url>
       <releases>
         <enabled>true</enabled>
       </releases>
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
    </repository>
    <repository>
       <id>cloudera-repo</id>
       <name>Internal Repository</name>
       <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
       <releases>
        <enabled>true</enabled>
       </releases>
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
    </repository>
    <!-- Add the json4s dependency; otherwise it fails at runtime with java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue; -->
    <dependency>
       <groupId>org.json4s</groupId>
       <artifactId>json4s-jackson_2.11</artifactId>
       <version>3.3.0</version>
    </dependency>
    
    3. v1.1.3-2.3-SystemTest/example/pom.xml:
    <parent>
     <groupId>com.hortonworks</groupId>
     <artifactId>shc</artifactId>
     <version>spark-2.3.0-hbase-2.1.0</version>
     <relativePath>../pom.xml</relativePath>
    </parent>
    
    <artifactId>shc-examples</artifactId>
    <version>spark-2.3.0-hbase-2.1.0</version>
    <packaging>jar</packaging>
    <name>HBase Spark Connector Project Examples</name>
    
    Run mvn clean install in the v1.1.3-2.3-SystemTest directory. After a successful build, shc-core-spark-2.3.0-hbase-2.1.0.jar is generated under /shc-1.1.3-2.3-SystemTest/core/target; copy it into /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/jars. If you don't have time to build it yourself, a pre-built shc-core-spark-2.3.0-hbase-2.1.0.jar can be downloaded here.

• Writing the code
In IDEA, set the environment variables under Edit Configurations:
PYTHONUNBUFFERED=1;HBASE_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hbase;SPARK_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark;PYTHONPATH=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python
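If you'd rather not rely on the IDEA run configuration, the same variables can be set at the top of the script before creating the SparkSession (a sketch; the paths are the CDH parcel layout used throughout this post):

```python
import os

# Mirror the IDEA run-configuration environment variables in code.
cdh_lib = "/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib"
os.environ["PYTHONUNBUFFERED"] = "1"
os.environ["HBASE_HOME"] = cdh_lib + "/hbase"
os.environ["SPARK_HOME"] = cdh_lib + "/spark"
os.environ["PYTHONPATH"] = cdh_lib + "/spark/python"
```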

#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-19 17:40
"""
from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Spark-shc-Hbase")\
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")
    dep = "org.apache.spark.sql.execution.datasources.hbase"
    catalog = ''.join("""{
      "table":{"namespace":"default", "name":"student"},
      "rowkey":"key",
      "columns":{
      "row_key":{"cf":"rowkey", "col":"key", "type":"string"},
      "id":{"cf":"info", "col":"id", "type":"string"},
      "name":{"cf":"info", "col":"name", "type":"string"}
      }
      }""".split())
    catalog_output = ''.join("""{
      "table":{"namespace":"default", "name":"student_tmp"},
      "rowkey":"key",
      "columns":{
      "row_key":{"cf":"rowkey", "col":"key", "type":"string"},
      "id":{"cf":"info", "col":"id", "type":"string"},
      "name":{"cf":"info", "col":"name", "type":"string"}
      }
      }""".split())
    df_input = spark.read \
        .format(dep) \
        .options(catalog=catalog) \
        .load()
        
    df_input.where(df_input["row_key"] == "xx").show()
    # Write to HBase
    df_input.write \
        .format(dep) \
        .options(catalog=catalog_output, newTable="3") \
        .save()
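The ''.join(...split()) calls above only strip whitespace from the JSON literals. A cleaner alternative (a sketch under the same table layout) is to build the catalog as a dict and serialize it with json.dumps:

```python
import json

def make_catalog(namespace, table, columns):
    """Build an shc catalog string: the row key maps to the 'row_key'
    DataFrame column, and each (df_column, cf, qualifier) triple becomes
    a string-typed column."""
    cols = {"row_key": {"cf": "rowkey", "col": "key", "type": "string"}}
    for name, cf, qualifier in columns:
        cols[name] = {"cf": cf, "col": qualifier, "type": "string"}
    return json.dumps({
        "table": {"namespace": namespace, "name": table},
        "rowkey": "key",
        "columns": cols,
    })

catalog = make_catalog("default", "student",
                       [("id", "info", "id"), ("name", "info", "name")])
```

The resulting string can be passed to .options(catalog=...) exactly like the hand-written literal.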

3. Spark-HBase Connector

#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-22 14:31
"""

from pyspark.sql import SparkSession
import os
if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Spark-on-Hbase")\
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")

    data_source_format = 'org.apache.hadoop.hbase.spark'
    df_input = spark.read.format(data_source_format) \
        .option('hbase.table', 'student') \
        .option('hbase.config.resources', 'file:///etc/hbase/conf/hbase-site.xml') \
        .option('hbase.columns.mapping', \
                'rowkey STRING :key, \
                 id STRING info:id, \
                 name STRING info:name') \
        .option('hbase.use.hbase.context', False) \
        .option('hbase-push.down.column.filter', False) \
        .load()
    df_input.show()
    # Filtering a DataFrame read via the Spark-HBase Connector throws the error below; I have not found a fix yet.
    # : java.lang.NoSuchMethodError: org.apache.hadoop.hbase.util.ByteStringer.wrap([B)Lcom/google/protobuf/ByteString;
    df_input.where(df_input["rowkey"] == "xx").show()

    df_input.write.format(data_source_format) \
        .option('hbase.table', 'student_tmp') \
        .option('hbase.config.resources', 'file:///etc/hbase/conf/hbase-site.xml') \
        .option('hbase.columns.mapping', \
                'rowkey STRING :key, \
                 id STRING info:id, \
                 name STRING info:name') \
        .option('hbase.use.hbase.context', False) \
        .option('hbase-push.down.column.filter', False) \
        .save()
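The hbase.columns.mapping option used above is a comma-separated list of 'df_column TYPE cf:qualifier' entries, with ':key' denoting the row key. A hypothetical helper that just illustrates the format (not connector API):

```python
def parse_columns_mapping(mapping):
    """Split an 'hbase.columns.mapping' string into
    (dataframe_column, spark_type, hbase_column) triples."""
    triples = []
    for entry in mapping.split(","):
        name, spark_type, hbase_column = entry.split()
        triples.append((name, spark_type, hbase_column))
    return triples

mapping = "rowkey STRING :key, id STRING info:id, name STRING info:name"
triples = parse_columns_mapping(mapping)
# [('rowkey', 'STRING', ':key'), ('id', 'STRING', 'info:id'), ('name', 'STRING', 'info:name')]
```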

4. newAPIHadoopRDD / saveAsNewAPIHadoopDataset

#! /usr/bin/python3.6
# -*- encoding:utf-8 -*-
"""
@author: xuan
@file:.py
@time:20-3-22 14:41
"""

from pyspark.sql import SparkSession
import os

if __name__ == "__main__":
    os.environ["PYSPARK_PYTHON"] = "/home/venv/bin/python3.6"
    warehouse_location = "/user/hive/warehouse/"
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("Spark-APIHadoopRDD-Hbase") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    # Log only WARN and above
    spark.sparkContext.setLogLevel("WARN")

    hosts = "192.168.5.128,192.168.5.129,192.168.5.130"
    input_table = "student"
    input_conf = {"hbase.zookeeper.quorum": hosts, \
                  "hbase.mapreduce.inputtable": input_table
                  }
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    hbase_rdd = spark.sparkContext.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                                                   "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                                                   "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv,
                                                   valueConverter=valueConv, conf=input_conf)
    hbase_rdd.cache()
    output = hbase_rdd.collect()
    for (k, v) in output:
        print(k, v)

    '''
        Convert the RDD to a DataFrame.
        References: 1. https://blog.csdn.net/hchzhao_1985/article/details/82717949?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task
                    2. https://blog.csdn.net/u011412768/article/details/93404921
    '''
    import json
    def call_transfor(y1):
        y2 = [json.loads(i) for i in y1]
        fdc = {}
        for i in y2:
            colname = i['qualifier']
            value = i['value']
            fdc[colname] = value
        return fdc

    def rdd_to_df(hbase_rdd):
        fdc_split = hbase_rdd.map(lambda x: (x[0], x[1].split('\n')))
        # [('xuan', ['{"qualifier" : "city", "timestamp" : "1584196930408", "columnFamily" : "address", "row" : "xuan", "type" : "Put", "value" : "dongguang"}', '{"qualifier" : "contry", "timestamp" : "1584196930408", "columnFamily" : "address", "row" : "xuan", "type" : "Put", "value" : "china"}', '{"qualifier" : "province", "timestamp" : "1584196930408", "columnFamily" : "address", "row" : "xuan", "type" : "Put", "value" : "guangdong"}', '{"qualifier" : "age", "timestamp" : "1584196930408", "columnFamily" : "info", "row" : "xuan", "type" : "Put", "value" : "28"}', '{"qualifier" : "birthday", "timestamp" : "1584196930408", "columnFamily" : "info", "row" : "xuan", "type" : "Put", "value" : "1992-10-18"}', '{"qualifier" : "company", "timestamp" : "1584196930408", "columnFamily" : "info", "row" : "xuan", "type" : "Put", "value" : "aliba"}'])]
        # print(fdc_split.collect())
        # 提取列名和取值
        fdc_cols = fdc_split.map(lambda x: (x[0], call_transfor(x[1])))
        # [('xuan', {'city': 'dongguang', 'contry': 'china', 'province': 'guangdong', 'age': '28', 'birthday': '1992-10-18', 'company': 'aliba'})]
        # print(fdc_cols.collect())
        # print(fdc_cols.map(lambda x: [i for i in x[1]]).take(1)[0])
        colnames = ['row_key'] + fdc_cols.map(lambda x: [i for i in x[1]]).take(1)[0]
        fdc_dataframe = fdc_cols.map(lambda x: [x[0]] + [x[1][i] for i in x[1]]).toDF(colnames)
        # [['xuan', 'dongguang', 'china', 'guangdong', '28', '1992-10-08', 'aliba']]
        return fdc_dataframe

    fdc_data = rdd_to_df(hbase_rdd)
    fdc_data.show()
    fdc_data.cache()
    fdc_data.createOrReplaceTempView("student")

    output_table = "student_tmp"
    output_conf = {"hbase.zookeeper.quorum": hosts, \
                   "hbase.mapreduce.outputtable": output_table, \
                   "mapreduce.output.format.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", \
                   "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", \
                   "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable", \
                   "mapreduce.output.fileoutputformat.outputdir": "/tmp/hbase_output/"
                   }

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # Use the stack function to pivot the columns into rows:
    # ( rowkey, [row key, column family, column name, value] )
    hbase_split_df = spark.sql(
        """
            select stack(2, `id`, `name`) as (`col`) from (
                select concat(row_key, ",info,id,", id) id,
                       concat(row_key, ",info,name,", name) name
                from student)
        """
    )
    hbase_split_df.show(truncate=False)
    hbase_split_rdd = hbase_split_df.rdd.map(lambda x: (x["col"])).map(lambda x: (x.split(',')[0], x.split(',')))
    print(hbase_split_rdd.collect())
    # saveAsNewAPIHadoopDataset errors out when writing to HBase:
    # the spark-examples_2.11-1.6.0-typesafe-001 jar from https://mvnrepository.com/artifact/org.apache.spark/spark-examples?repo=typesafe-maven-releases
    # throws java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.add([B[B[B)Lorg/apache/hadoop/hbase/client/Put;
    # You need to download the Spark source and rebuild the jar yourself; if interested, see:
    # 1. https://download.csdn.net/download/sinat_37992109/11491308
    # 2. https://blog.csdn.net/sinat_37992109/article/details/98735188
    hbase_split_rdd.saveAsNewAPIHadoopDataset(conf=output_conf, keyConverter=keyConv, valueConverter=valueConv)
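The final map builds exactly the shape StringListToPutConverter expects: one (row_key, [row_key, column_family, qualifier, value]) pair per cell. The same flattening can be sketched in plain Python (a hypothetical helper, for illustration only):

```python
def to_put_records(row_key, column_family, columns):
    """Flatten a {qualifier: value} dict into the
    (row_key, [row_key, cf, qualifier, value]) pairs that
    StringListToPutConverter turns into HBase Puts."""
    return [
        (row_key, [row_key, column_family, qualifier, str(value)])
        for qualifier, value in columns.items()
    ]

records = to_put_records("xuan", "info", {"id": "1", "name": "xuan"})
# [('xuan', ['xuan', 'info', 'id', '1']), ('xuan', ['xuan', 'info', 'name', 'xuan'])]
```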

References:
https://diogoalexandrefranco.github.io/interacting-with-hbase-from-pyspark/#spark-hbase-connector
https://zhuanlan.zhihu.com/p/70092198


Copyright notice: this is an original article by qq_16755563, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.