Exporting Data from an HBase Table to HDFS with MapReduce

Goal: use MapReduce to migrate the data in the HBase table student to the HDFS path hdfs://hadoop112:9000/user/hadoop/out1.

1. Add the Maven dependencies

 <dependencies>
 
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
 
    </dependencies>

2. Check the classpath required by HBase MapReduce jobs

The bin/hbase mapredcp command prints the HBase jars that a MapReduce job needs on its classpath:

[hadoop@hadoop112 hbase-1.3.1]$ bin/hbase mapredcp
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase-1.3.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
/opt/module/hbase-1.3.1/lib/zookeeper-3.4.6.jar:/opt/module/hbase-1.3.1/lib/guava-12.0.1.jar:/opt/module/hbase-1.3.1/lib/metrics-core-2.2.0.jar:/opt/module/hbase-1.3.1/lib/protobuf-java-2.5.0.jar:/opt/module/hbase-1.3.1/lib/hbase-common-1.3.1.jar:/opt/module/hbase-1.3.1/lib/hbase-protocol-1.3.1.jar:/opt/module/hbase-1.3.1/lib/htrace-core-3.1.0-incubating.jar:/opt/module/hbase-1.3.1/lib/hbase-client-1.3.1.jar:/opt/module/hbase-1.3.1/lib/hbase-hadoop-compat-1.3.1.jar:/opt/module/hbase-1.3.1/lib/netty-all-4.0.23.Final.jar:/opt/module/hbase-1.3.1/lib/hbase-server-1.3.1.jar:/opt/module/hbase-1.3.1/lib/hbase-prefix-tree-1.3.1.jar

3. Import the environment variables

(1) Temporary (effective only for the current shell session; run the following on the command line):

[hadoop@hadoop112 hbase-1.3.1]$  export HBASE_HOME=/opt/module/hbase-1.3.1
[hadoop@hadoop112 hbase-1.3.1]$  export HADOOP_HOME=/opt/module/hadoop-2.7.2
[hadoop@hadoop112 hbase-1.3.1]$  export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

(2) Permanent: add the following lines to /etc/profile (no shell prompt; these are file contents, applied with source /etc/profile):

export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2

Also add the following to hadoop-env.sh (note: place it after the for loop that builds HADOOP_CLASSPATH):

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.1/lib/*
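To confirm the setting took effect, you can check that the HBase jars now appear on Hadoop's classpath (a quick sanity check; the grep pattern is just an example):

```shell
# Verify that the HBase jars are visible to Hadoop after the export
hadoop classpath | tr ':' '\n' | grep hbase
```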

4. Build ReadStudentHDFSMapper to read the data from the HBase table

package com.fczheng.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * @author fczheng
 * @create 2019-08-16 20:42
 */
public class ReadStudentHDFSMapper extends TableMapper<Text,Text> {

    private Text k = new Text();
    private Text v = new Text();
    private StringBuilder sb = new StringBuilder();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        // use the rowkey as the output key
        k.set(Bytes.toString(key.get()));

        // reset the StringBuilder before assembling this row's values
        sb.setLength(0);
        Cell[] cells = value.rawCells();
        for (Cell cell : cells) {
            sb.append(Bytes.toString(CellUtil.cloneValue(cell))).append("\t");
        }
        v.set(sb.toString());
        context.write(k,v);
    }
}
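Because the job is map-only and uses the default TextOutputFormat, each line written to HDFS is the rowkey, a tab, then the cell values joined by tabs. A minimal, dependency-free sketch of that formatting (the rowkey 1001 and the values here are made-up sample data, not from the actual table):

```java
import java.util.Arrays;
import java.util.List;

public class OutputLineDemo {

    // Mirrors the mapper's StringBuilder loop: cell values joined by '\t'.
    // TextOutputFormat then writes "key<TAB>value" for each record.
    static String formatLine(String rowkey, List<String> cellValues) {
        StringBuilder sb = new StringBuilder();
        for (String v : cellValues) {
            sb.append(v).append("\t");
        }
        return rowkey + "\t" + sb;
    }

    public static void main(String[] args) {
        // e.g. rowkey 1001 with two cell values
        System.out.println(formatLine("1001", Arrays.asList("Zhang San", "18")));
    }
}
```

Note that each value carries a trailing tab, exactly as the mapper's append loop produces.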

5. Build ReadStudentHDFSDriver to write the data to HDFS

package com.fczheng.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author fczheng
 * @create 2019-08-16 20:51
 */
public class ReadStudentHDFSDriver implements Tool {

    Configuration conf;
    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(conf, "MR2");
        job.setJarByClass(ReadStudentHDFSDriver.class);
        TableMapReduceUtil.initTableMapperJob(
                "student",
                new Scan(),
                ReadStudentHDFSMapper.class,
                Text.class,
                Text.class,
                job
        );

        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoop112:9000/user/hadoop/out1"));

        boolean res = job.waitForCompletion(true);

        return res ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf ;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(HBaseConfiguration.create(), new ReadStudentHDFSDriver(), args);
        System.exit(status);
    }
}
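The driver passes a bare new Scan() to initTableMapperJob. For larger tables, a commonly recommended tweak (an optional tuning sketch, not part of the original code) is to raise scan caching and disable block caching for the one-off full scan:

```java
// Optional Scan tuning for full-table MapReduce scans (sketch)
Scan scan = new Scan();
scan.setCaching(500);        // fetch 500 rows per RPC instead of the small default
scan.setCacheBlocks(false);  // don't fill the region server block cache with a one-off scan
// then pass `scan` instead of `new Scan()` to initTableMapperJob
```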

6. Package and run the job

[hadoop@hadoop112 jar]$ yarn jar hbase-plugin-1.0-SNAPSHOT.jar com.fczheng.mr2.ReadStudentHDFSDriver

Note: the HDFS output path must not exist before the job runs; if it already exists, the job fails with an error.
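Putting it together, a typical run looks like this (the paths and jar name follow the examples above; adjust them to your environment):

```shell
# Remove the output path if it is left over from a previous run
hdfs dfs -rm -r -f /user/hadoop/out1

# Submit the job on YARN
yarn jar hbase-plugin-1.0-SNAPSHOT.jar com.fczheng.mr2.ReadStudentHDFSDriver

# Inspect the exported data (map-only job, so the files are part-m-*)
hdfs dfs -cat /user/hadoop/out1/part-m-*
```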


Copyright notice: this is an original article by fczheng, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.