文章目录
前言
非关系型数据数据库大量应用于数据缓存、消息队列等场景中,在大数据领域其非结构化特征与内存缓存数据的效果,适用于实时的数据分析,这里简单记录redis、hbase、kafka相关概念和使用
一、Hbase基础
HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。
特点:
分布式
基于hadoop存储
基于列储存
分布式的实时realtime
随机的Random
大数据持久性存储(读写数据)
1、Hbase数据模型
Hbase数据存储对象:namespace和table namespace相当于数据库,table为数据库表,但其中的列不固定
默认namespace为default。访问数据库要加上namespace名称如:
hbase:table1
hbase基于行存储,其table有特殊的结构:
行键/主键RowKey:
- 类似于MySQL主键(Primary Key)概念,唯一标记一行、作为主键索引
- 每张HBase表都有行健,行健这一列是HBase表创建以后自带的
列族ColumnFamily:
- 对除了Rowkey以外的列进行分组,将列划分不同的组中
- 任何一张HBase的表,都至少要有一个列族,除了Rowkey以外的任何一列,都必须属于某个列族,Rowkey不属于任何一个列族
- 分组:将拥有相似属性的列放入同一个列族【要读一起读,要写一起写】
- 设计原因:划分列族,读取数据时可以加快读取的性能
- 如果没有列族,没有划分班级教室:找一个人,告诉你这个人就在这栋楼
- 如果有了列族,划分了教室:找一个人,告诉你这个人在这栋楼某个房间
列Qualifier/Column:
- 与MySQL中的列是一样
- 除了rowkey以外的任何一列都必须属于某个列族,引用列的时候,必须加上列族的名称
- 如果有一个列族:
info
,如果info列族中有两列:name,age
,访问:info:name,info:age
多版本VERSIONS:HBase可以允许某一行的某一列存储多个版本的值的,默认每一列都只能存储1个版本。如果某个列有多个版本值,默认情况下查询,根据时间戳返回最新版本的值。
MySQL数据库与HBase数据库对比:
概念 | MySQL | Hbase |
---|---|---|
数据库 | DataBase | NameSpace |
数据表 | Table | Table【分布式的】 |
数据分区 | - | Region,表中数据一部分 |
数据行 | 数据【主键+其他列】 | Rowkey+数据【其他列】 |
列族 | - | ColumnFamily |
数据列 | 普通列与对应的值 | 列【timestamp】与对应的值【支持多版本】 |
2、Hbase集群
HMaster:主节点,管理节点
负责元数据的管理,比如namespace、table、列簇,region等
负责所有从节点的管理
HRegionServer:从节点,存储节点
- 负责管理每张表的分区数据:Region
- 对外提供Region的读写请求
Zookeeper:分布式协作框架
- HBase 分布式集群,依赖Zookeeper框架进行协作服务,存储元数据,实现高可用HA
HDFS:分布式文件系统
- HBase表中数据存储到HDFS文件中,文件称为HFile文件
Web UI:Apache HBase 1.x之前是60010,1.x开始更改为16010,CDH版本:一直使用60010
二、Hbase基础命令
Hbase shell
1.基础DDL操作
- 列举所有Namespace: list_namespace
- 列举某个NameSpace中的表: list_namespace_tables
- 创建NameSpace: create_namespace
- 删除NameSpace: drop_namespace
- 列举所有用户表: list
- 创建表: create
#表示在ns1的namespace中创建一张表t1,这张表有一个列族叫f1,这个列族中的所有列可以存储5个版本的值
create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}
#在default的namespace中创建一张表t1,这张表有三个列族,f1,f2,f3,每个列族的属性都是默认的
create 't1', 'f1', 'f2', 'f3'
# 创建表,可以更改列族的属性
create 't1', {NAME => 'cf1'}, {NAME => 'cf2', VERSIONS => 3}
查看某个表的信息:desc
判断存在某个表是否存在:exists
表的禁用和启用:disable / enable
功能- HBase为了避免修改或者删除表,影响这张表正在对外提供读写服务
- 规定约束:修改或者删除表时,必须先禁用表,表示这张表暂时不能对外提供服务
- 如果是删除:禁用以后删除
- 如果是修改:先禁用,然后修改,修改完成以后启用
删除某个表:drop
2.基础DML操作
插入数据:每次对表中某一列族中某一列插入,指定rowkey。put时如果不存在,就插入,如果存在就更新。
HBase表数据:按照Rowkey构建字典有序排序规则:
- 先依据RowKey升序,再按照列簇CF升序,最后列名Column升序
底层存储也是KV结构:每一列就是一条KV数据
- K:Rowkey + 列族 + 列 + 时间【降序】
- V:值
没有更新和删除:通过插入来代替的,做了标记不再显示
# 表名+rowkey+列族+列+值
put 'ns:tbname', 'rowkey', 'cf:col', 'value'
put 'people', '1001', 'info:name', 'laoda'
put 'people', '1001', 'info:age', '25'
put 'people', '1001', 'info:gender', 'male'
put 'people', '1001', 'info:address', 'shanghai'
put 'people', '1002', 'info:name', 'laoer'
put 'people', '1002', 'info:address', 'beijing'
put 'people', '1003', 'info:name', 'laosan'
put 'people', '1003', 'info:age', '20'
-- 扫描表数据
scan "people"
读取数据:get
功能:读取某个Rowkey的数据优点:Get是HBase中查询数据最快的方式,并不是最常用的方式
缺点:get命令最多只能返回一个rowkey的数据,根据Rowkey进行检索数据
get 表名 rowkey [列族,列]
get 'ns:tbname','rowkey'
get 'ns:tbname','rowkey',[cf]
get 'ns:tbname','rowkey',[cf:col]
# 获取RowKey所有列数据
get 'people','1001'
# 获取指定列簇中所有列数据
get 'people','1001', 'info'
# 获取指定列名称数据
get 'people','1001', 'info:name'
- 按条件查询:scan
#读取整张表的所有数据,一般不用
scan 'tbname'
#根据条件查询:工作中主要使用的场景, 用到最多
scan 'tbname', {Filter}
scan 'people'
scan 'people', {LIMIT => 2}
# 前缀过滤器
scan 'people', {ROWPREFIXFILTER => '1001'}
scan 'people', {ROWPREFIXFILTER => '100'}
# STARTROW:从某个rowkey开始,包含,闭区间
# STOPROW:到某个rowkey结束,不包含,开区间
scan 'people', {STARTROW=>'1001'}
scan 'people', {STARTROW=>'1001', STOPROW=>'1003'}
- 统计:count
- 三种统计方
- mr程序,Rowcount统计rowkey个数
- count
- 协处理器,添加数据自动计数
count 'people'
- 删除:delete
#删除某列的数据
delete tbname, rowkey, cf:col
#删除某个rowkey数据
deleteall tbname, rowkey
#清空所有数据:生产环境不建议使用,建议删表重建
truncate tbname
# 删除某一列数据
delete 'people', '1001', 'info:address'
# 删除某一行row数据
deleteall 'people','1002'
# 清空表数据
truncate 'people'
3.HbaseApi操作
- DDL
public class HBaseDdlTest {
// 定义连接Connection变量
private Connection conn = null ;
@Before
public void open() throws Exception {
// 1-1. 设置连接属性
Configuration conf = HBaseConfiguration.create();
// 设置HBase依赖ZK集群地址
conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
// 1-2. 使用工厂创建连接
conn = ConnectionFactory.createConnection(conf);
}
// todo: 创建命名空间
@Test
public void createNamespace() throws Exception{
// a. 获取Admin对象
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
// b. 创建命名空间描述符对象,设置名称, todo: 使用建造者模型创建对象
NamespaceDescriptor descriptor = NamespaceDescriptor.create("hbase").build();
// c. 创建NS
admin.createNamespace(descriptor);
// d. 关闭资源
admin.close();
}
// todo: 创建表
@Test
public void createTable() throws Exception{
// a. 获取Admin对象
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
// b. 判断表是否存在,如果存在,先删除
TableName tableName = TableName.valueOf("hbase:students");
if(admin.tableExists(tableName)){
// 先禁用表
admin.disableTable(tableName);
// 删除表
admin.deleteTable(tableName);
}
// c. 列簇信息
ColumnFamilyDescriptor basicFamily = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes("basic"))
.build() ;
ColumnFamilyDescriptor otherFamily = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes("other"))
.build() ;
// d. 表的描述符
TableDescriptor desc = TableDescriptorBuilder
.newBuilder(tableName)
.setColumnFamily(basicFamily)
.setColumnFamily(otherFamily)
.build();
// e. 创建表
admin.createTable(desc);
// f. 关闭资源
admin.close();
}
@After
public void close() throws Exception{
if(null != conn) conn.close();
}
}
- DML
public class HBaseDmlTest {
// 定义连接Connection变量
private Connection conn = null ;
@Before
public void open() throws Exception {
// 1-1. 设置连接属性
Configuration conf = HBaseConfiguration.create();
// 设置HBase依赖ZK集群地址
conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
// 1-2. 使用工厂创建连接
conn = ConnectionFactory.createConnection(conf);
}
// 创建HBase Table表的对象,方便对表进行DML操作
public Table getHTable() throws Exception{
// TableName对象
TableName tableName = TableName.valueOf("hbase:students");
// 获取Table实例
Table table = conn.getTable(tableName);
// 返回实现
return table ;
}
// 测试Table对象获取
@Test
public void testTable() throws Exception{
System.out.println(getHTable());
}
// 插入数据put
@Test
public void testPut() throws Exception{
// a. 获取Table对象
Table table = getHTable();
// todo: b. 构建Put对象,封装每条数据, 必须指定RowKey,然后再添加列值, 一行数据就是一个Put对象
Put put = new Put(Bytes.toBytes("20220501_001")) ;
// 添加列
put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("laoda")) ;
put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes("18")) ;
put.addColumn(Bytes.toBytes("other"), Bytes.toBytes("phone"), Bytes.toBytes("110")) ;
put.addColumn(Bytes.toBytes("other"), Bytes.toBytes("address"), Bytes.toBytes("beijing")) ;
// c. 插入数据
table.put(put);
// d. 关闭连接
table.close();
}
// 批量插入数据Put
@Test
public void testBatchPut() throws Exception {
// a. 获取Table对象
Table table = getHTable();
// b. TODO: 构建Put对象,表示1个RowKey数据,指定RowKey
List<Put> listPut = new ArrayList<>();
// 第1条数据
Put put1 = new Put(Bytes.toBytes("20220501_002")) ;
put1.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("laoer"));
put1.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes("16"));
put1.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("gender"), Bytes.toBytes("male"));
put1.addColumn(Bytes.toBytes("other"), Bytes.toBytes("phone"), Bytes.toBytes("120"));
put1.addColumn(Bytes.toBytes("other"), Bytes.toBytes("address"), Bytes.toBytes("shanghai"));
listPut.add(put1);
// 第2条数据
Put put2 = new Put(Bytes.toBytes("20220501_003")) ;
put2.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("laosan"));
put2.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes("16"));
put2.addColumn(Bytes.toBytes("other"), Bytes.toBytes("address"), Bytes.toBytes("hangzhou"));
listPut.add(put2);
// 第3条数据
Put put3 = new Put(Bytes.toBytes("20220501_004")) ;
put3.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("laosi"));
put3.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes("24"));
put3.addColumn(Bytes.toBytes("other"), Bytes.toBytes("job"), Bytes.toBytes("programmer"));
put3.addColumn(Bytes.toBytes("other"), Bytes.toBytes("address"), Bytes.toBytes("shanghai"));
listPut.add(put3);
// 第4条数据
Put put4 = new Put(Bytes.toBytes("20220501_005")) ;
put4.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("laoer"));
put4.addColumn(Bytes.toBytes("other"), Bytes.toBytes("job"), Bytes.toBytes("doctor"));
listPut.add(put4);
// c. 插入数据
table.put(listPut);
// d. 关闭连接
table.close();
}
// 依据RowKey查询数据
@Test
public void testGet() throws Exception{
// a. 获取Table对象
Table table = getHTable();
// b. 创建Get对象,设置RowKey,表示获取哪行数据
Get get = new Get(Bytes.toBytes("20220501_001")) ;
// todo: 设置指定列簇和列
get.addFamily(Bytes.toBytes("basic"));
get.addColumn(Bytes.toBytes("other"), Bytes.toBytes("address")) ;
// c. 查询数据,结果封装到Result对象中, todo: 表示HBase表中一行数据
Result result = table.get(get);
// d. 遍历获取每列数据
// 获取每行数据中所有列,每列数据封装在Cell对象中
Cell[] cells = result.rawCells(); // todo: HBase底层将每列数据进行封装 -> Cell 类中
for (Cell cell : cells) {
/*
20220501_001 basic age 18 1651478947786
*/
// 使用工具类CellUtil,获取每列Cell数据中各个字段值:cf、column、version、value
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long version = cell.getTimestamp();
System.out.println(rowKey + "\tcolumn=" + family + ":" + column + ", timestamp=" + version +", value=" +value);
}
// e. 关闭连接
table.close();
}
// 依据RowKey删除数据
@Test
public void testDelete() throws Exception{
// a. 获取Table对象
Table table = getHTable();
// b. 创建Delete对象,设置RowKey
Delete delete = new Delete(Bytes.toBytes("20220501_004")) ;
// todo:添加删除列簇和列
delete.addFamily(Bytes.toBytes("other")) ;
delete.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age")) ;
// c. 执行删除
table.delete(delete);
// d. 关闭连接
table.close();
}
// 使用Scan扫描数据
@Test
public void testScan() throws Exception{
// a. 获取Table对象
Table table = getHTable();
// b. 创建Scan对象
Scan scan = new Scan() ;
// c. 扫描表的数据,TODO:ResultScanner 相当于一个集合,里面存储很多Result对象,表示HBase表中每条数据
ResultScanner resultScanner = table.getScanner(scan);
// d. 循环遍历,获取每条数据Result
for (Result result : resultScanner) {
// 获取RowKey
String rowKey = Bytes.toString(result.getRow());
System.out.println(rowKey);
// 获取每条数据中所有列
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 使用工具类CellUtil,获取每列Cell数据中各个字段值:cf、column、version、value
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long version = cell.getTimestamp();
System.out.println("\tcolumn=" + family + ":" + column + ", timestamp=" + version +", value=" +value);
}
}
// e. 关闭连接
table.close();
}
/*
- 需求1:查询2022年3月和4月的数据
- 需求2:查询2022年5月的所有数据
- 需求3:查询所有age = 24的数据
*/
// 需求1:查询2022年3月和4月的数据
@Test
public void testScanFilter_Range() throws Exception{
/*
scan 'people', {STARTROW=>'202203', STOPROW=>'202205'}
*/
// a. 获取Table对象
Table table = getHTable();
// b. 创建Scan对象
Scan scan = new Scan() ;
// TODO: 设置startRow和stopRow 范围
scan.withStartRow(Bytes.toBytes("202203"));
scan.withStopRow(Bytes.toBytes("202205")) ;
// c. 扫描表的数据,TODO:ResultScanner 相当于一个集合,里面存储很多Result对象,表示HBase表中每条数据
ResultScanner resultScanner = table.getScanner(scan);
// d. 循环遍历,获取每条数据Result
for (Result result : resultScanner) {
// 获取RowKey
String rowKey = Bytes.toString(result.getRow());
System.out.println(rowKey);
// 获取每条数据中所有列
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 使用工具类CellUtil,获取每列Cell数据中各个字段值:cf、column、version、value
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long version = cell.getTimestamp();
System.out.println("\tcolumn=" + family + ":" + column + ", timestamp=" + version +", value=" +value);
}
}
// e. 关闭连接
table.close();
}
// 需求2:查询2022年5月的所有数据
@Test
public void testScanFilter_Prefix() throws Exception{
/*
scan 'hbase:students', {ROWPREFIXFILTER => '202205'}
*/
// a. 获取Table对象
Table table = getHTable();
// b. 创建Scan对象
Scan scan = new Scan() ;
// todo: 添加前缀过滤器
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("202205"));
scan.setFilter(prefixFilter) ;
// c. 扫描表的数据,TODO:ResultScanner 相当于一个集合,里面存储很多Result对象,表示HBase表中每条数据
ResultScanner resultScanner = table.getScanner(scan);
// d. 循环遍历,获取每条数据Result
for (Result result : resultScanner) {
// 获取RowKey
String rowKey = Bytes.toString(result.getRow());
System.out.println(rowKey);
// 获取每条数据中所有列
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 使用工具类CellUtil,获取每列Cell数据中各个字段值:cf、column、version、value
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long version = cell.getTimestamp();
System.out.println("\tcolumn=" + family + ":" + column + ", timestamp=" + version +", value=" +value);
}
}
// e. 关闭连接
table.close();
}
// 需求3:查询所有age = 24的数据
@Test
public void testScanFilter_SingleValue() throws Exception{
// a. 获取Table对象
Table table = getHTable();
// b. 创建Scan对象
Scan scan = new Scan() ;
// todo: 过滤器filter,过滤某个列的值
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
Bytes.toBytes("basic"), Bytes.toBytes("age"), CompareOperator.EQUAL, Bytes.toBytes("24")
);
scan.setFilter(singleColumnValueFilter) ;
// c. 扫描表的数据,TODO:ResultScanner 相当于一个集合,里面存储很多Result对象,表示HBase表中每条数据
ResultScanner resultScanner = table.getScanner(scan);
// d. 循环遍历,获取每条数据Result
for (Result result : resultScanner) {
// 获取RowKey
String rowKey = Bytes.toString(result.getRow());
System.out.println(rowKey);
// 获取每条数据中所有列
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 使用工具类CellUtil,获取每列Cell数据中各个字段值:cf、column、version、value
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long version = cell.getTimestamp();
System.out.println("\tcolumn=" + family + ":" + column + ", timestamp=" + version +", value=" +value);
}
}
// e. 关闭连接
table.close();
}
@After
public void close() throws Exception{
if(null != conn) conn.close();
}
}
三、Hbase核心
1.储存原理
table、region、regionserver:
Table:是一个逻辑对象,物理上不存在,供用户实现逻辑操作,存储在元数据的一个概念
- 数据写入表以后的物理存储:分区Region
- 一张表会有多个分区Region,每个分区存储在不同的机器上
- 默认每张表只有1个Region分区
Region:Hbase中数据负载均衡的最小单元
- 类似于HDFS中Block,用于实现Hbase中分布式
- 就是分区的概念,每张表都可以划分为多个Region,实现分布式存储,默认一张表只有一个分区
- 每个Region由一台RegionServer所管理,Region存储在RegionServer
- 一台RegionServer可以管理多个Region
RegionServer:是一个物理对象,Hbase中的一个进程,管理一台机器的存储
类似于HDFS中DataNode
一个Regionserver可以管理多个Region
一个Region只能被一个RegionServer所管理
# Table:提供用户读写的逻辑概念
# Region:分区的概念
一张表可以划分为多个分区
每个分区都被某一台Regionserver所管理
# RegionServer:真正存储数据的物理概念
2.Region
- region划分原理:范围划分
# 1. 任何一个Region都会对应一个范围
如果只有一个Region,范围:-oo ~ +oo
# 2. 范围划分:从整个-oo ~ +oo区间上进行范围划分
# 3. 每个分区都会有一个范围:根据Rowkey属于哪个范围就写入哪个分区
[startKey,stopKey) -> 前闭后开区间
默认:一张表创建时,只有一个Region,范围:-oo ~ +oo
- 自定义划分
创建一张表,有2个分区Region
create 't3', 'f1', SPLITS => ['50']
分区范围
region0:-oo ~ 50
region1:50 ~ +oo
- 数据分配规则,根据rowkey
# 举个栗子:创建一张表,有4个分区Region,20,40,60
create 'hbase:t3', {SPLITS => [20, 40, 60]}
# 规则:前闭后开
region0:-oo ~ 20
region1:20 ~ 40
region2:40 ~ 60
region3:60 ~ +oo
# 写入数据的rowkey:
# 比较是按照ASCII码比较的,不是数值比较
# 比较规则:ASCII码逐位比较
A1234:region3
A -> ASCII:65
c6789:region3
c -> ASCII: 99
00000001:region0
2:region0
99999999:region3
3.Region内部原理
put tbname, rowkey, cf:col, value
# tbname:决定了这张表的数据最终要读写哪些分区
# rowkey:决定了具体读写哪个分区
# cf:决定具体写入哪个Store
region内部结构:
# Store:
对分区的数据进行划分,按照列族划分,一个列族对应一个Store
不同列族的数据写入不同的Store中,实现了按照列族将列进行分组
根据用户查询时指定的列族,可以快速的读取对应的store
# MemStore:
每个Store都有一个: 内存存储区域
数据写入memstore就直接返回
# StoreFile:
每个Store中可能有0个或者多个StoreFile文件
逻辑上:Store
物理上:HDFS,HFILE【二进制文件】
# 伴随写入时会产生WAL日志文件,伴随读取会产生blockcache缓存
4.Region数据溢写和合并、分裂
FLUSH
将内存memstore中的数据溢写到HDFS中变成磁盘文件storefile HFILE
- 关闭集群:自动Flush
- 参数配置:自动触发机制
自动触发机制:HBase 2.0之前参数:
#region的memstore的触发
#判断如果某个region中的某个memstore达到这个阈值,那么触发flush,flush这个region的所有memstore
hbase.hregion.memstore.flush.size=128M
#region的触发级别:如果没有memstore达到128,但是所有memstore的大小加在一起大于等于128*4
#触发整个region的flush
hbase.hregion.memstore.block.multiplier=4
#regionserver的触发级别:所有region所占用的memstore达到阈值,就会触发整个regionserver中memstore的溢写
#从memstore占用最多的Regin开始flush
hbase.regionserver.global.memstore.size=0.4 --RegionServer中Memstore的总大小
#低于水位后停止
hbase.regionserver.global.memstore.size.upper.limit=0.99
hbase.regionserver.global.memstore.size.lower.limit = 0.4*0.95 =0.38
自动触发机制:HBase 2.0之后:
#设置了一个flush的最小阈值
#memstore的判断发生了改变:max("hbase.hregion.memstore.flush.size / column_family_number",hbase.hregion.percolumnfamilyflush.size.lower.bound.min)
#如果memstore高于上面这个结果,就会被flush,如果低于这个值,就不flush,如果整个region所有的memstore都低于,全部flush
#水位线 = max(128 / 列族个数,16),列族一般给3个 ~ 42M
#如果memstore的空间大于42,就flush,如果小于就不flush;如果都小于,全部flush
举例:3个列族,3个memstore, 90/30/30 90会被Flush
举例:3个列族,3个memstore, 30/30/30 全部flush
hbase.hregion.percolumnfamilyflush.size.lower.bound.min=16M
# 2.x中多了一种机制:In-Memory-compact,如果开启了【不为none】,会在内存中对需要flush的数据进行合并
#合并后再进行flush,将多个小文件在内存中合并后再flush
hbase.hregion.compacting.memstore.type=None|basic|eager|adaptive
为了避免大量的Memstore将大量的数据同时Flush到HDFS上,占用大量的内存和磁盘的IO带宽,会影响业务,手动执行
# 将某个表中所有Region中MemStore数据flush到本地磁盘StroeFile
hbase> flush 'TABLENAME'
# 将某个Region中MemStoree数据flush到本地磁盘StoreFile
hbase> flush 'REGIONNAME'
# 将某个RegionServer中所有Region中MemStoree数据flush到本地磁盘StoreFile
hbase> flush 'REGION_SERVER_NAME'
Compaction
将零散的有序数据合并为整体有序大文件,提高对HDFS数据的查询性能
# 2.0版本之前,只有StoreFile文件的合并
磁盘中合并:minor compaction、major compaction
# 2.0版本开始,内存中的数据也可以先合并后Flush
内存中合并:In-memory compaction
磁盘中合并:minor compaction、major compaction
minor compaction:轻量级
功能:将最早生成的几个小的StoreFile文件进行合并,成为一个大文件,不定期触发
特点
- 只实现将多个小的StoreFile合并成一个相对较大的StoreFile,占用的资源不多
- 不会将标记为更新或者删除的数据进行处理
属性: hbase.hstore.compaction.min=3
major compaction:重量级合并
功能:将整个Store中所有StoreFile进行合并为一个StoreFile文件,整体有序的一个大文件
特点
- 将所有文件进行合并,构建整体有序
- 合并过程中会进行清理过期和标记为删除的数据
- 资源消耗比较大
参数配置: hbase.hregion.majorcompaction=7天
In-memory compaction
原理:将当前写入的数据划分segment【数据段】
- 当数据不断写入MemStore,划分不同的segment,最终变成storefile文件
如果开启内存合并,先将第一个segment放入一个队列中,与其他的segment进行合并
- 合并以后的结果再进行flush
内存中合并的方式: hbase.hregion.compacting.memstore.type=None|basic|eager|adaptive
- Basic compaction策略不清理多余的数据版本,无需对cell的内存进行考核
- eager compaction会过滤重复的数据,清理多余的版本,这会带来额外的开销
- adaptive compaction根据数据的重复情况来决定是否使用eager策略
禁用自动合并,使用手动处理:
Compact all regions in a table:
hbase> major_compact 't1'
hbase> major_compact 'ns1:t1'
Compact an entire region:
hbase> major_compact 'r1'
Compact a single column family within a region:
hbase> major_compact 'r1', 'c1'
Split
- 0.94之前,region大小10G;
- 0.94开始:如果region个数在0 ` 100之间,# min(10GB, 256 x region个数的3次方)
- 2.*之后:判断region个数是否为1,如果为1,就按照256分,如果不为1,就按照10GB来分
关闭自动分裂,手动执行
split 'tableName'
split 'namespace:tableName'
split 'regionName' # format: 'tableName,startKey,id'
split 'tableName', 'splitKey'
split 'regionName', 'splitKey'
5.Hbase读写流程
写流程
put t1, 1001, info:name, zhangsan依据表名称获取所有region信息
第1步、连接ZK集群,获取hbase:meta表Region所在RS地址信息
ZNODE: /hbase/meta-region-server
第2步、读取hbase:meta数据,依据表名过滤
获取操作表所有Region信息数据依据RowKey找到对应Region及所在RegionServer地址
rowkey 和 Region范围(startKey ~ endKey)找到region,获取RS地址信息发送put请求RS,写入数据
第1步、将put发送给对应RS
第2步、将数据写入到WAL预写日志
放置内存数据丢失,导致数据丢失
第3步、依据列簇找到Region中Store,将put数据写入到memStore中
读流程
get t1, 1001基于表名称获取所有Region信息
第1步、连接ZK集群,获取hbase:meta表Region所在RS地址信息
ZNODE: /hbase/meta-region-server
第2步、读取hbase:meta数据,依据表名过滤
获取操作表所有Region信息数据依据RowKey找到对应Region及所在RegionServer地址
rowkey 和 Region范围(startKey ~ endKey)找到region,获取RS地址信息发送get请求RS,读取数据
第1、【最新】读取memStore内存中数据
写缓存
第2、【最老】如果设置读缓存BlockCache,读取BlockCache中数据
读缓存
数据从StoreFile文件读取出来
第3、【其次】读取Store中StoreFile文件
hdfs文件,磁盘读取
最终,将上面三处读取数据合并,进行返回
6.Hbase优化
RowKey设计
唯一原则:Rowkey必须具有唯一性,不能重复,一个Rowkey唯一标识一条数据
业务原则:Rowkey的设计必须贴合业务的需求,一般选择最常用的查询条件作为rowkey的前缀
组合原则:将更多的经常作为的查询条件的列放入Rowkey中,可以满足更多的条件查询可以走索引查询
散列原则:为了避免出现热点问题,需要将数据的rowkey生成规则构建散列的rowkey
方案一:更换不是连续的字段作为前缀,例如用户id
方案二:反转,一般用于时间作为前缀,查询时候必须将数据反转再查询
方案三:加盐(Salt),本质对数据进行编码,生成数字加字母组合的结果
长度原则:在满足业务需求情况下,rowkey越短越好,一般建议Rowkey的长度小于100字节
- 原因:rowkey越长,比较性能越差,rowkey在底层的存储是冗余的
- 如果rowkey超过了100字节,将一个长的rowkey,编码为8位,16位,32位
内存优化
读多写少,降低MEMStore比例
读少写多,降低BlockCache比例
压缩机制
- 与hadoop支持的压缩算法相关
- create ‘t1_snappy’, {NAME=>‘info’, COMPRESSION => ‘SNAPPY’}
布隆过滤
- ROW:行级布隆过滤
- 生成StoreFile文件时,会将这个文件中有哪些Rowkey的数据记录在文件的头部
- 当读取StoreFile文件时,会从文件头部获取这个StoreFile中的所有rowkey,自动判断是否包含需要的rowkey,如果包含就读取这个文件,如果不包含就不读这个文件
- ROWCOL:行列级布隆过滤
- 生成StoreFile文件时,会将这个文件中有哪些Rowkey的以及对应的列族和列的信息数据记录在文件的头部
- 当读取StoreFile文件时,会从文件头部或者这个StoreFile中的所有rowkey以及列的信息,自动判断是否包含需要的rowkey以及列,如果包含就读取这个文件,如果不包含就不读这个文件
- ROW:行级布隆过滤
环境优化
- 系统环境
开启文件系统的预读缓存可以提高读取速度
最大限度使用物理内存
调整文件及进程句柄数 - HDFS
保证RPC调用会有较多的线程数
文件块大小的调整
文件句柄数
超时时间
避免DN错误宕机 - zookeeper
优化Zookeeper会话超时时间 - hbase
设置RPC监听数量
优化hbase客户端缓存
指定scan.next扫描HBase所获取的行数
- 系统环境
四、Hbase与SQL的集成
1、Hbase on hive
将HBase表中每一行对应的所有列构建一张完整的结构化表
- 如果这一行没有这一列,就补null
集成框架:
Hive:通过MapReduce来实现
Impala:基于内存分布式计算实现
Phoenix:通过HBase API封装实现的,底层协处理器
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,info:code,info:money,info:pay_account,info:pay_channel,info:pay_comments,info:pay_name,info:pay_way,info:rec_account,info:rec_bank_name,info:rec_name,info:status,info:timestamp")
tblproperties("hbase.table.name" = "default:bank_records");
- Hive中的只是关联表,并没有数据,数据存储在HBase 表中
- 在Hive中创建HBase 的关联表,关联成功后,使用SQL通过MapReduce处理关联表
- 如果HBase 中表已存在,只能建外部表,使用:key来表示rowkey
- Hive中与HBase 关联的表,不能使用load写入数据,只能使用insert,通过MR读写数据
2、Phoenix
- Hive:SQL更加全面,但是不支持二级索引,底层通过分布式计算工具来实现
- Phoenix:SQL相对支持不全面,但是性能比较好,直接使用HbaseAPI,支持索引实现
注意:Phoenix中默认会将所有字符转换为大写,如果想要使用小写字母,必须加上双引号
注意:创建表时,必须指定主键作为Rowkey,主键列不能加列族
Phoenix 4.8版本之前,只要创建同名的Hbase表,会自动关联数据
Phoenix 4.8版本以后,不推荐关联表的方式
- 推荐使用视图关联的方式来实现,如果要使用关联表的方式,必须加上以下参数
- column_encoded_bytes=0 ;
Phoenix中建议使用视图view的方式来关联HBase中已有的表
通过构建关联视图,可以解决大部分数据查询的数据,不影响数据
视图:Hbase中已经有这张表,写都是操作Hbase,Phoenix只提供读
建表:对这张表既要读也要使用Phoenix来写
在实际项目中,先在HBase中创建表,再在Phoenix中创建视图;写入数据时,基于HBase 操作,查询数据使用Phoenix基于SQL操作。
!table: 列出表
upsert into values :更新数据
LIMIT 1000 OFFSET 100: limit 100 1000
DELETE FROM
1、插入更新Upsert
当主键不存在时,直接插入数据
当主键存在时,更新数据
2、删除数据DELETE
语法类似MySQL数据库
3、查询数据
SELECT 语句,简易查询(可以过滤,可以简单分组聚合)
3、Phoenix sql
- 表分区
CREATE TABLE IF NOT EXISTS ORDER_REGION(
ID varchar primary key,
INFO.STATUS varchar,
INFO.MONEY float,
INFO.PAY_WAY integer,
INFO.USER_ID varchar,
INFO.OPERATION_TIME varchar,
INFO.CATEGORY varchar
)
CONPRESSION='GZ'
SPLIT ON ('3','5','7');
- JDBC操作
public class PhoenixJdbcDemo {
public static void main(String[] args) throws Exception{
// 定义变量
Connection conn = null ;
PreparedStatement pstmt = null ;
ResultSet result = null ;
try{
// 1. 加载驱动类
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver") ;
// 2. 获取连接
conn = DriverManager.getConnection("jdbc:phoenix:node1,node2,node3:2181") ;
// 3. 创建Statement对象
pstmt = conn.prepareStatement("SELECT USER_ID, PAY_WAY, CATEGORY FROM ORDER_REGION LIMIT 10");
// 4. 执行操作,此处查询
result = pstmt.executeQuery();
// 5. 获取数据
while (result.next()){
String userId = result.getString("USER_ID");
String payway = result.getString("PAY_WAY");
String category = result.getString("CATEGORY");
System.out.println(userId + ", " + payway + ", " + category);
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 6. 关闭连接
if(null != result) result.close();
if(null != pstmt) pstmt.close();
if(null != conn) conn.close();
}
}
}
4、Phoenix 二级索引
全局索引功能:当为某一列创建全局索引时,Phoenix自动创建一张索引表,将创建索引的这一列加上原表的rowkey作为新的rowkey
CREATE INDEX globe_index ON table(column);
查询
- 先查询索引表:通过rowkey获取名称对应的id
- 再查询数据表:通过id查询对应的数据
特点:
- 默认只能对构建索引的字段做索引查询
- 如果查询中包含不是索引的字段或者条件不是索引字段,不走索引
应用:写少读多
- 当原表的数据发生更新操作提交时,会被拦截
- 先更新所有索引表,然后再更新原表
覆盖索引功能:在构建全局索引时,将经常作为查询条件或者结果的列放入索引表中,直接通过索引表来返回数据结果
CREATE INDEX overwrite_index ON table(column1) include(column12);
特点:
- 基于全局索引构建,将常用的查询结果放入索引表中,直接从索引表返回结果,不用再查询原表
应用:
- 适合于查询条件比较固定,数据量比较小的场景下
本地索引功能:将索引数据与对应的原始数据放在同一台机器,避免了跨网络传输,提高了写的性能
CREATE LOCAL INDEX LOCAL_IDX on table(column);
特点
- 即使查询数据中包含了非索引字段,也会走本地索引
- 本地索引会修改原始数据表
- 如果构建了本地索引,不能通过Hbase的API来读写数据的,必须通过Phoenix来实现读写
应用:
- 提高构建索引时对写的性能的影响
- 最终所有索引都是为了提高读的性能的
1、全局索引:GlobalIndex
创建索引index,创建一张表:索引表,单独存储索引数据
SELECT查询时,先到索引表查询,再到原数据表查询
2、覆盖索引
与全局索引相比较,基于全局索引之上的
第一点、创建索引index,创建一张表:索引表,单独存储索引数据
第二点、创建索引,将查询字段进行包含include,存储到索引表列中
第三点、SELECT查询时,仅仅查询索引表,获取数据就返回,不再查询原数据表,由于索引表中包含查询字段
3、本地索引: LoalIndex
创建索引index,不创建表,直接将索引数据写到原数据表中,增加数据量
原数据表有10亿条数据,创建本地索引后,此时增加10亿条索引数据 = 20亿数据
SELECT查询时,先到索引表(就是原数据表)查询,再到原数据表查询
总结
Hbase基础内容。
时光如水,人生逆旅矣。