HBase 2.x ---- HBase API
1. 环境准备
新建项目后在 pom.xml 中添加依赖:
注意:会报错 javax.el 包不存在,是一个测试用的依赖,不影响使用
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.11</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.1-b06</version>
</dependency>
</dependencies>
2. 创建链接
根据官方 API 介绍,HBase 的客户端连接由 ConnectionFactory 类来创建(工厂模式),用户使用完成之后需要手动关闭连接。同时连接是一个重量级的,推荐使用一个进程使用一个连接,对 HBase 的命令通过连接中的两个属性 Admin 和 Table 来实现。
1. 单线程创建连接
package com.fickler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/**
* @author dell
* @version 1.0
*/
public class HBaseConnection {
public static void main(String[] args) throws IOException {
//1.创建配置对象
Configuration configuration = new Configuration();
//2.添加配置参数
configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
//3.创建hbase的连接
//默认使用同步连接
Connection connection = ConnectionFactory.createConnection(configuration);
//可以使用异步连接
//主要影响后续的DML操作
CompletableFuture<AsyncConnection> asyncConnection = ConnectionFactory.createAsyncConnection(configuration);
//4.使用连接
System.out.println(connection);
//5.关闭连接
connection.close();
}
}
HBase 创建连接是一个重量级的连接,连接的时候时间花费较多,大约需要等待1min左右,出现下面的示图就代表连接成功了
2. 多线程创建连接
使用类单例模式,确保使用一个连接,可以同时用于多个线程。
package com.fickler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/**
* @author dell
* @version 1.0
*/
public class HBaseConnection {
//设置静态属性hbase连接
public static Connection connection = null;
static {
//创建hbase连接
try {
//使用配置文件方法
connection = ConnectionFactory.createConnection();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 关闭连接方法,用于进程关闭时调用
* @throws IOException
*/
public static void closeConnection() throws IOException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws IOException {
// //1.创建配置对象
// Configuration configuration = new Configuration();
// //2.添加配置参数
// configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
// //3.创建hbase的连接
// //默认使用同步连接
// Connection connection = ConnectionFactory.createConnection(configuration);
// //可以使用异步连接
// //主要影响后续的DML操作
// CompletableFuture<AsyncConnection> asyncConnection = ConnectionFactory.createAsyncConnection(configuration);
// //4.使用连接
// System.out.println(connection);
// //5.关闭连接
// connection.close();
//直接使用创建好的连接
//不在main函数中创建连接
System.out.println(HBaseConnection.connection);
//在main函数线程的最后关闭连接
connection.close();
}
}
在 resources 文件夹中创建配置文件 hbase-site.xml,添加如下内容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
</property>
</configuration>
创建成功
3. DDL
创建 HBaseDDL 类,添加静态方法即可作为工具类
public class HBaseDDL {
//声明一个静态属性
public static Connection connection = HBaseConnection.connection;
}
1. 创建命名空间
/**
* 创建命名空间
* @param namespace 命名空间名称
*/
public static void createNamespace(String namespace) throws IOException {
//1.获取admin
//此处的异常先不要抛出,等待方法写完,再统一进行处理
//admin的连接是轻量级的,不是线程安全的,不推荐池化或者缓存这个连接
Admin admin = connection.getAdmin();
//2.调用方法创建命名空间
//代码相对shell更加底层,所以shell能够实现的功能,代码一定能实现
//所以需要填写完整的命名空间描述
//2.1创建命名空间描述建造者=>设计师
NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
//2.2给命名空间添加需求
builder.addConfiguration("user", "fickler");
//2.3使用builder构造出对应的添加完整参数的对象 完成创建
//创建命名空间出现的问题,都属于方法自身的问题,不应该抛出
try {
admin.createNamespace(builder.build());
} catch (IOException e) {
System.out.println("命名空间已经存在");
throw new RuntimeException(e);
}
//3.关闭admin
admin.close();
}
可以看到代码执行前和执行后,命名空间中多了一个 “fickler”
查看 “fickler” 命名空间的详细描述,可以看到我们设置的 kv 键值对
2. 判断表格是否存在
/**
* 判断表格是否存在
* @param namespace 命名空间名称
* @param tableName 表格名称
* @return
* @throws IOException
*/
public static boolean isTableExists(String namespace, String tableName) throws IOException {
//1.获取admin
Admin admin = connection.getAdmin();
//2.使用方法判断表格是否存在
boolean b = false;
try {
b = admin.tableExists(TableName.valueOf(namespace, tableName));
} catch (IOException e) {
throw new RuntimeException(e);
}
//3.关闭admin
admin.close();
//4.返回结果
return b;
}
3. 创建表
/**
* 创建表格
* @param namespace 命名空间名称
* @param tableName 表格名称
* @param columnFamilies 列族名称,可以有多个
*/
public static void createTable(String namespace, String tableName, String... columnFamilies) throws IOException {
//判断是否有至少一个列族
if (columnFamilies.length == 0) {
System.out.println("创建表格至少有一个列族");
return;
}
//判断表格是否存在
if (isTableExists(namespace, tableName)) {
System.out.println("表格已经存在");
return;
}
//1.获取admin
Admin admin = connection.getAdmin();
//2.调用方法创建表格
//2.1创建表格描述的建造者
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
//2.2添加参数
for (String columnFamily : columnFamilies) {
//2.3创建列族描述的建造者
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
//2.4对应当前列族添加参数
//添加版本参数
columnFamilyDescriptorBuilder.setMaxVersions(5);
//2.5创建添加完参数的列族描述
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
}
//2.6创建对应的表格描述
try {
admin.createTable(tableDescriptorBuilder.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
admin.close();
}
查看表格的详细描述
4. 修改表
/**
* 修改表格中一个列族的版本
*
* @param namespace 命名空间名称
* @param tableName 表格名称
* @param columnFamily 列族名称
* @param version 版本
*/
public static void modifyTable(String namespace, String tableName, String columnFamily, int version) throws IOException {
//判断表格是否存在
if (!isTableExists(namespace, tableName)) {
System.out.println("表格不存在,无法修改");
return;
}
//1.获取admin
Admin admin = connection.getAdmin();
try {
//2.调用方法修改表格
//2.0获取之前的表格描述
TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName));
//2.1创建一个表格描述建造者
//如果使用填写tableName的方法,相当于创建了一个新的表格描述建造者,没有之前的信息
//如果想要修改之前的信息,必须调用方法填写一个旧的表格描述
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);
//2.2对应建造者进行表格数据的修改
ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));
//创建列族描述建造者
//需要填写旧的列族描述
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);
//修改对应的版本
columnFamilyDescriptorBuilder.setMaxVersions(version);
//此处修改的时候,如果填写的新创建,那么别的参数会初始化
tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorBuilder.build());
admin.modifyTable(tableDescriptorBuilder.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
//3.关闭admin
admin.close();
}
5. 删除表
/**
* 删除表格
*
* @param namespace 命名空间
* @param tableName 表格名称
* @return true 表示删除成功
*/
public static boolean deleteTable(String namespace, String tableName) throws IOException {
//1.判断表格是否存在
if (!isTableExists(namespace, tableName)) {
System.out.println("表格不存在,无法删除");
return false;
}
//2.获取admin
Admin admin = connection.getAdmin();
//3.调用相关的方法删除表格
try {
TableName tableName1 = TableName.valueOf(namespace, tableName);
admin.disableTable(tableName1);
admin.deleteTable(tableName1);
} catch (IOException e) {
throw new RuntimeException(e);
}
//4.关闭admin
admin.close();
return true;
}
这里的表格后续还会用到,这里就先不删除了…
4. DML
创建类 HBaseDML
public class HBaseDML {
//添加静态属性connection指向单例连接
public static Connection connection = HBaseConnection.connection;
}
1. 插入数据
/**
* 插入数据
* @param namespace 命名空间名称
* @param tableName 表格名称
* @param rowKey 主键
* @param columnFamily 列族名称
* @param columnName 列名
* @param value 值
*/
public static void putCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
//1.获取table
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
//2.调用相关方法插入数据
//2.1创建put对象
Put put = new Put(Bytes.toBytes(rowKey));
//2.2给put对象添加数据
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
//2.3将对象写入对应的方法
try {
table.put(put);
} catch (IOException e) {
throw new RuntimeException(e);
}
//3.关闭table
table.close();
}
对比代码执行前面表格中的内容可以看出,数据已经插入成功了
2. 读取数据
/**
* 读取数据
* @param namespace 命名空间名称
* @param tableName 表格名称
* @param rowKey 主键
* @param columnFamily 列族名称
* @param columnName 列名
*/
public static void getCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {
//1.获取table
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
//2.创建get对象
Get get = new Get(Bytes.toBytes(rowKey));
//如果直接调用get方法读取数据,此时读一整行数据
//如果想读取某一列的数据,需要添加对象的参数
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
//设置读取数据的版本
get.readAllVersions();
try {
//读取数据,得到result对象
Result result = table.get(get);
//处理数据
Cell[] cells = result.rawCells();
//测试方法:直接把读取的数据打印到控制台
//如果是实际开发,需要再额外写方法,对应处理数据
for (Cell cell : cells) {
//cell存储数据比较底层
String value = new String(CellUtil.cloneValue(cell));
System.out.println(value);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
//关闭table
table.close();
}
可以看到控制台已经输出了结果,说明读取数据成功
3. 扫描数据
/**
* 扫描数据
* @param namespace 命名空间
* @param tableName 表格名称
* @param startRow 开始row,包含
* @param stopRow 结束row,不包含
*/
public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {
//1.获取table
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
//2.创建scan对象
Scan scan = new Scan();
//如果此时直接调用,会直接扫描整张表
//添加参数,来控制扫描的数据
//默认包含
scan.withStartRow(Bytes.toBytes(startRow));
//默认不包含
scan.withStopRow(Bytes.toBytes(stopRow));
try {
//读取多行数据,获得scanner
ResultScanner scanner = table.getScanner(scan);
//result来记录一行数据,cell数组
//ResultScanner来记录多行数据,result的数组
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println(new String(CellUtil.cloneRow(cell)) + "-" + new String(CellUtil.cloneFamily(cell)) + "-" + new String(CellUtil.cloneQualifier(cell)) + "-" + new String(CellUtil.cloneValue(cell)) + "\t");
}
System.out.println();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
//3.关闭table
table.close();
}
4. 带过滤扫描
/**
* 带过滤扫描
* @param namespace 命名空间名称
* @param tableName 表格名称
* @param startRow 开始row,包含
* @param stopRow 结束row,不包含
* @param columnFamily 列族名称
* @param columnName 列名
* @param value 值
* @throws IOException
*/
public static void filterScan(String namespace, String tableName, String startRow, String stopRow, String columnFamily, String columnName, String value) throws IOException {
//1.获取table
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
//2.创建scan对象
Scan scan = new Scan();
//如果此时直接调用,会直接扫描整张表
//添加参数,来控制扫描的数据
//默认包含
scan.withStartRow(Bytes.toBytes(startRow));
//默认不包含
scan.withStopRow(Bytes.toBytes(stopRow));
//可以添加多个过滤
FilterList filterList = new FilterList();
//创建过滤器
//(1)结果只保留当前列的数据
ColumnValueFilter columnValueFilter = new ColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), CompareOperator.EQUAL, Bytes.toBytes(value));
//(2)结果保留整行数据
//结果同时会保留没有当前列的数据
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), CompareOperator.EQUAL, Bytes.toBytes(value));
//本身可以添加多个过滤器
filterList.addFilter(singleColumnValueFilter);
//添加过滤
scan.setFilter(filterList);
try {
//读取多行数据,获得scanner
ResultScanner scanner = table.getScanner(scan);
//result来记录一行数据,cell数组
//ResultScanner来记录多行数据,result的数组
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println(new String(CellUtil.cloneRow(cell)) + "-" + new String(CellUtil.cloneFamily(cell)) + "-" + new String(CellUtil.cloneQualifier(cell)) + "-" + new String(CellUtil.cloneValue(cell)) + "\t");
}
System.out.println();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
//3.关闭table
table.close();
}
5. 删除数据
/**
* 删除数据
* @param nameSpace 命名空间名称
* @param tableName 表格名称
* @param rowKey 主键
* @param family
* @param column
* @throws IOException
*/
public static void deleteColumn(String nameSpace, String tableName, String rowKey, String family, String column) throws IOException {
//1.获取table
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
//2.创建Delete对象
Delete delete = new Delete(Bytes.toBytes(rowKey));
//3.添加删除信息
//3.1删除单个版本
delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
//3.2删除所有版本
delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
//3.3删除列族
delete.addFamily(Bytes.toBytes(family));
//4.删除数据
table.delete(delete);
//5.关闭资源
table.close();
}
版权声明:本文为qq_52354698原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。