import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.geotools.data.*;
import org.geotools.factory.Hints;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.locationtech.geomesa.hbase.data.HBaseDataStore;
import org.locationtech.geomesa.hbase.data.HBaseDataStoreFactory;
import org.locationtech.geomesa.hbase.data.HBaseFeatureWriter;
import org.locationtech.geomesa.index.metadata.GeoMesaMetadata;
import org.opengis.feature.Property;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.feature.type.Name;
import org.opengis.filter.Filter;
import org.opengis.filter.sort.SortBy;
import java.io.IOException;
import java.util.*;
/**
* @Author: gh
* @Description: 通过geomesa操作hbase数据。
* 1.新增:通过geomesa封装并存储数据到HBase。
* //geomesa默认主键名称是__fid__
* SimpleFeature封装的数据中,主键是feature ID(__fid__),不是数据自身的ID(gid,fid等)。
* 2.删除:根据rowkey删除数条记录。
* 3.更新:更新除geom以外的字段。
* 和新增使用同一个方法。
* 4.查询:解析出所有字段并返回;分页;模糊查询。
* 条件查询、模糊查询
*/
public class GeomesaHBaseImpl extends HBaseDaoImpl{
public GeomesaHBaseImpl(DataBaseDTO dto){
super(dto);
}
/**
* 精确查询、模糊查询
* @param tableName 表名
* @param qualifier 字段名
* @param fieldValue 字段值
* @param pageSize 分页大小
* @param pageNum 当前页码
* @return
*/
public Map<String,Object> queryByRandomField(String tableName, String qualifier,String fieldValue,
Integer pageSize,Integer pageNum) {
Query query = null;
DataStore ds = createDataStore(tableName);
if(ds != null){
//设置查询条件
try {
query = new Query(getTypeNames(ds)[0],
ECQL.toFilter(qualifier+" LIKE '%"+fieldValue+"%'"));
} catch (CQLException e) {
e.printStackTrace();
}
}
return queryFeatureDatas(ds, query,pageSize,pageNum);
}
/**
* 统计一个表中的记录总条数
* 【注意:不能直接调用父类中的totalCount方法,统计结果不对!】
* @param tableName 表名
* @return 总条数
*/
public long totalCount2(String tableName) {
long count = 0;
try {
//不设置查询条件
String indexTable = getXZ3IndexTable(tableName);
//使用协处理器统计(任何一张)索引表才有结果。
count = super.totalCount(indexTable);
} catch (Exception e) {
e.printStackTrace();
}
return count;
/* DataStore ds = createDataStore(tableName);
Query query = new Query(getTypeNames(ds)[0]);
return getTotalCountByQuery(ds,query);*/
}
/**
* 获取模糊查询
* @param tableName 表名
* @param qualifier 字段名
* @param fieldValue 字段值
* @return 模糊查询的结果总数
*/
public long totalCountOfFuzzyQuery(String tableName,String qualifier,String fieldValue) {
long count = 0;
Query query = null;
DataStore ds = createDataStore(tableName);
if(ds != null){
//设置查询条件
try {
query = new Query(getTypeNames(ds)[0],
ECQL.toFilter(qualifier+" LIKE '%"+fieldValue+"%'"));
} catch (CQLException e) {
e.printStackTrace();
}
count = getTotalCountByQuery(ds, query);
}
return count;
}
/**
* 分页查询。
* @param tableName 表名
* @param pageSize 每页的大小
* @param pageNum 当前页的页数
*/
public Map<String,Object> getAllRowsByPage(String tableName, Integer pageSize, Integer pageNum) {
//不设置查询条件
DataStore ds = createDataStore(tableName);
Query query = null;
if(ds != null){
query = new Query(getTypeNames(ds)[0]);
}
return queryFeatureDatas(ds, query,pageSize,pageNum);
}
/**
* 获取指定命名空间(数据库)下所有的表的名称。
* (排除元数据表)
* 源码中createSchema时命名表:GeoMesaFeatureIndex
* .formatTableName(ds.config.catalog, GeoMesaFeatureIndex.tableSuffix(this, partition), sft)
* @param nameSpace
* @return
*/
@Override
public List<String> getTablenamesOfDB(String nameSpace) {
//过滤掉了disable的表
List<String> fullTableNames = getFullTablenamesOfDB(nameSpace);
List<String> filteredTableNames = new ArrayList<>();
for (String fullName : fullTableNames) {
List<String> cfs = getColumnFamilies(fullName);
//catalog表的CF=m,索引表的CF=d
if(cfs.contains("m")){
fullName = fullName.contains(":") ? fullName.split(":")[1] : fullName;
filteredTableNames.add(fullName);
}
/* DataStore ds = createDataStore(fullName);
//当fullname是索引表时,datastore=null,此时报错:NoSuchColumnFamilyException/RemoteWithExtrasException。
if(ds != null){
fullName = fullName.contains(":") ? fullName.split(":")[1] : fullName;
filteredTableNames.add(fullName);
}*/
}
return filteredTableNames;
}
/**
* 获取某张表的字段名称和字段类型。
* @param tableName 表的名称
* @return map:key字段名称,value字段类型的class字符串。
*/
@Override
public Map<String, String> getColumnNames(String tableName) {
DataStore dataStore = createDataStore(tableName);
return getColumnNames(dataStore);
}
/**
* 新增/更新一条数据。
* 【注意:新增的数据中不包括主键属性RowKey!主键的设置为后台自动生成!】
* 【注意:更新的数据中要包括主键属性RowKey!】
* @param tableName 表名
* @param qualifiers 字段名
* @param values 字段值
* @return 插入的数据条数
*/
public int insert(String tableName, String []qualifiers, String []values) {
DataStore ds = createDataStore(tableName);
if(ds != null){
SimpleFeatureType sft = getSimpleFeatureType(ds);
//组织数据
if(qualifiers!=null && values!=null){
int len1 = qualifiers.length;
int len2 = values.length;
if(len1==len2){
//查询表中的字段,确定主键
/*Query query = new Query(getTypeNames(ds)[0]);
Map<String, Object> queryMap = queryFeatureDatas(ds, query, 1, 1);
String pk = queryMap.get("pk").toString();*/
//封装数据
List<Map<String,Object>> datas = new ArrayList<>();
Map<String,Object> map = new HashMap<>();
for(int i=0;i<len1;i++){
map.put(qualifiers[i], values[i]);
}
datas.add(map);
List<SimpleFeature> simpleFeatures = dataToSimpleFeatures(sft,datas);
return writeFeatures(ds, sft, simpleFeatures);
}
}
}
return 0;
}
/**
* 根据指定的条件,删除一条或多条满足条件的数据。
* 【注意:一般使用主键进行精确删除!!】
* @param tableName
* @param fieldValues 字段名称和值
* @return 删除的数据条数
*/
public int deleteRecords(String tableName, Map<String,Object> fieldValues) {
int count = 0;
DataStore ds = createDataStore(tableName);
if(ds == null){
return count;
}
if(fieldValues != null){
List<Query> queries = new ArrayList<>();
//获取全称的表名
tableName = super.getFullTableName(tableName);
//根据字段名称和值,构建查询条件
Set<Map.Entry<String, Object>> entries = fieldValues.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String field = entry.getKey();
Object value = entry.getValue();
try {
Filter filter = ECQL.toFilter(field + " = '" + value.toString() + "'");
queries.add(new Query(tableName, filter));
} catch (CQLException e) {
e.printStackTrace();
}
}
//根据查询条件,查询对应的features
List<SimpleFeature> simpleFeatures = queryFeatures(ds, queries);
//删除指定的features记录
String typeName = getTypeNames(ds)[0];
count = removeFeatures(ds, typeName, simpleFeatures);
}
return count;
}
/**
* 查询某个命名空间下的若干个表的总计大小。
* @param nameSpace 命名空间
* @param tables 表的名称
* @param unit 大小单位
* @return 总计打下。
*/
@Override
public double storeSizeOfTbl(String nameSpace,String[] tables, SizeUnitEnum unit) {
boolean flag = isNamespaceExist(nameSpace);
List<String> allGivenTables = new ArrayList<>();
if(flag && tables != null){
for (String table : tables) {
//搜索出指定tables相关的元数据表
List<String> associatedMetaTables = getAssociatedMetaTables(table);
allGivenTables.addAll(associatedMetaTables);
}
}
String[] tableNamesArray = allGivenTables.toArray(new String[allGivenTables.size()]);
return super.storeSizeOfTbl(nameSpace,tableNamesArray, unit);
}
private boolean checkTableEnabled(Connection conn, String tableName){
try {
String fullTableName = super.getFullTableName(tableName);
TableName tn = TableName.valueOf(fullTableName);
return conn.getAdmin().isTableEnabled(tn);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除若干条feature数据。
* @param datastore
* @param typeName feature type name
* @param features 封装的需要删除的数据列表
* @return 删除的条数
*/
private int removeFeatures(DataStore datastore,String typeName,List<SimpleFeature> features){
int count = 0;
if(datastore == null){
return count;
}
try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
datastore.getFeatureWriter(typeName, Transaction.AUTO_COMMIT)) {
HBaseFeatureWriter hbaseWriter = (HBaseFeatureWriter)writer;
for (SimpleFeature feature : features) {
hbaseWriter.removeFeature(feature);
/* 以下方式行不通,数据并没有被删除:
SimpleFeature next = writer.next();
next = feature;
writer.remove();*/
//next.setAttributes(feature.getAttributes());
count+=1;
}
}catch(IOException e){
System.out.println(e.getMessage());
}catch (NullPointerException e){
System.out.println(e.getMessage());
}
System.out.println("Delete "+count+" features successfully!");
return count;
}
/**
* 写入feature数据
* @param datastore
* @param sft
* @param features 封装的数据列表
* @return 写入的条数
*/
private int writeFeatures(DataStore datastore,SimpleFeatureType sft,List<SimpleFeature> features){
int count = 0;
if(datastore == null){
return count;
}
try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
for (SimpleFeature feature : features) {
// 使用geotools writer,获取一个feature,然后修改并提交。
// appending writers will always return 'false' for haveNext, so we don't need to bother checking
SimpleFeature toWrite = writer.next();
// copy attributes:
List<Object> attributes = feature.getAttributes();
toWrite.setAttributes(attributes);
// 如果设置了 feature ID, 需要转换为实现类,并添加USE_PROVIDED_FID hint到user data
String featureId = feature.getID();
((FeatureIdImpl) toWrite.getIdentifier()).setID(featureId);
//toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
// 或者可直接使用 PROVIDED_FID hint
toWrite.getUserData().put(Hints.PROVIDED_FID, featureId);
//如果没有设置feature ID, 会自动生成一个UUID
toWrite.getUserData().putAll(feature.getUserData());
// write the feature
writer.write();
count += 1;
}
}catch(IOException e){
System.out.println(e.getMessage());
}catch (NullPointerException e){
System.out.println(e.getMessage());
}
System.out.println("Write "+count+" features successfully!");
return count;
}
/**
* 将数据封装到SimpleFeature,一行数据就是一个SimpleFeature。
* @param sft feature类型信息
* @param datas 多行数据值。map中的key是字段名,value是字段值。
* 【注意数据中不包括主键!】
* @return 多行数据组成的SimpleFeature对象列表。
*/
private List<SimpleFeature> dataToSimpleFeatures(SimpleFeatureType sft,
List<Map<String,Object>> datas) {
List<SimpleFeature> features = new ArrayList<>();
//查看user data设置
/*Map<Object, Object> userDatas = sft.getUserData();
Boolean idProvided = (Boolean)userDatas.get(Hints.USE_PROVIDED_FID);
idProvided = idProvided == null?Boolean.FALSE:idProvided;*/
//使用geotools SimpleFeatureBuilder创建特征features
SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sft);
for (Map<String, Object> rowMap : datas) {
Set<String> fieldNames = rowMap.keySet();
//设置feature id
String featureId = null;
if(fieldNames.contains(pk)){
featureId = rowMap.get(pk).toString();
fieldNames.remove(pk);
}else{
//新增时,自动生成feature id。
featureId = UUID.randomUUID().toString();
}
SimpleFeature feature = null;
for (String fieldName : fieldNames) {
Object fieldValue = rowMap.get(fieldName);
//写入数据
builder.set(fieldName, fieldValue);
//确定是否需要自己提供ID
/* if(idProvided){
//使用自己提供的ID
featureId = UUID.randomUUID().toString();
}*/
}
try {
// 告知geotools,我们要使用自己提供的ID
builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
// build the feature - this also resets the feature builder for the next entry
// 一律使用自己提供的ID!
feature = builder.buildFeature(featureId);
features.add(feature);
} catch (Exception e) {
System.out.println("Invalid SimpleFeature data: " + e.toString());
}
}
return Collections.unmodifiableList(features);
}
private Map<String, String> buildParams(String tableName) throws ParseException {
System.out.println("Start building parameters...");
String zk = dataBaseDTO.getIp()+":"+dataBaseDTO.getHost();
String[]args = new String[]{"--hbase.zookeepers", zk,"--hbase.catalog",tableName};
DataAccessFactory.Param[] infos = new HBaseDataStoreFactory().getParametersInfo();
Options options = createOptions(infos);
CommandLine command = CommandLineDataStore.parseArgs(getClass(), options, args);
return CommandLineDataStore.getDataStoreParams(command, options);
}
/**
* 构建ECQL语句,根据字段类型来拼接ECQL。
* @param qualifier 字段名
* @param fieldValue 字段值
* @return ECQL语句
*/
private String buildEcqlPredicate(String tableName,String qualifier,String fieldValue){
//获取全称的表名
tableName = super.getFullTableName(tableName);
Map<String, String> columns = getColumnNames(tableName);
String type = "String";
//strToLowerCase(STATE_NAME)
Set<String> keys = columns.keySet();
//确定给定的字段是什么类型
for (String key : keys) {
if(key.equals(qualifier)){
type = columns.get(key);
}
}
//判断如何拼接
if(type.contains("String")){
}
return null;
}
private DataStore createDataStore(String tableName){
DataStore ds = null;
//获取全称的表名
tableName = super.getFullTableName(tableName);
if(checkTableEnabled(getConn(),tableName)){
try {
Map<String, String> params = buildParams(tableName);
System.out.println("Loading datastore...");
// use geotools service loading to get a datastore instance
//当表禁用时datastore为null
ds = DataStoreFinder.getDataStore(params);
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
return ds;
}
private Options createOptions(DataAccessFactory.Param[] parameters) {
// parse the data store parameters from the command line
Options options = CommandLineDataStore.createOptions(parameters);
return options;
}
/**
* 获取某个表中的字段信息,包括:字段名、字段类型
* @param dataStore 数据存储
* @return 字段信息
*/
private Map<String, String> getColumnNames(DataStore dataStore) {
Map<String, String> columnNames = new HashMap<>();
if(dataStore == null){
return columnNames;
}
String typeName = getTypeNames(dataStore)[0];
if(StringUtils.isEmpty(typeName)){
throw new RuntimeException("feature type name should not be empty! ");
}
Query query = new Query(typeName);
try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
// loop through all results
if(reader.hasNext()){
//开始收集结果,读取一行的数据
SimpleFeature feature = reader.next();
//封装一行的数据
Collection<Property> properties = feature.getProperties();
for (Property p : properties) {
//获取字段类型
Class<?> binding = p.getType().getBinding();
String pType = binding.getName();
//获取字段名称
String pName = p.getName().toString();
columnNames.put(pName, pType);
}
}
}catch(IOException e){
System.out.println(e.getMessage());
}catch (NullPointerException e){
System.out.println(e.getMessage());
}
return columnNames;
}
private String[] getTypeNames(DataStore ds){
String[] typeNames=new String[1];
try {
if(ds != null){
String[] types = ds.getTypeNames();
if(types != null && types.length > 0){
typeNames = types;
}
}
} catch (IOException e) {
e.printStackTrace();
}
return typeNames;
}
private SimpleFeatureType getSimpleFeatureType(DataStore ds){
SimpleFeatureType sft = null;
try {
if(ds != null){
String typeName = getTypeNames(ds)[0];
sft = ds.getSchema(typeName);
}
} catch (IOException e) {
e.printStackTrace();
}
return sft;
}
private Map<Object, Object> getUserData(DataStore ds) {
SimpleFeatureType sft = getSimpleFeatureType(ds);
if(sft != null){
return sft.getUserData();
}else{
return new HashMap<>();
}
}
private long getTotalCountByQuery(DataStore datastore,Query query){
long count = 0;
if(datastore == null){
return count;
}
try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
datastore.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
while(reader.hasNext()){
//开始收集结果,读取一行的数据
reader.next();
count+=1;
}
}catch(IOException e){
System.out.println(e.getMessage());
}catch (NullPointerException e){
System.out.println(e.getMessage());
}
return count;
}
private String getTableNameKey(String name,int version){
//e.g."table.$name.v$version"
return new StringBuilder("table")
.append(".").append(name)
.append(".").append("v").append(version)
.toString();
}
/**
* 获取与给定表相关的元数据表。
* @param tableName
* @return 表的名称列表
*/
private List<String> getAssociatedMetaTables(String tableName){
List<String> tables = new ArrayList<>();
//获取全称的表名
tableName = super.getFullTableName(tableName);
tables.add(tableName);
DataStore ds = createDataStore(tableName);
if(ds != null){
//转为HBaseDataStore,获取元数据
HBaseDataStore hds = (HBaseDataStore)ds;
GeoMesaMetadata<String> metadata = hds.metadata();
//获取user data中的索引信息
Map<Object, Object> userData = getUserData(ds);
Object o = userData.get("geomesa.indices");
if(o != null){
//e.g."xz2:1:3,id:1:3"
String indices = o.toString();
String[] splits = indices.split(",");
for (String split : splits) {
String[] oneIndiceInfo = split.split(":");
if(oneIndiceInfo.length>=3){
String tableNameKey = getTableNameKey(oneIndiceInfo[0], Integer.valueOf(oneIndiceInfo[1]));
String metaTableName = metadata.read(getTypeNames(ds)[0], tableNameKey, true).get();
if(!StringUtils.isBlank(metaTableName)){
tables.add(metaTableName);
}
}
}
}
}
return tables;
}
/**
* 获取指定表的某个索引表的名称
* @param tableName hbase表的全称:namespace:table
* 空间索引(Z2/XZ2)、时间索引(Z3/XZ3)、ID索引、属性索引。
* @return 索引表的名称
*/
private String getXZ3IndexTable(String tableName) throws NoSuchElementException{
//索引表//获取全称的表名
String indexTable = super.getFullTableName(tableName);
DataStore ds = createDataStore(tableName);
if(ds != null){
//转为HBaseDataStore,获取元数据
HBaseDataStore hds = (HBaseDataStore)ds;
GeoMesaMetadata<String> metadata = hds.metadata();
//获取user data中的索引信息
Map<Object, Object> userData = getUserData(ds);
Object o = userData.get("geomesa.indices");
if(o != null){
//e.g."xz2:1:3,id:1:3"
String indices = o.toString();
String[] splits = indices.split(",");
Map<String,String> indexMap = new HashMap<>();
for (String split : splits) {
String[] oneIndiceInfo = split.split(":");
String indexName = oneIndiceInfo[0];
String indexVersion = oneIndiceInfo[1];
if(oneIndiceInfo.length>=3){
String tableNameKey = getTableNameKey(indexName, Integer.valueOf(indexVersion));
String metaTableName = metadata.read(getTypeNames(ds)[0], tableNameKey, true).get();
indexMap.put(indexName, metaTableName);
}
}
//取出索引表
if(indexMap.get("xz3")!=null){
indexTable = indexMap.remove("xz3");
}else if(indexMap.get("z3")!=null){
indexTable = indexMap.remove("z3");
}else if(indexMap.get("xz2")!=null){
indexTable = indexMap.remove("xz2");
}else if(indexMap.get("z2")!=null){
indexTable = indexMap.remove("z2");
}else if(indexMap.get("id")!=null){
indexTable = indexMap.remove("id");
}else{
if(indexMap.size()>0){
indexTable = indexMap.values().iterator().next();
}
}
}
}
return indexTable;
}
private List<String> getColumnFamilies(String tableName){
List<String> list = new ArrayList<>();
Admin admin = null;
try {
admin = conn.getAdmin();
//使用全称的表
tableName = super.getFullTableName(tableName);
HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor[] hcdArray = htd.getColumnFamilies();
for (HColumnDescriptor hcd : hcdArray) {
list.add(hcd.getNameAsString());
}
} catch (Exception e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return list;
}
/**
* 获取命名空间下全称的表名,格式为 namespace:tablename
* @param nameSpace 命名空间名称,相当于数据库
* @return 表的全称的列表
*/
private List<String> getFullTablenamesOfDB(String nameSpace) {
List<String> list = new ArrayList<>();
Admin admin = null;
try {
admin = conn.getAdmin();
TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
for (TableName name : tableNames) {
//判断table是不是enable的 //过滤系统表
if(checkTableEnabled(admin,name) && !name.isSystemTable()){
String tableName = name.getNameAsString();
list.add(tableName);
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return list;
}
/**
* 查询数据:精确查询、模糊查询、分页查询。
* 字段名称:property.getName() 或者 property.getType().getName()
* 字段值: feature.getAttribute(property.getName()) 或者 property.getValue()
* @param dataStore
* @param query 查询条件
* @throws IOException
*/
private Map<String,Object> queryFeatureDatas(DataStore dataStore, Query query,
Integer pageSize, Integer pageNum){
if (pageSize == null || pageSize < 1) {
pageSize = 10;
}
if (pageNum == null || pageNum < 1) {
pageNum = 1;
}
//封装结果数据
Map<String,Object> rowDatas = new HashMap<>();
List<Map<String,String>> datas = new ArrayList<>();
Set<String> fields = new HashSet<>();
if(dataStore == null){
rowDatas.put("data", datas);
rowDatas.put("fields", fields);
rowDatas.put("pk", pk);
return rowDatas;
}
try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
// loop through all results
long totalLoopTimes = pageSize * pageNum;
long startFrom = pageSize*(pageNum-1)+1;
for(long count=1;count<=totalLoopTimes;count++){
if(reader.hasNext()){
//开始收集结果,读取一行的数据。feature:ScalaSimpleFeature
SimpleFeature feature = reader.next();
if(count >= startFrom){
//每行数据的主键值
String id = feature.getID();
//封装一行的数据
Map<String, String> oneRowData = new HashMap<>();
//先把唯一的feature ID装进去
oneRowData.put(pk, id);
Collection<Property> properties = feature.getProperties();
for (Property p : properties) {
//property: AttributeImpl
//AttributeImpl attrImpl = (AttributeImpl)p;
//获取字段名称
Name name = p.getName();
String pName = name.toString();
//获取字段值
Object attrObj = feature.getAttribute(name);
String pValue = attrObj==null?null:attrObj.toString();
oneRowData.put(pName, pValue);
// use geotools data utilities to get a printable string
// System.out.println(String.format("%02d", n) + " " + DataUtilities.encodeFeature(feature));
}
fields.addAll(oneRowData.keySet());
datas.add(oneRowData);
}
}
}
}catch(IOException e){
System.out.println(e.getMessage());
}catch (NullPointerException e){
System.out.println(e.getMessage());
}
rowDatas.put("data", datas);
rowDatas.put("fields", fields);
rowDatas.put("pk", pk);
return rowDatas;
}
/**
* 根据指定条件查询SimpleFeature对象。
* @param dataStore
* @param queries (多个)查询条件
* @return SimpleFeature对象的列表
*/
private List<SimpleFeature> queryFeatures(DataStore dataStore, List<Query> queries) {
List<SimpleFeature> results = new ArrayList<>();
if(dataStore == null){
return results;
}
for (Query query : queries) {
System.out.println("Running query " + ECQL.toCQL(query.getFilter()));
if (query.getPropertyNames() != null) {
System.out.println("Returning attributes " + Arrays.asList(query.getPropertyNames()));
}
if (query.getSortBy() != null) {
SortBy sort = query.getSortBy()[0];
System.out.println("Sorting by " + sort.getPropertyName() + " " + sort.getSortOrder());
}
// 提交查询,获取匹配查询条件的features,并遍历。
// try语句确保reader可以关闭。
try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
long n = 0;
while (reader.hasNext()) {
SimpleFeature feature = reader.next();
results.add(feature);
n+=1;
}
System.out.println("Returned " + n + " total queried features");
}catch(IOException e){
System.out.println(e.getMessage());
}catch (NullPointerException e){
System.out.println(e.getMessage());
}
}
return results;
}
}
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.*;
/**
* @Author: gh
* @Description: HBase表格数据查询,删除一条记录,新增一条记录,查询所有表格名称.
*/
public class HBaseDaoImpl implements HBaseDao {
Connection conn = null;
DataBaseDTO dataBaseDTO = null;
public final String pk = "RowKey";
//static Configuration conf = null;
public HBaseDaoImpl(DataBaseDTO dto) {
try {
dataBaseDTO = dto;
conn = connectToHBase();
} catch (Exception e) {
e.printStackTrace();
}
}
public Connection getConn() {
return conn;
}
public void setConn(Connection conn) {
this.conn = conn;
}
/**
* 关闭连接
*/
public void close(){
try{
if(this.conn != null){
conn.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
public boolean connected(){
if(getConn() == null){
return false;
}
boolean tmp = isNamespaceExist(this.dataBaseDTO.getDbName());
return !getConn().isClosed() && tmp;
}
/**
* 安全通信:kerberos
* zookeeper节点信息。
*
* @return true表示连接成功,false表示连接失败
* @throws IOException
*/
private org.apache.hadoop.hbase.client.Connection connectToHBase() {
try {
if (conn == null) {
if (dataBaseDTO != null) {
Configuration conf = HBaseConfiguration.create();
String ip = dataBaseDTO.getIp();
String port = dataBaseDTO.getHost();
//NOTE1:如果ip不存在,会报错并retry30次:
// org.apache.zookeeper.ClientCnxn - Session 0x0 for server null,
// unexpected error, closing socket connection and attempting reconnect
//java.net.ConnectException: Connection refused: no further information
//org.apache.zookeeper.ClientCnxnSocketNIO - Ignoring exception during shutdown input
//java.nio.channels.ClosedChannelException: null
//NOTE2:如果ip存在,但不是zk的地址,会报错并retry:
// java.io.IOException: org.apache.zookeeper.KeeperException$NoNodeException:
// KeeperErrorCode = NoNode for /hbase/master
//TODO: 用户名、密码做校验
conf.set("hbase.zookeeper.quorum", ip+":"+port);
//conf.set("hbase.zookeeper.quorum", "zyb1:2181,zyb2:2181,zyb9:2181");
conf.setInt("zookeeper.recovery.retry", 0);
conf.setInt("zookeeper.session.timeout",30000);
conn = ConnectionFactory.createConnection(conf);
}
}
}catch(Exception e){
e.printStackTrace();
}finally {
return conn;
}
}
/**
* 获取表名全称: namespace:tableName
* @param tableName
* @return
*/
protected String getFullTableName(String tableName){
if(this.dataBaseDTO != null){
String dbName = this.dataBaseDTO.getDbName();
if(!StringUtil.isEmpty(dbName)&&!StringUtil.isEmpty(tableName)){
//表名中不包含:
if(tableName.indexOf(":")<0){
return new StringBuffer(dbName).append(":").append(tableName).toString();
}
}
}
return tableName;
}
public boolean isNamespaceExist(String nameSpace){
boolean flag = false;
Admin admin = null;
try {
if(!StringUtils.isEmpty(nameSpace)){
admin = conn.getAdmin();
//直接使用admin.getNamespaceDescriptor,可能会报错:
// org.apache.hadoop.hbase.NamespaceNotFoundException
NamespaceDescriptor[] nsds = admin.listNamespaceDescriptors();
for (NamespaceDescriptor nsd : nsds) {
if(nsd.getName().equals(nameSpace)){
flag = true;
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return flag;
}
@Override
public void createTable(String tableName, String[] columnFamilys) {
Admin admin = null;
try {
admin = conn.getAdmin();
//拼接表名
tableName = getFullTableName(tableName);
TableName tblName = TableName.valueOf(tableName);
if (admin.tableExists(tblName)) {
System.err.println("此表已存在!");
} else {
HTableDescriptor htd = new HTableDescriptor(tblName);
for (String columnFamily : columnFamilys) {
HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
htd.addFamily(hcd);
}
admin.createTable(htd);
System.err.println("建表成功!");
}
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Override
public int insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
Table table = null;
int rowCount = 0; //0 for nothing
try {
//拼接表名
tableName = getFullTableName(tableName);
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
Put put = new Put(Bytes.toBytes(rowKey));
// 批量添加
for (int i = 0; i < quailifer.length; i++) {
String col = quailifer[i];
String val = value[i];
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes(val));
}
table.put(put);
rowCount = 1; // row count of DML
return rowCount;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return rowCount;
}
@Override
public List<Result> getAllRows(String tableName, Integer pageSize, Integer pageNum) {
if (pageSize == null || pageSize < 1) {
pageSize = 10;
}
if (pageNum == null || pageNum < 1) {
pageNum = 1;
}
Table table = null;
List<Result> list = new ArrayList<>();
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
//分页:设置每页的最大容量;获取startrow,第1页的startrow=null。
ResultScanner scanner = getPageScanner(table, pageNum, pageSize);
//返回的条数
int resultSize = pageSize;
for (Result rs : scanner) {
if(resultSize>0){
list.add(rs);
resultSize = resultSize -1;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return list;
}
/**
* 分页过滤
* 【注意:】该过滤器并不能保证返回的结果行数小于等于指定的页面行数,
* 因为过滤器是分别作用到各个region server的,
* 它只能保证当前region返回的结果行数不超过指定页面行数。
* @param table 表对象
* @param pageNum 当前页码
* @param pageSize 当前页大小
* @return
* @throws Exception
*/
private ResultScanner getPageScanner(Table table, int pageNum, int pageSize) throws Exception {
Scan scan = new Scan();
//第1页时,不需要设置startrow;其他页时,需要设置startrow
Filter pageFilter = new PageFilter(pageSize);
scan.setFilter(pageFilter);
ResultScanner scanner = table.getScanner(scan);
if (pageNum > 1) {
for (int i = 1; i < pageNum; i++) {
byte[] startRow = new byte[]{};
for (Result result : scanner) {
startRow = result.getRow();
}
startRow = Bytes.add(startRow, new byte[]{0});
scan.setStartRow(startRow);
scanner = table.getScanner(scan);
}
}
return scanner;
}
/**
* 统计hbase中表的行数。方法:
* 1.统计ResultScanner。效率和性能低下,不可取。
* 2.使用协处理器中的“存储过程”--endpoint。
* 3.Mapreduce任务。
* @param tableName 表的名称
* @return row的总行数
*/
@Override
public long totalCount(String tableName) {
//拼接表名
tableName = getFullTableName(tableName);
return totalCount(tableName,new Scan());
}
/**
* 统计hbase中表的行数。
* @param tableName 表的名称
* @param scan 扫描条件。
* @return row的总行数
*/
private long totalCount(String tableName,Scan scan){
Admin admin = null;
long rowCount = 0;
long start = System.currentTimeMillis();
System.out.println("start: "+new Date(start).toLocaleString());
//使用协处理器时,(没有加载协处理器)报错:
//org.apache.hadoop.hbase.exceptions.UnknownProtocolException:
// No registered coprocessor service found for name AggregateService in region **.
TableName tblName= null ;
try {
tblName = TableName.valueOf(tableName);
} catch (Exception e) {
e.printStackTrace();
}
if(tblName != null){
try {
//提前创建connection和conf
Configuration conf = conn.getConfiguration();
// conf.setBoolean("hbase.table.sanity.checks", false);
admin = conn.getAdmin();
//检查是不是已经添加过协处理器
HTableDescriptor htd = admin.getTableDescriptor(tblName);
String coprocessorName = AggregateImplementation.class.getName();
boolean flag = htd.hasCoprocessor(coprocessorName);
System.err.println("coprocessor: "+flag);
if(!flag){
//先disable表,再添加协处理器。如果不disable,也会被自动disable。
asyncDisable(admin,tblName);
//添加协处理器
htd.addCoprocessor(coprocessorName);
//不修改表,协处理器就添加不到表属性中,会报错:UnknownProtocolException:
//No registered coprocessor service found for name AggregateService in region
admin.modifyTable(tblName, htd);
//enable table
asyncEnable(admin,tblName);
}
//使用协处理器进行row count
AggregationClient ac = new AggregationClient(conf);
//防止enable超时,表格依然是disable
if(checkTableEnabled(admin, tblName)){
rowCount = ac.rowCount(tblName, new LongColumnInterpreter(), scan);
}
System.out.format("RowCount of %s : %s",tableName,rowCount);
System.out.println();
} catch (IOException e) {
e.printStackTrace();
}catch (Throwable e){
e.printStackTrace();
}finally{
if(admin != null){
try {
asyncEnable(admin,tblName);
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
long end = System.currentTimeMillis();
System.out.println("end: "+new Date(end).toLocaleString());
System.out.println("统计耗时:"+(end-start)/1000+"s");
return rowCount;
}
protected boolean checkTableEnabled(Admin admin,TableName tblName){
try {
return admin.isTableEnabled(tblName);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
private void asyncDisable(Admin admin,TableName tblName){
try{
if(admin.isTableEnabled(tblName)){
System.err.println("==========disableTableAsync===========");
admin.disableTableAsync(tblName);
//如果没有禁用,就一直轮询
long totalWait = 0;
long maxWait = 30*1000;
long sleepTime = 300;
while(!admin.isTableDisabled(tblName)){
try {
Thread.sleep(sleepTime);
totalWait += sleepTime;
//最多等待30s
if(totalWait >= maxWait){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}catch(IOException e){
e.printStackTrace();
}
}
private void asyncEnable(Admin admin,TableName tblName){
try{
if(admin.isTableDisabled(tblName)){
System.err.println("==========enableTableAsync===========");
admin.enableTableAsync(tblName);
//如果没有启用,就一直轮询
long totalWait = 0;
long maxWait = 30*1000;
long sleepTime = 300;
while(!admin.isTableEnabled(tblName)){
try {
Thread.sleep(sleepTime);
totalWait += sleepTime;
//最多等待30s
if(totalWait >= maxWait){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}catch(IOException e){
e.printStackTrace();
}
}
@Override
public Result getRowByKey(String tableName, String rowKey) {
Table table = null;
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
@Override
public List<Result> getRowsByKey(String tableName, String rowKeyLike) {
Table table = null;
List<Result> list = list = new ArrayList<>();
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
PrefixFilter filter = new PrefixFilter(Bytes.toBytes(rowKeyLike));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result rs : scanner) {
list.add(rs);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return list;
}
@Override
public List<Result> getRowsByKey(String tableName, String startRow, String stopRow) {
Table table = null;
List<Result> list = new ArrayList<>();
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(stopRow));
ResultScanner scanner = table.getScanner(scan);
for (Result rs : scanner) {
list.add(rs);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return list;
}
@Override
public List<Result> getRowsByKey(String tableName, String rowKeyLike, String family, String cols[]) {
Table table = null;
List<Result> list = new ArrayList<>();
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
PrefixFilter filter = new PrefixFilter(Bytes.toBytes(rowKeyLike));
Scan scan = new Scan();
if (cols == null || cols.length < 1) {
scan.addFamily(Bytes.toBytes(family));
} else {
for (int i = 0; i < cols.length; i++) {
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(cols[i]));
}
}
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result rs : scanner) {
list.add(rs);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return list;
}
@Override
public int deleteRecords(String tableName, String rowKeyLike) {
Table table = null;
int rowCount = 0;
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
PrefixFilter filter = new PrefixFilter(Bytes.toBytes(rowKeyLike));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
List<Delete> list = new ArrayList<Delete>();
for (Result rs : scanner) {
Delete del = new Delete(rs.getRow());
list.add(del);
rowCount ++;
}
table.delete(list);
return rowCount;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return rowCount;
}
@Override
public int deleteRecord(String tableName, String rowKey) {
Table table = null;
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
//确认rowkey是否存在
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
List<Cell> cells = result.listCells();
if(cells != null && cells.size() > 0){
//即使rowkey不存在,删除也不会报错。
Delete del = new Delete(Bytes.toBytes(rowKey));
table.delete(del);
return 1;
}
}
return 0;
} catch (Exception e) {
e.printStackTrace();
return 0;
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public Map<String, List<String>> getNameSpaceTables() {
Map<String, List<String>> result = new HashMap<>();
Admin admin = null;
try {
admin = conn.getAdmin();
NamespaceDescriptor[] namespaces = admin.listNamespaceDescriptors();
for (NamespaceDescriptor ns : namespaces) {
String nsName = ns.getName();
TableName[] tableNames = admin.listTableNamesByNamespace(nsName);
List<String> tblList = new ArrayList<>();
for (TableName name : tableNames) {
String tableName = name.getNameAsString();
//非default命名空间的表名有前缀 hbase:meta
tableName = tableName.contains(":") ? tableName.split(":")[1] : tableName;
tblList.add(tableName);
/*List<RegionInfo> regions = admin.getRegions(name);
for (RegionInfo region : regions) {
System.out.println("region id: "+region.getRegionId());
}
System.out.println("--------------------");*/
}
result.put(nsName, tblList);
}
} catch (IOException e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return result;
}
@Override
public List<String> getTablenamesOfDB(String nameSpace) {
List<String> list = new ArrayList<>();
Admin admin = null;
try {
admin = conn.getAdmin();
TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
for (TableName name : tableNames) {
//判断table是不是enable的//过滤系统表
if(checkTableEnabled(admin, name) && !name.isSystemTable()){
String tableName = name.getNameAsString();
//非default命名空间的表名有前缀 hbase:meta
tableName = tableName.contains(":") ? tableName.split(":")[1] : tableName;
list.add(tableName);
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return list;
}
public int getTableStatus(String tableName){
//拼接表名
tableName = getFullTableName(tableName);
Admin admin = null;
try {
admin = conn.getAdmin();
TableName tn = TableName.valueOf(tableName);
boolean exist = admin.tableExists(tn);
if(exist){
//表存在,继续判断是否可用。
boolean disabled = admin.isTableDisabled(tn);
if(disabled){
//0:禁用
return 0;
}else{
//1:可用
return 1;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//-1:表不存在
return -1;
}
/**
* 从HBase表的结果集提取出具体的字段值,即每行包括主键、列族、列名、列值。
*
* @param results
* @param b 用来区分 是否返回geom 字段数据 true 是返回 false 是不返回
*/
public Map<String,Object> extractResults(List<Result> results, boolean b) {
Map<String,Object> rowDatas = new HashMap<>();
List<Map<String,String>> data = new ArrayList<>();
Set<String> fields = new HashSet<>();
try {
for (Result r : results) {
//直接遍历Cell会有乱码
//Cell[] cells = r.rawCells();
Map<String, String> oneRowData = new HashMap<>();
//TODO:
oneRowData.put(pk, Bytes.toString(r.getRow()));
NavigableMap<byte[], NavigableMap<byte[], byte[]>> maps = r.getNoVersionMap();
//遍历列族
NavigableSet<byte[]> keys = maps.navigableKeySet();
for (byte[] key : keys) {
//TODO:
String columnFamily = Bytes.toString(key);
NavigableMap<byte[], byte[]> values = maps.get(key);
//遍历某个列族下的列
NavigableSet<byte[]> ks = values.navigableKeySet();
for (byte[] k : ks) {
//TODO:
String qualifierName = Bytes.toString(k);
String qualifierValue = Bytes.toString(values.get(k));
String fieldName = columnFamily + "." + qualifierName;
if(b){
oneRowData.put(fieldName, qualifierValue);
}else{
if(!"geom".equals(qualifierName)){
oneRowData.put(fieldName, qualifierValue);
}
}
}
}
fields.addAll(oneRowData.keySet());
data.add(oneRowData);
}
rowDatas.put("data", data);
rowDatas.put("fields", fields);
rowDatas.put("pk", pk);
} catch (Exception e) {
e.printStackTrace();
}
return rowDatas;
}
@Override
public double usedRate() {
double rate = 0.0;
Admin admin = null;
try {
admin = conn.getAdmin();
//获取region数据总量和总大小
long regionSize = admin.getConfiguration()
.getLong("hbase.hregion.max.filesize", 10737418240L); //byte
ClusterStatus clusterStatus = admin.getClusterStatus();
int regionsCount = clusterStatus.getRegionsCount();
final long totalRegionSizes = regionSize * regionsCount;
Collection<ServerName> liveServers = clusterStatus.getServers();
//获取storefile总大小(默认累加单位为MB)
double totalHFileSizes = 0;
for (ServerName liveServer : liveServers) {
ServerLoad serverLoad = clusterStatus.getLoad(liveServer);
if(serverLoad != null){
int storefileSizeInMB = serverLoad.getStorefileSizeInMB();
totalHFileSizes += (storefileSizeInMB);
}
}
rate = (totalHFileSizes*1024*1024) / totalRegionSizes;
} catch (IOException e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return Double.parseDouble(String.format("%.2f",rate*100));
}
@Override
public double storeSizeOfDB(SizeUnitEnum unit) {
return storeSizeOfDB(null,unit);
}
@Override
public double storeSizeOfDB(String nameSpace, SizeUnitEnum unit) {
//获取storefile总大小(默认累加单位为MB)
double totalHFileSizes = 0.00;
Admin admin = null;
try {
admin = conn.getAdmin();
ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> liveServers = clusterStatus.getServers();
if(nameSpace == null){
//获取所有的storefile总大小
for (ServerName server : liveServers) {
//当前活着的server
ServerLoad serverLoad = clusterStatus.getLoad(server);
if(serverLoad != null){
int storefileSizeInMB = serverLoad.getStorefileSizeInMB();
totalHFileSizes += (storefileSizeInMB);
}
}
}else if(isNamespaceExist(nameSpace)){
//获取指定namespace下的所有表的storefile的大小
TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
List<HRegionInfo> totalRegions = new ArrayList<>();
for (TableName tableName : tableNames) {
//获取当前表分布在哪些region上
List<HRegionInfo> currentRegions = admin.getTableRegions(tableName);
totalRegions.addAll(currentRegions);
}
for (ServerName server : liveServers) {
//当前活着的server
ServerLoad serverLoad = clusterStatus.getLoad(server);
//当前server中所有的regions
Map<byte[], RegionLoad> regionLoads = serverLoad.getRegionsLoad();
//查看哪些regions是属于当前namespace中的表的
for (HRegionInfo tableRegion : totalRegions) {
//获取以上各个region中storefile的大小
RegionLoad rl = regionLoads.get(tableRegion.getRegionName());
if(rl != null){
totalHFileSizes += (rl.getStorefileSizeMB());
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return FileSizeUtil.valueOf(totalHFileSizes*1024*1024, unit);
}
@Override
public double storeSizeOfTbl(String nameSpace,String[] tables, SizeUnitEnum unit) {
//获取storefile总大小(默认累加单位为MB)
double totalHFileSizes = 0.00;
Admin admin =null;
try {
boolean flag = isNamespaceExist(nameSpace);
if(flag && tables != null){
admin = conn.getAdmin();
ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> liveServers = clusterStatus.getServers();
List<HRegionInfo> totalRegions = new ArrayList<>();
for (String table : tables) {
//获取当前表分布在哪些region上
String fullTableName = nameSpace+":"+table;
if(table.contains(":")){
fullTableName = table;
}
TableName currentTable = TableName.valueOf(fullTableName);
List<HRegionInfo> currentRegions = admin.getTableRegions(currentTable);
totalRegions.addAll(currentRegions);
}
for (ServerName server : liveServers) {
//当前活着的server
ServerLoad serverLoad = clusterStatus.getLoad(server);
//当前server中所有的regions
Map<byte[], RegionLoad> regionLoads = serverLoad.getRegionsLoad();
//查看哪些regions是属于当前namespace中的表的
for (HRegionInfo tableRegion : totalRegions) {
//获取以上各个region中storefile的大小
RegionLoad rl = regionLoads.get(tableRegion.getRegionName());
if(rl != null){
totalHFileSizes += (rl.getStorefileSizeMB());
}
}
}
}
} catch (Exception e) {
System.out.println("===================="+e.getMessage());
e.printStackTrace();
}finally{
if(admin != null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return FileSizeUtil.valueOf(totalHFileSizes*1024*1024, unit);
}
@Override
public int countTables(String nameSpace) {
List<String> list = getTablenamesOfDB(nameSpace);
return list.size();
}
@Override
public List<String> getTables(String nameSpace) {
List<String> list = getTablenamesOfDB(nameSpace);
return list;
}
@Override
public List<Result> queryByRandomField(String tableName, String familyName,String qualifier,
String fieldValue,Integer pageSize,Integer pageNum) {
if (pageSize == null || pageSize < 1) {
pageSize = 10;
}
if (pageNum == null || pageNum < 1) {
pageNum = 1;
}
Table table = null;
List<Result> list = new ArrayList<>();
//拼接表名
tableName = getFullTableName(tableName);
try {
TableName tblName = TableName.valueOf(tableName);
if(checkTableEnabled(conn.getAdmin(),tblName)){
table = conn.getTable(tblName);
Scan scan = new Scan();
Filter columnFilter = null;
if(pk.equals(qualifier)){
//过滤1:模糊匹配rowkey的值,找到所有row
columnFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(fieldValue));
}else{
//过滤2:模糊匹配某个字段的值(非rowkey)
columnFilter = new SingleColumnValueFilter(
Bytes.toBytes(familyName),
Bytes.toBytes(qualifier),
CompareFilter.CompareOp.EQUAL,
new SubstringComparator(fieldValue));
}
//过滤3:页面大小,为分页准备
Filter pageFilter = new PageFilter(pageSize);
//添加过滤器
List<Filter> filters = new ArrayList<>();
filters.add(columnFilter);
filters.add(pageFilter);
scan.setFilter(new FilterList(filters));
//查询
ResultScanner scanner = table.getScanner(scan);
if (pageNum > 1) {
for (int i = 1; i < pageNum; i++) {
byte[] startRow = new byte[]{};
for (Result result : scanner) {
startRow = result.getRow();
}
startRow = Bytes.add(startRow, new byte[]{0});
scan.setStartRow(startRow);
scanner = table.getScanner(scan);
}
}
int resultSize = pageSize;
for (Result rs : scanner) {
if(resultSize>0){
list.add(rs);
resultSize = resultSize -1;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(table != null){
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return list;
}
@Override
public long totalCountOfFuzzyQuery(String tableName,String familyName,
String qualifier,String fieldValue) {
//拼接表名
tableName = getFullTableName(tableName);
//添加scan条件
Scan scan = new Scan();
Filter columnFilter = null;
if(pk.equals(qualifier)){
//过滤1:模糊匹配rowkey的值,找到所有row
columnFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(fieldValue));
}else{
//过滤2:模糊匹配某个字段的值(非rowkey)
columnFilter = new SingleColumnValueFilter(
Bytes.toBytes(familyName),
Bytes.toBytes(qualifier),
CompareFilter.CompareOp.EQUAL,
new SubstringComparator(fieldValue));
}
//添加过滤器
scan.setFilter(columnFilter);
//使用coprocessor计数
return totalCount(tableName, scan);
}
@Override
public Map<String, String> getColumnNames(String tableName) {
return getColumnNames(tableName,null,null);
}
private Map<String, String> getColumnNames(String tableName,Integer pageSize,Integer pageNum){
List<Result> someRows = getAllRows(tableName, pageSize, pageNum);
Map<String, Object> map = extractResults(someRows, true);
Object fields = map.get("fields");
Map<String, String> columnNames = new HashMap<>();
if(fields != null){
//有列的名称
Set<String> set = (Set<String>)fields;
for (String field : set) {
columnNames.put(field, "");
}
}
return columnNames;
}
}
依赖:
<properties>
<geomesa.version>2.2.1</geomesa.version>
<gt.version>20.0</gt.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.abi.version>2.11</scala.abi.version>
<scala.version>2.11.7</scala.version>
<!-- environment specific variables-->
<zookeeper.version>3.4.9</zookeeper.version>
<hadoop.version>2.6.0</hadoop.version>
<hbase.hadoop.version>2.6.0</hbase.hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>1.3.1</hbase.version>
<spark.version>2.4.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.2.Final</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<!--<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-hbase-tools_2.11</artifactId>
<version>${geomesa.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-hbase-datastore_${scala.abi.version}</artifactId>
<version>${geomesa.version}</version>
</dependency>-->
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-hbase-spark-runtime_${scala.abi.version}</artifactId>
<version>${geomesa.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-core_2.11</artifactId>
<version>${geomesa.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-jts_2.11</artifactId>
<version>${geomesa.version}</version>
</dependency>
<!--<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_2.11</artifactId>
<version>${geomesa.version}</version>
</dependency>-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
<!--<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-opengis</artifactId>
<version>${gt.version}</version>
</dependency>
<!--<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-data</artifactId>
<version>${gt.version}</version>
</dependency>-->
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-epsg-hsql</artifactId>
<version>${gt.version}</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.abi.version}</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.abi.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.abi.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.abi.version}</artifactId>
<version>${spark.version}</version>
</dependency>-->
</dependencies>
版权声明:本文为koukan3原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。