// Geomesa集成HBase:Geotools DataStore API 读写 HBase (GeoMesa + HBase: reading/writing HBase via the GeoTools DataStore API)


import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.geotools.data.*;
import org.geotools.factory.Hints;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.locationtech.geomesa.hbase.data.HBaseDataStore;
import org.locationtech.geomesa.hbase.data.HBaseDataStoreFactory;
import org.locationtech.geomesa.hbase.data.HBaseFeatureWriter;
import org.locationtech.geomesa.index.metadata.GeoMesaMetadata;
import org.opengis.feature.Property;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.feature.type.Name;
import org.opengis.filter.Filter;
import org.opengis.filter.sort.SortBy;

import java.io.IOException;
import java.util.*;

/**
 * @Author: gh
 * @Description: 通过geomesa操作hbase数据。
 * 1.新增:通过geomesa封装并存储数据到HBase。
 * //geomesa默认主键名称是__fid__
 * SimpleFeature封装的数据中,主键是feature ID(__fid__),不是数据自身的ID(gid,fid等)。
 * 2.删除:根据rowkey删除数条记录。
 * 3.更新:更新除geom以外的字段。
 * 和新增使用同一个方法。
 * 4.查询:解析出所有字段并返回;分页;模糊查询。
 * 条件查询、模糊查询
 */
public class GeomesaHBaseImpl extends HBaseDaoImpl{


    /**
     * Creates a GeoMesa-backed DAO; the superclass constructor opens the
     * underlying HBase connection from the DTO's settings.
     *
     * @param dto connection settings (zookeeper ip/port, database name)
     */
    public GeomesaHBaseImpl(DataBaseDTO dto){
        super(dto);
    }
    /**
     * Exact / fuzzy query on an arbitrary field, paginated.
     * @param tableName table name
     * @param qualifier field name
     * @param fieldValue field value, matched with LIKE '%value%'
     * @param pageSize page size
     * @param pageNum 1-based page number
     * @return packed result map ("data", "fields", "pk") — see queryFeatureDatas
     */
    public Map<String,Object> queryByRandomField(String tableName, String qualifier,String fieldValue,
                                           Integer pageSize,Integer pageNum) {
        Query query = null;
        DataStore ds = createDataStore(tableName);
        if(ds != null){
            //设置查询条件
            try {
                // Escape embedded single quotes so a value containing ' cannot
                // break out of the CQL string literal (injection / parse error).
                String escaped = fieldValue == null ? "" : fieldValue.replace("'", "''");
                query = new Query(getTypeNames(ds)[0],
                        ECQL.toFilter(qualifier + " LIKE '%" + escaped + "%'"));
            } catch (CQLException e) {
                e.printStackTrace();
            }
        }
        return queryFeatureDatas(ds, query,pageSize,pageNum);
    }

    /**
     * Counts the total number of records in a table.
     * NOTE: do not call the parent class's totalCount on the catalog table
     * directly — its result is wrong; counting must run against an index table.
     * @param tableName table name
     * @return total record count (0 on error)
     */
    public long totalCount2(String tableName) {
        long count = 0;
        try {
            // no query condition — resolve an index table for this catalog table
            String indexTable = getXZ3IndexTable(tableName);
            // coprocessor-based counting only yields results on (any) index table
            count = super.totalCount(indexTable);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return count;
       /* DataStore ds = createDataStore(tableName);
        Query query = new Query(getTypeNames(ds)[0]);
        return getTotalCountByQuery(ds,query);*/
    }
    /**
     * Counts the rows matched by a fuzzy (LIKE '%value%') query on one field.
     * @param tableName table name
     * @param qualifier field name
     * @param fieldValue field value
     * @return number of matching rows (0 when the store is unavailable)
     */
    public long totalCountOfFuzzyQuery(String tableName,String qualifier,String fieldValue) {
        long count = 0;
        Query query = null;
        DataStore ds = createDataStore(tableName);
        if(ds != null){
            //设置查询条件
            try {
                // Escape embedded single quotes so a value containing ' cannot
                // break out of the CQL string literal (injection / parse error).
                String escaped = fieldValue == null ? "" : fieldValue.replace("'", "''");
                query = new Query(getTypeNames(ds)[0],
                        ECQL.toFilter(qualifier + " LIKE '%" + escaped + "%'"));
            } catch (CQLException e) {
                e.printStackTrace();
            }
            count =  getTotalCountByQuery(ds, query);
        }
        return count;
    }
    /**
     * Paginated scan without any filter condition.
     * @param tableName table name
     * @param pageSize rows per page
     * @param pageNum  1-based page number
     * @return packed result map ("data", "fields", "pk") — see queryFeatureDatas
     */
    public Map<String,Object> getAllRowsByPage(String tableName, Integer pageSize, Integer pageNum) {
        DataStore store = createDataStore(tableName);
        // an unfiltered query over the store's first feature type
        Query unfiltered = (store == null) ? null : new Query(getTypeNames(store)[0]);
        return queryFeatureDatas(store, unfiltered, pageSize, pageNum);
    }
    /**
     * Lists the logical (catalog) table names in a namespace, excluding GeoMesa
     * index tables and disabled tables (the latter are already filtered by
     * getFullTablenamesOfDB). Catalog tables are identified by their "m" column
     * family; index tables use "d". Names are returned without the namespace
     * prefix.
     * @param nameSpace namespace (database) name
     * @return short names of the catalog tables
     */
    @Override
    public List<String> getTablenamesOfDB(String nameSpace) {
        List<String> catalogTables = new ArrayList<>();
        for (String qualified : getFullTablenamesOfDB(nameSpace)) {
            // catalog tables carry CF "m"; index tables carry CF "d"
            if (getColumnFamilies(qualified).contains("m")) {
                int sep = qualified.indexOf(':');
                catalogTables.add(sep >= 0 ? qualified.substring(sep + 1) : qualified);
            }
        }
        return catalogTables;
    }
    /**
     * Resolves the field names and types of a table.
     * @param tableName table name
     * @return map of field name to the field type's class name
     */
    @Override
    public Map<String, String> getColumnNames(String tableName) {
        return getColumnNames(createDataStore(tableName));
    }

    /**
     * Inserts or updates one row.
     * For inserts the data must NOT contain the RowKey attribute (a feature id
     * is generated automatically); for updates it MUST contain the RowKey.
     * @param tableName table name
     * @param qualifiers field names
     * @param values field values, positionally matching qualifiers
     * @return number of rows written (0 when the store is unavailable or the
     *         arrays are null / of different length)
     */
    public int insert(String tableName, String []qualifiers, String []values) {
        DataStore ds = createDataStore(tableName);
        if (ds == null || qualifiers == null || values == null
                || qualifiers.length != values.length) {
            return 0;
        }
        SimpleFeatureType sft = getSimpleFeatureType(ds);
        // pack the single row as a field-name -> value map
        Map<String, Object> row = new HashMap<>();
        for (int i = 0; i < qualifiers.length; i++) {
            row.put(qualifiers[i], values[i]);
        }
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row);
        return writeFeatures(ds, sft, dataToSimpleFeatures(sft, rows));
    }

    /**
     * Deletes one or more rows matching the given field conditions.
     * NOTE: callers normally pass the primary key for an exact delete.
     * @param tableName table name
     * @param fieldValues field name -> value pairs; each entry becomes an
     *                    equality filter, and features matching ANY entry are removed
     * @return number of deleted rows
     */
    public int deleteRecords(String tableName, Map<String,Object> fieldValues) {
        int count = 0;
        DataStore ds = createDataStore(tableName);
        if(ds == null){
            return count;
        }
        if(fieldValues != null){
            List<Query> queries = new ArrayList<>();
            // resolve the fully-qualified table name
            tableName = super.getFullTableName(tableName);
            // build one equality filter per field/value pair
            Set<Map.Entry<String, Object>> entries = fieldValues.entrySet();
            for (Map.Entry<String, Object> entry : entries) {
                String field = entry.getKey();
                Object value = entry.getValue();
                try {
                    // NOTE(review): the Query is created with the full table name,
                    // while the removal below uses getTypeNames(ds)[0] — confirm
                    // both resolve to the same feature type name.
                    Filter filter = ECQL.toFilter(field + " = '" + value.toString() + "'");
                    queries.add(new Query(tableName, filter));
                } catch (CQLException e) {
                    e.printStackTrace();
                }
            }
            // run the queries and collect the matching features
            List<SimpleFeature> simpleFeatures = queryFeatures(ds, queries);
            // remove the collected features
            String typeName = getTypeNames(ds)[0];
            count = removeFeatures(ds, typeName, simpleFeatures);
        }
        return count;
    }

    /**
     * Total storage size of the given tables within a namespace, including the
     * GeoMesa metadata/index tables associated with each one.
     * @param nameSpace namespace
     * @param tables logical table names
     * @param unit size unit for the result
     * @return accumulated size in the requested unit
     */
    @Override
    public double storeSizeOfTbl(String nameSpace,String[] tables, SizeUnitEnum unit) {
        List<String> expanded = new ArrayList<>();
        if (isNamespaceExist(nameSpace) && tables != null) {
            for (String table : tables) {
                // each logical table contributes itself plus its metadata/index tables
                expanded.addAll(getAssociatedMetaTables(table));
            }
        }
        return super.storeSizeOfTbl(nameSpace, expanded.toArray(new String[0]), unit);
    }
    /**
     * Checks whether the given table exists and is enabled.
     * @param conn open HBase connection
     * @param tableName short or fully-qualified table name
     * @return true when enabled; false when disabled, missing, or on any error
     */
    private boolean checkTableEnabled(Connection conn, String tableName){
        // Connection.getAdmin() returns a NEW Admin instance each call and the
        // caller must close it; the original leaked one Admin per invocation.
        try (Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf(super.getFullTableName(tableName));
            return admin.isTableEnabled(tn);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * Deletes the given features from the store.
     * Uses the GeoMesa-specific HBaseFeatureWriter.removeFeature, because the
     * generic geotools writer.next()/remove() path did not actually delete the
     * rows (see the retained commented-out attempt below).
     * @param datastore target store; 0 is returned when null
     * @param typeName feature type name
     * @param features features to delete
     * @return number of deleted features
     */
    private int removeFeatures(DataStore datastore,String typeName,List<SimpleFeature> features){
        int count = 0;
        if(datastore == null){
            return count;
        }
        try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
                     datastore.getFeatureWriter(typeName, Transaction.AUTO_COMMIT)) {
                HBaseFeatureWriter hbaseWriter = (HBaseFeatureWriter)writer;
            for (SimpleFeature feature : features) {
                hbaseWriter.removeFeature(feature);
                /* The following approach does NOT work — the data is not deleted:
                SimpleFeature next = writer.next();
                next = feature;
                writer.remove();*/
                //next.setAttributes(feature.getAttributes());
                count+=1;
            }
        }catch(IOException e){
            System.out.println(e.getMessage());
        }catch (NullPointerException e){
            System.out.println(e.getMessage());
        }
        System.out.println("Delete "+count+" features successfully!");
        return count;
    }
    /**
     * Writes (appends) the given features to the store.
     * When a feature carries its own ID, the PROVIDED_FID hint makes GeoMesa
     * use it; without a provided ID the store generates a UUID.
     * @param datastore target store; 0 is returned when null
     * @param sft feature type of the rows
     * @param features rows to write
     * @return number of features written
     */
    private int writeFeatures(DataStore datastore,SimpleFeatureType sft,List<SimpleFeature> features){
        int count = 0;
        if(datastore == null){
            return count;
        }
        try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
                     datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
            for (SimpleFeature feature : features) {
                // geotools pattern: obtain a fresh feature from the writer,
                // populate it, then commit it via write().
                // appending writers will always return 'false' for haveNext, so we don't need to bother checking
                SimpleFeature toWrite = writer.next();
                // copy attributes:
                List<Object> attributes = feature.getAttributes();
                toWrite.setAttributes(attributes);
                // when a feature ID was set, cast to the implementation class,
                // carry the ID over, and add the USE_PROVIDED_FID hint to user data
                String featureId = feature.getID();
                ((FeatureIdImpl) toWrite.getIdentifier()).setID(featureId);
                //toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
                // alternatively the PROVIDED_FID hint can be used directly
                toWrite.getUserData().put(Hints.PROVIDED_FID, featureId);
                // without a feature ID, a UUID would be generated automatically
                toWrite.getUserData().putAll(feature.getUserData());
                // write the feature
                writer.write();
                count += 1;
            }
        }catch(IOException e){
            System.out.println(e.getMessage());
        }catch (NullPointerException e){
            System.out.println(e.getMessage());
        }
        System.out.println("Write "+count+" features successfully!");
        return count;
    }

    /**
     * Converts row maps into SimpleFeatures; one map per row.
     * When a row contains the primary-key field (pk), its value becomes the
     * feature ID; otherwise a random UUID is generated (the insert case). The
     * pk entry itself is never written as a feature attribute.
     * The input maps are treated as read-only: the previous implementation
     * removed the pk entry from the CALLER's map through the keySet view,
     * silently mutating the caller's data.
     * @param sft feature type information
     * @param datas row values; map key is the field name, value the field value
     * @return unmodifiable list of one SimpleFeature per row
     */
    private List<SimpleFeature> dataToSimpleFeatures(SimpleFeatureType sft,
                                                      List<Map<String,Object>> datas) {
        List<SimpleFeature> features = new ArrayList<>();
        // use the geotools SimpleFeatureBuilder to create the features
        SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sft);
        for (Map<String, Object> rowMap : datas) {
            // use the provided pk value as feature id, or generate one for inserts
            Object pkValue = rowMap.get(pk);
            String featureId = (pkValue != null)
                    ? pkValue.toString()
                    : UUID.randomUUID().toString();
            for (Map.Entry<String, Object> entry : rowMap.entrySet()) {
                if (pk.equals(entry.getKey())) {
                    continue; // pk maps to the feature id, not to an attribute
                }
                builder.set(entry.getKey(), entry.getValue());
            }
            try {
                // tell geotools we want to use our own feature ID — always
                builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
                // build the feature - this also resets the builder for the next entry
                features.add(builder.buildFeature(featureId));
            } catch (Exception e) {
                System.out.println("Invalid SimpleFeature data: " + e.toString());
            }
        }
        return Collections.unmodifiableList(features);
    }
    /**
     * Builds the GeoMesa HBase datastore parameter map for a catalog table.
     * @param tableName fully-qualified catalog table name
     * @return datastore parameters (hbase.zookeepers, hbase.catalog)
     * @throws ParseException when the synthesized arguments fail to parse
     */
    private Map<String, String> buildParams(String tableName) throws ParseException {
        System.out.println("Start building parameters...");
        // NOTE(review): getHost() is used as the zookeeper PORT here (ip:host) —
        // presumably the DTO stores the port under "host"; confirm against DataBaseDTO.
        String zk = dataBaseDTO.getIp()+":"+dataBaseDTO.getHost();
        String[]args = new String[]{"--hbase.zookeepers", zk,"--hbase.catalog",tableName};
        DataAccessFactory.Param[] infos = new HBaseDataStoreFactory().getParametersInfo();
        Options options = createOptions(infos);
        CommandLine command = CommandLineDataStore.parseArgs(getClass(), options, args);
        return CommandLineDataStore.getDataStoreParams(command, options);
    }
    /**
     * Builds an ECQL predicate whose form depends on the field's declared type.
     * NOTE(review): this method is unfinished — the String branch is empty and
     * it currently ALWAYS returns null; no caller in this file uses it yet.
     * @param tableName table name
     * @param qualifier  field name
     * @param fieldValue field value
     * @return ECQL predicate string (currently always null)
     */
    private String buildEcqlPredicate(String tableName,String qualifier,String fieldValue){
        // resolve the fully-qualified table name
        tableName = super.getFullTableName(tableName);
        Map<String, String> columns = getColumnNames(tableName);
        String type = "String";
        //strToLowerCase(STATE_NAME)
        Set<String> keys = columns.keySet();
        // determine the declared type of the given field
        for (String key : keys) {
            if(key.equals(qualifier)){
                type = columns.get(key);
            }
        }
        // decide how to assemble the predicate (TODO: implement the branches)
        if(type.contains("String")){

        }
        return null;
    }
    /**
     * Builds a GeoMesa HBase DataStore for the given catalog table, or null
     * when the table is disabled/missing or the datastore lookup fails.
     * @param tableName short or fully-qualified table name
     * @return datastore instance, or null
     */
    private DataStore createDataStore(String tableName){
        String fullName = super.getFullTableName(tableName);
        if (!checkTableEnabled(getConn(), fullName)) {
            // a disabled table would yield a null datastore anyway
            return null;
        }
        try {
            Map<String, String> params = buildParams(fullName);
            System.out.println("Loading datastore...");
            // use geotools service loading to get a datastore instance
            return DataStoreFinder.getDataStore(params);
        } catch (ParseException | IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /** Translates datastore parameter metadata into command-line options. */
    private Options createOptions(DataAccessFactory.Param[] parameters) {
        return CommandLineDataStore.createOptions(parameters);
    }
    /**
     * Reads a single feature from the store and derives the column names and
     * types from its properties.
     * @param dataStore source datastore; an empty map is returned when null
     * @return field name -> fully-qualified class name of the field type
     * @throws RuntimeException when the feature type name cannot be resolved
     */
    private Map<String, String> getColumnNames(DataStore dataStore) {
        Map<String, String> columns = new HashMap<>();
        if (dataStore == null) {
            return columns;
        }
        String typeName = getTypeNames(dataStore)[0];
        if (StringUtils.isEmpty(typeName)) {
            throw new RuntimeException("feature type name should not be empty! ");
        }
        try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
                     dataStore.getFeatureReader(new Query(typeName), Transaction.AUTO_COMMIT)) {
            // one feature is enough to enumerate every property
            if (reader.hasNext()) {
                for (Property property : reader.next().getProperties()) {
                    String fieldName = property.getName().toString();
                    String fieldType = property.getType().getBinding().getName();
                    columns.put(fieldName, fieldType);
                }
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
        return columns;
    }
    /**
     * Returns the datastore's type names, or a single-element array holding
     * null when the store is null, has no types, or the lookup fails (callers
     * index element 0 and handle the null/empty name themselves).
     */
    private String[] getTypeNames(DataStore ds){
        if (ds != null) {
            try {
                String[] names = ds.getTypeNames();
                if (names != null && names.length > 0) {
                    return names;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return new String[1];
    }
    /** Resolves the schema of the store's first feature type, or null on failure. */
    private SimpleFeatureType getSimpleFeatureType(DataStore ds){
        if (ds == null) {
            return null;
        }
        try {
            return ds.getSchema(getTypeNames(ds)[0]);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /** User-data map of the store's feature type; an empty map when unavailable. */
    private Map<Object, Object> getUserData(DataStore ds) {
        SimpleFeatureType sft = getSimpleFeatureType(ds);
        return sft == null ? new HashMap<>() : sft.getUserData();
    }
    /**
     * Counts the features matched by a query by iterating the full result set.
     * @param datastore source store
     * @param query query condition
     * @return number of matching features; 0 when the store is null or on error
     */
    private long getTotalCountByQuery(DataStore datastore,Query query){
        if (datastore == null) {
            return 0L;
        }
        long matched = 0L;
        try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
                     datastore.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
            while (reader.hasNext()) {
                reader.next(); // consume the row; only the count matters
                matched++;
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
        return matched;
    }
    /** Builds the GeoMesa metadata key "table.{name}.v{version}". */
    private String getTableNameKey(String name,int version){
        return "table." + name + ".v" + version;
    }
    /**
     * Resolves the metadata/index tables associated with a catalog table.
     * Reads the "geomesa.indices" entry from the feature type's user data
     * (e.g. "xz2:1:3,id:1:3") and looks up each index's physical table name in
     * the GeoMesa metadata.
     * @param tableName catalog table name
     * @return the fully-qualified catalog table plus every associated index table
     */
    private List<String> getAssociatedMetaTables(String tableName){
        List<String> tables = new ArrayList<>();
        // resolve the fully-qualified table name
        tableName = super.getFullTableName(tableName);
        tables.add(tableName);
        DataStore ds = createDataStore(tableName);
        if(ds != null){
            // cast to HBaseDataStore to reach the GeoMesa metadata
            HBaseDataStore hds = (HBaseDataStore)ds;
            GeoMesaMetadata<String> metadata = hds.metadata();
            // index descriptors live in the feature type's user data
            Map<Object, Object> userData = getUserData(ds);
            Object o = userData.get("geomesa.indices");
            if(o != null){
                //e.g."xz2:1:3,id:1:3"
                String indices = o.toString();
                String[] splits = indices.split(",");
                for (String split : splits) {
                    String[] oneIndiceInfo = split.split(":");
                    if(oneIndiceInfo.length>=3){
                        String tableNameKey = getTableNameKey(oneIndiceInfo[0], Integer.valueOf(oneIndiceInfo[1]));
                        // NOTE(review): metadata.read(...) returns a Scala Option;
                        // .get() throws when the key is absent — confirm every
                        // listed index has a table-name entry in the metadata.
                        String metaTableName = metadata.read(getTypeNames(ds)[0], tableNameKey, true).get();
                        if(!StringUtils.isBlank(metaTableName)){
                            tables.add(metaTableName);
                        }
                    }
                }
            }
        }
        return tables;
    }

    /**
     * Resolves the physical name of one index table of the given catalog
     * table, preferring spatio-temporal indices in the order xz3, z3, xz2, z2,
     * id, then any remaining index. Falls back to the fully-qualified catalog
     * table itself when no index information is available.
     * @param tableName hbase table name (namespace:table or short name)
     * @return an index table name, or the fully-qualified catalog table
     */
    private String getXZ3IndexTable(String tableName) throws NoSuchElementException{
        // default: the fully-qualified catalog table
        String indexTable = super.getFullTableName(tableName);
        DataStore ds = createDataStore(tableName);
        if(ds != null){
            // cast to HBaseDataStore to reach the GeoMesa metadata
            HBaseDataStore hds = (HBaseDataStore)ds;
            GeoMesaMetadata<String> metadata = hds.metadata();
            // index descriptors live in the feature type's user data
            Map<Object, Object> userData = getUserData(ds);
            Object o = userData.get("geomesa.indices");
            if(o != null){
                // e.g. "xz2:1:3,id:1:3"
                String indices = o.toString();
                String[] splits = indices.split(",");
                Map<String,String> indexMap = new HashMap<>();
                for (String split : splits) {
                    String[] oneIndiceInfo = split.split(":");
                    // Check the length BEFORE indexing: the original read
                    // oneIndiceInfo[1] first, throwing
                    // ArrayIndexOutOfBoundsException on malformed entries.
                    if(oneIndiceInfo.length>=3){
                        String indexName = oneIndiceInfo[0];
                        String indexVersion = oneIndiceInfo[1];
                        String tableNameKey = getTableNameKey(indexName, Integer.valueOf(indexVersion));
                        String metaTableName = metadata.read(getTypeNames(ds)[0], tableNameKey, true).get();
                        indexMap.put(indexName, metaTableName);
                    }
                }
                // pick an index table by preference order
                if(indexMap.get("xz3")!=null){
                    indexTable = indexMap.remove("xz3");
                }else if(indexMap.get("z3")!=null){
                    indexTable = indexMap.remove("z3");
                }else if(indexMap.get("xz2")!=null){
                    indexTable = indexMap.remove("xz2");
                }else if(indexMap.get("z2")!=null){
                    indexTable = indexMap.remove("z2");
                }else if(indexMap.get("id")!=null){
                    indexTable = indexMap.remove("id");
                }else if(!indexMap.isEmpty()){
                    indexTable = indexMap.values().iterator().next();
                }
            }
        }
        return indexTable;
    }
    /**
     * Lists the column-family names of a table.
     * @param tableName short or fully-qualified table name
     * @return family names; empty list on any error
     */
    private List<String> getColumnFamilies(String tableName){
        List<String> families = new ArrayList<>();
        // try-with-resources closes the Admin, as the original finally block did
        try (Admin admin = conn.getAdmin()) {
            // always address the table by its fully-qualified name
            String fullName = super.getFullTableName(tableName);
            HTableDescriptor descriptor =
                    admin.getTableDescriptor(TableName.valueOf(fullName));
            for (HColumnDescriptor cf : descriptor.getColumnFamilies()) {
                families.add(cf.getNameAsString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return families;
    }
    /**
     * Lists the fully-qualified (namespace:tablename) names of all enabled,
     * non-system tables in a namespace.
     * @param nameSpace namespace name, analogous to a database
     * @return fully-qualified table names; empty list on error
     */
    private List<String> getFullTablenamesOfDB(String nameSpace) {
        List<String> list = new ArrayList<>();
        Admin admin = null;
        try {
            admin = conn.getAdmin();
            TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
            for (TableName name : tableNames) {
                // keep only enabled, non-system tables
                // NOTE(review): checkTableEnabled(Admin, TableName) is not defined
                // in this class (only (Connection, String)) — presumably an
                // overload inherited from HBaseDaoImpl; confirm it exists.
                if(checkTableEnabled(admin,name) && !name.isSystemTable()){
                    String tableName = name.getNameAsString();
                    list.add(tableName);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(admin != null){
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }
    /**
     * Runs a query and packs the results: supports exact, fuzzy, and paginated
     * lookups. Result map keys: "data" (list of row maps), "fields" (set of
     * column names seen), "pk" (the logical primary-key field name).
     * Field name:  property.getName()  or  property.getType().getName()
     * Field value: feature.getAttribute(property.getName()) or property.getValue()
     * @param dataStore source store; an empty result structure is returned when null
     * @param query  query condition
     * @param pageSize rows per page (defaults to 10 when null or < 1)
     * @param pageNum 1-based page number (defaults to 1 when null or < 1)
     */
    private Map<String,Object> queryFeatureDatas(DataStore dataStore, Query query,
                                                 Integer pageSize, Integer pageNum){
        if (pageSize == null || pageSize < 1) {
            pageSize = 10;
        }
        if (pageNum == null || pageNum < 1) {
            pageNum = 1;
        }
        // result containers
        Map<String,Object> rowDatas = new HashMap<>();
        List<Map<String,String>> datas = new ArrayList<>();
        Set<String> fields = new HashSet<>();
        if(dataStore == null){
            rowDatas.put("data", datas);
            rowDatas.put("fields", fields);
            rowDatas.put("pk", pk);
            return rowDatas;
        }
        try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
                     dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
            // Pagination by skipping: read pageSize*pageNum rows and keep only
            // those at position >= startFrom.
            // NOTE(review): pageSize * pageNum is an int multiplication widened
            // after the fact — could overflow for very large values; confirm limits.
            long totalLoopTimes = pageSize * pageNum;
            long startFrom = pageSize*(pageNum-1)+1;
            for(long count=1;count<=totalLoopTimes;count++){
                if(reader.hasNext()){
                    // read one row; feature is a ScalaSimpleFeature
                    SimpleFeature feature = reader.next();
                    if(count >= startFrom){
                        // the feature ID serves as the row's primary-key value
                        String id = feature.getID();
                        // pack one row
                        Map<String, String> oneRowData = new HashMap<>();
                        // store the unique feature ID under the pk column first
                        oneRowData.put(pk, id);
                        Collection<Property> properties = feature.getProperties();
                        for (Property p : properties) {
                            //property: AttributeImpl
                            //AttributeImpl attrImpl = (AttributeImpl)p;
                            // field name
                            Name name = p.getName();
                            String pName = name.toString();
                            // field value (null-safe toString)
                            Object attrObj = feature.getAttribute(name);
                            String pValue = attrObj==null?null:attrObj.toString();
                            oneRowData.put(pName, pValue);
                            // use geotools data utilities to get a printable string
                            // System.out.println(String.format("%02d", n) + " " + DataUtilities.encodeFeature(feature));
                        }
                        fields.addAll(oneRowData.keySet());
                        datas.add(oneRowData);
                    }
                }
            }
        }catch(IOException e){
            System.out.println(e.getMessage());
        }catch (NullPointerException e){
            System.out.println(e.getMessage());
        }
        rowDatas.put("data", datas);
        rowDatas.put("fields", fields);
        rowDatas.put("pk", pk);
        return rowDatas;
    }


    /**
     * Runs each query and collects every matching SimpleFeature.
     * @param dataStore source store; an empty list is returned when null
     * @param queries one or more query conditions
     * @return all features matched by any of the queries
     */
    private List<SimpleFeature> queryFeatures(DataStore dataStore, List<Query> queries) {
        List<SimpleFeature> matches = new ArrayList<>();
        if (dataStore == null) {
            return matches;
        }
        for (Query q : queries) {
            System.out.println("Running query " + ECQL.toCQL(q.getFilter()));
            if (q.getPropertyNames() != null) {
                System.out.println("Returning attributes " + Arrays.asList(q.getPropertyNames()));
            }
            if (q.getSortBy() != null) {
                SortBy sort = q.getSortBy()[0];
                System.out.println("Sorting by " + sort.getPropertyName() + " " + sort.getSortOrder());
            }
            // try-with-resources guarantees the reader is closed
            try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
                         dataStore.getFeatureReader(q, Transaction.AUTO_COMMIT)) {
                long found = 0;
                while (reader.hasNext()) {
                    matches.add(reader.next());
                    found++;
                }
                System.out.println("Returned " + found + " total queried features");
            } catch (IOException e) {
                System.out.println(e.getMessage());
            } catch (NullPointerException e) {
                System.out.println(e.getMessage());
            }
        }
        return matches;
    }
}
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.*;

/**
 * @Author: gh
 * @Description: HBase表格数据查询,删除一条记录,新增一条记录,查询所有表格名称.
 */
public class HBaseDaoImpl implements HBaseDao {

    // Shared HBase connection, opened once in the constructor via connectToHBase().
    Connection conn = null;
    // Connection settings (zookeeper ip/port, namespace) supplied by the caller.
    DataBaseDTO dataBaseDTO = null;

    // Name of the synthetic "column" under which extractResults() exposes the row key.
    public final String pk = "RowKey";

    //static Configuration conf = null;
    /**
     * Creates the DAO and eagerly opens the HBase connection.
     *
     * @param dto connection parameters (zookeeper address, namespace)
     */
    public HBaseDaoImpl(DataBaseDTO dto) {
        try {
            this.dataBaseDTO = dto;
            this.conn = connectToHBase();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /** @return the shared HBase connection (may be {@code null} when connecting failed). */
    public Connection getConn() {
        return this.conn;
    }

    /** Replaces the shared HBase connection. */
    public void setConn(Connection conn) {
        this.conn = conn;
    }

    /**
     * Closes the underlying HBase connection; I/O failures are only logged.
     */
    public void close() {
        if (this.conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
    /**
     * @return {@code true} when the connection is open and the configured
     *         namespace exists on the cluster
     */
    public boolean connected() {
        Connection c = getConn();
        if (c == null) {
            return false;
        }
        boolean nsExists = isNamespaceExist(this.dataBaseDTO.getDbName());
        return nsExists && !c.isClosed();
    }
    /**
     * Lazily creates the shared HBase connection from the DTO's zookeeper
     * address; returns the existing connection when one is already open.
     *
     * <p>Observed failure modes (see inline notes): an unreachable IP causes
     * ~30 zookeeper reconnect attempts; a reachable IP that is not a zk quorum
     * member fails with {@code KeeperException$NoNodeException} for
     * {@code /hbase/master}.
     *
     * @return the connection, or {@code null} when creation failed or no DTO was supplied
     */
    private org.apache.hadoop.hbase.client.Connection connectToHBase() {
        // fix: the original returned from a finally block, which silently
        // discards any pending exception or error; restructured to a plain return.
        if (conn != null || dataBaseDTO == null) {
            return conn;
        }
        try {
            Configuration conf = HBaseConfiguration.create();
            String ip = dataBaseDTO.getIp();
            // NOTE(review): getHost() appears to carry the zk port — confirm DTO semantics.
            String port = dataBaseDTO.getHost();
            // NOTE1: unreachable ip -> zk retries ~30 times:
            //   java.net.ConnectException: Connection refused: no further information
            // NOTE2: reachable ip that is not a zk member -> retries with:
            //   KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/master
            // TODO: validate user name / password
            conf.set("hbase.zookeeper.quorum", ip + ":" + port);
            conf.setInt("zookeeper.recovery.retry", 0);
            conf.setInt("zookeeper.session.timeout", 30000);
            conn = ConnectionFactory.createConnection(conf);
        } catch (Exception e) {
            // conn stays null; callers treat null as "not connected".
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Qualifies a table name with the configured namespace
     * ({@code namespace:tableName}) unless it is already qualified.
     *
     * @param tableName bare or already-qualified table name
     * @return the fully qualified name, or the input unchanged when no
     *         namespace is configured or the name already contains {@code ':'}
     */
    protected String getFullTableName(String tableName) {
        if (this.dataBaseDTO != null) {
            String dbName = this.dataBaseDTO.getDbName();
            // consistency fix: use the commons-lang StringUtils already
            // imported by this file instead of the ad-hoc StringUtil helper
            if (!StringUtils.isEmpty(dbName) && !StringUtils.isEmpty(tableName)
                    && !tableName.contains(":")) {
                return dbName + ":" + tableName;
            }
        }
        return tableName;
    }
    /**
     * Checks whether the given namespace exists. The full namespace list is
     * scanned because {@code Admin.getNamespaceDescriptor} throws
     * {@code NamespaceNotFoundException} for a missing namespace.
     *
     * @param nameSpace namespace name; null/empty returns {@code false}
     * @return {@code true} when the namespace exists
     */
    public boolean isNamespaceExist(String nameSpace) {
        if (StringUtils.isEmpty(nameSpace)) {
            return false;
        }
        // try-with-resources: Admin is Closeable and was closed manually before
        try (Admin admin = conn.getAdmin()) {
            for (NamespaceDescriptor nsd : admin.listNamespaceDescriptors()) {
                if (nsd.getName().equals(nameSpace)) {
                    return true;  // found — no need to scan the rest
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Creates a table (qualified with the configured namespace) with the given
     * column families. Prints an error message when the table already exists.
     *
     * @param tableName     bare table name
     * @param columnFamilys column family names to create
     */
    @Override
    public void createTable(String tableName, String[] columnFamilys) {
        String fullName = getFullTableName(tableName);
        // try-with-resources replaces the manual finally/close dance
        try (Admin admin = conn.getAdmin()) {
            TableName tblName = TableName.valueOf(fullName);
            if (admin.tableExists(tblName)) {
                System.err.println("此表已存在!");
                return;
            }
            HTableDescriptor htd = new HTableDescriptor(tblName);
            for (String columnFamily : columnFamilys) {
                htd.addFamily(new HColumnDescriptor(columnFamily));
            }
            admin.createTable(htd);
            System.err.println("建表成功!");
        } catch (Exception e) {
            // fix: MasterNotRunning / ZooKeeperConnection / IOException were all
            // caught separately with identical handling — one catch suffices.
            e.printStackTrace();
        }
    }

    /**
     * Writes one row: {@code value[i]} goes into {@code family:quailifer[i]}.
     *
     * @param tableName bare table name (namespace is prepended)
     * @param rowKey    row key to write
     * @param family    column family receiving every cell
     * @param quailifer column qualifiers, parallel to {@code value}
     * @param value     cell values, parallel to {@code quailifer}
     * @return 1 when the row was written, 0 otherwise
     */
    @Override
    public int insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
        Table table = null;
        int rowCount = 0; // 0 means nothing was written
        try {
            // fix: a shorter value[] previously blew up with an
            // ArrayIndexOutOfBoundsException mid-loop; bail out up front instead
            if (quailifer == null || value == null || value.length < quailifer.length) {
                return rowCount;
            }
            String fullName = getFullTableName(tableName);
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                Put put = new Put(Bytes.toBytes(rowKey));
                // batch all columns into a single Put
                for (int i = 0; i < quailifer.length; i++) {
                    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(quailifer[i]), Bytes.toBytes(value[i]));
                }
                table.put(put);
                rowCount = 1; // row count of the DML
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return rowCount;
    }
    /**
     * Returns one page of rows from the table.
     *
     * @param tableName bare table name (namespace is prepended)
     * @param pageSize  rows per page; {@code null} or &lt; 1 defaults to 10
     * @param pageNum   1-based page number; {@code null} or &lt; 1 defaults to 1
     * @return at most {@code pageSize} results (PageFilter is applied per
     *         region server, so the count is enforced client-side)
     */
    @Override
    public List<Result> getAllRows(String tableName, Integer pageSize, Integer pageNum) {
        if (pageSize == null || pageSize < 1) {
            pageSize = 10;
        }
        if (pageNum == null || pageNum < 1) {
            pageNum = 1;
        }
        List<Result> list = new ArrayList<>();
        String fullName = getFullTableName(tableName);
        Table table = null;
        ResultScanner scanner = null;
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                // scanner is already positioned at the requested page
                scanner = getPageScanner(table, pageNum, pageSize);
                int remaining = pageSize;
                for (Result rs : scanner) {
                    if (remaining <= 0) {
                        break; // enforce the page size; stop draining the scanner
                    }
                    list.add(rs);
                    remaining--;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (scanner != null) {
                scanner.close(); // fix: the scanner was never closed
            }
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }

    /**
     * Positions a scanner at the requested page by repeatedly rescanning from
     * just past the last row of the previous page.
     *
     * <p>Note: PageFilter runs independently on each region server, so it only
     * caps the rows returned per region — the caller must still trim to the
     * page size. The caller is also responsible for closing the returned scanner.
     *
     * @param table    table to scan
     * @param pageNum  1-based page number; page 1 needs no start row
     * @param pageSize maximum rows per page (per region)
     * @return a scanner positioned at the requested page
     * @throws Exception on scan failure
     */
    private ResultScanner getPageScanner(Table table, int pageNum, int pageSize) throws Exception {
        Scan scan = new Scan();
        scan.setFilter(new PageFilter(pageSize));
        ResultScanner scanner = table.getScanner(scan);
        // Skip ahead one page at a time for pages after the first.
        for (int i = 1; i < pageNum; i++) {
            byte[] startRow = new byte[]{};
            for (Result result : scanner) {
                startRow = result.getRow(); // remember the last row of this page
            }
            scanner.close(); // fix: intermediate scanners were leaked
            // start the next scan strictly after the last row seen
            startRow = Bytes.add(startRow, new byte[]{0});
            scan.setStartRow(startRow);
            scanner = table.getScanner(scan);
        }
        return scanner;
    }

    /**
     * Counts the rows of a table. Possible strategies:
     * 1. iterate a ResultScanner — far too slow;
     * 2. the Aggregation coprocessor endpoint (used here);
     * 3. a MapReduce job.
     *
     * @param tableName table name (the namespace is prepended automatically)
     * @return total row count
     */
    @Override
    public long totalCount(String tableName) {
        return totalCount(getFullTableName(tableName), new Scan());
    }

    /**
     * Counts the rows of a table via the Aggregation coprocessor, installing
     * the coprocessor on the table first when it is missing (which requires a
     * disable / modify / enable cycle).
     *
     * @param tableName fully qualified table name
     * @param scan      scan restricting which rows are counted
     * @return total row count, or 0 on failure
     */
    private long totalCount(String tableName,Scan scan){
        Admin admin = null;
        long rowCount = 0;
        long start = System.currentTimeMillis();
        System.out.println("start: "+new Date(start).toLocaleString());
        // Without the coprocessor loaded, counting fails with:
        // org.apache.hadoop.hbase.exceptions.UnknownProtocolException:
        // No registered coprocessor service found for name AggregateService in region **.
        TableName tblName= null ;
        try {
            tblName = TableName.valueOf(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if(tblName != null){
            try {
                // Reuse the existing connection's configuration.
                Configuration conf = conn.getConfiguration();
                // conf.setBoolean("hbase.table.sanity.checks", false);
                admin = conn.getAdmin();
                // Check whether the coprocessor is already installed on the table.
                HTableDescriptor htd = admin.getTableDescriptor(tblName);
                String coprocessorName = AggregateImplementation.class.getName();
                boolean flag = htd.hasCoprocessor(coprocessorName);
                System.err.println("coprocessor: "+flag);
                if(!flag){
                    // Disable the table before adding the coprocessor
                    // (it would be auto-disabled otherwise).
                    asyncDisable(admin,tblName);
                    // Install the coprocessor.
                    htd.addCoprocessor(coprocessorName);
                    // The table must be modified, or the coprocessor is not persisted
                    // and counting fails with UnknownProtocolException:
                    // No registered coprocessor service found for name AggregateService in region
                    admin.modifyTable(tblName, htd);
                    // Bring the table back online.
                    asyncEnable(admin,tblName);
                }
                // Run the row count through the coprocessor.
                AggregationClient ac = new AggregationClient(conf);
                // Guard against the async enable timing out and leaving the table disabled.
                if(checkTableEnabled(admin, tblName)){
                    rowCount = ac.rowCount(tblName, new LongColumnInterpreter(), scan);
                }
                System.out.format("RowCount of %s : %s",tableName,rowCount);
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }catch (Throwable e){
                e.printStackTrace();
            }finally{
                if(admin != null){
                    try {
                        // Best effort: make sure the table is left enabled.
                        asyncEnable(admin,tblName);
                        admin.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("end: "+new Date(end).toLocaleString());
        System.out.println("统计耗时:"+(end-start)/1000+"s");
        return rowCount;
    }
    /**
     * Safely asks whether a table is enabled; any failure counts as "disabled".
     */
    protected boolean checkTableEnabled(Admin admin, TableName tblName) {
        boolean enabled = false;
        try {
            enabled = admin.isTableEnabled(tblName);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return enabled;
    }
    /**
     * Requests an asynchronous disable of the table, then polls (300 ms steps,
     * capped at 30 s) until the table reports disabled. No-op when the table
     * is already disabled.
     */
    private void asyncDisable(Admin admin, TableName tblName) {
        try {
            if (!admin.isTableEnabled(tblName)) {
                return;
            }
            System.err.println("==========disableTableAsync===========");
            admin.disableTableAsync(tblName);
            long totalWait = 0;
            final long maxWait = 30 * 1000;
            final long sleepTime = 300;
            // Poll until disabled, waiting at most 30 s in total.
            while (!admin.isTableDisabled(tblName) && totalWait < maxWait) {
                try {
                    Thread.sleep(sleepTime);
                    totalWait += sleepTime;
                } catch (InterruptedException e) {
                    // fix: restore the interrupt flag and stop waiting instead
                    // of swallowing the interrupt and looping on
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * Requests an asynchronous enable of the table, then polls (300 ms steps,
     * capped at 30 s) until the table reports enabled. No-op when the table
     * is already enabled.
     */
    private void asyncEnable(Admin admin, TableName tblName) {
        try {
            if (!admin.isTableDisabled(tblName)) {
                return;
            }
            System.err.println("==========enableTableAsync===========");
            admin.enableTableAsync(tblName);
            long totalWait = 0;
            final long maxWait = 30 * 1000;
            final long sleepTime = 300;
            // Poll until enabled, waiting at most 30 s in total.
            while (!admin.isTableEnabled(tblName) && totalWait < maxWait) {
                try {
                    Thread.sleep(sleepTime);
                    totalWait += sleepTime;
                } catch (InterruptedException e) {
                    // fix: restore the interrupt flag and stop waiting instead
                    // of swallowing the interrupt and looping on
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * Fetches one row by its exact row key.
     *
     * @return the Get result, or {@code null} when the table is unavailable
     *         or an error occurs
     */
    @Override
    public Result getRowByKey(String tableName, String rowKey) {
        Table table = null;
        String fullName = getFullTableName(tableName);
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                return table.get(new Get(Bytes.toBytes(rowKey)));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
    /**
     * Scans every row whose key starts with the given prefix.
     *
     * @param tableName  bare table name (namespace is prepended)
     * @param rowKeyLike row key prefix
     * @return matching rows (empty when the table is unavailable)
     */
    @Override
    public List<Result> getRowsByKey(String tableName, String rowKeyLike) {
        Table table = null;
        ResultScanner scanner = null;
        // fix: was "List<Result> list = list = new ArrayList<>();" — a duplicated assignment
        List<Result> list = new ArrayList<>();
        String fullName = getFullTableName(tableName);
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                Scan scan = new Scan();
                scan.setFilter(new PrefixFilter(Bytes.toBytes(rowKeyLike)));
                scanner = table.getScanner(scan);
                for (Result rs : scanner) {
                    list.add(rs);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (scanner != null) {
                scanner.close(); // fix: the scanner was never closed
            }
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }

    /**
     * Scans rows in the half-open key range [startRow, stopRow).
     *
     * @param tableName bare table name (namespace is prepended)
     * @param startRow  first row key (inclusive)
     * @param stopRow   last row key (exclusive)
     * @return matching rows (empty when the table is unavailable)
     */
    @Override
    public List<Result> getRowsByKey(String tableName, String startRow, String stopRow) {
        Table table = null;
        ResultScanner scanner = null;
        List<Result> list = new ArrayList<>();
        String fullName = getFullTableName(tableName);
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                Scan scan = new Scan();
                scan.setStartRow(Bytes.toBytes(startRow));
                scan.setStopRow(Bytes.toBytes(stopRow));
                scanner = table.getScanner(scan);
                for (Result rs : scanner) {
                    list.add(rs);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (scanner != null) {
                scanner.close(); // fix: the scanner was never closed
            }
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }

    /**
     * Scans rows by key prefix, restricted to one column family and,
     * optionally, specific columns.
     *
     * @param tableName  bare table name (namespace is prepended)
     * @param rowKeyLike row key prefix
     * @param family     column family to read
     * @param cols       column qualifiers; {@code null}/empty selects the whole family
     * @return matching rows (empty when the table is unavailable)
     */
    @Override
    public List<Result> getRowsByKey(String tableName, String rowKeyLike, String family, String cols[]) {
        Table table = null;
        ResultScanner scanner = null;
        List<Result> list = new ArrayList<>();
        String fullName = getFullTableName(tableName);
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                Scan scan = new Scan();
                if (cols == null || cols.length < 1) {
                    // no columns requested -> read the whole family
                    scan.addFamily(Bytes.toBytes(family));
                } else {
                    for (String col : cols) {
                        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(col));
                    }
                }
                scan.setFilter(new PrefixFilter(Bytes.toBytes(rowKeyLike)));
                scanner = table.getScanner(scan);
                for (Result rs : scanner) {
                    list.add(rs);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (scanner != null) {
                scanner.close(); // fix: the scanner was never closed
            }
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }
    /**
     * Deletes every row whose key starts with the given prefix.
     *
     * @param tableName  bare table name (namespace is prepended)
     * @param rowKeyLike row key prefix
     * @return the number of rows deleted
     */
    @Override
    public int deleteRecords(String tableName, String rowKeyLike) {
        Table table = null;
        ResultScanner scanner = null;
        int rowCount = 0;
        String fullName = getFullTableName(tableName);
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                Scan scan = new Scan();
                scan.setFilter(new PrefixFilter(Bytes.toBytes(rowKeyLike)));
                scanner = table.getScanner(scan);
                // collect all matching keys, then delete in one batch
                List<Delete> deletes = new ArrayList<>();
                for (Result rs : scanner) {
                    deletes.add(new Delete(rs.getRow()));
                    rowCount++;
                }
                table.delete(deletes);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (scanner != null) {
                scanner.close(); // fix: the scanner was never closed
            }
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return rowCount;
    }
    /**
     * Deletes a single row by exact key, after verifying the row exists.
     *
     * @return 1 when a row was deleted, 0 otherwise
     */
    @Override
    public int deleteRecord(String tableName, String rowKey) {
        Table table = null;
        String fullName = getFullTableName(tableName);
        try {
            TableName tblName = TableName.valueOf(fullName);
            boolean enabled;
            try (Admin admin = conn.getAdmin()) { // fix: the Admin was never closed
                enabled = checkTableEnabled(admin, tblName);
            }
            if (enabled) {
                table = conn.getTable(tblName);
                // Check existence first — a Delete on a missing key would not
                // fail, but the returned count should be accurate.
                Result result = table.get(new Get(Bytes.toBytes(rowKey)));
                List<Cell> cells = result.listCells();
                if (cells != null && !cells.isEmpty()) {
                    table.delete(new Delete(Bytes.toBytes(rowKey)));
                    return 1;
                }
            }
            return 0;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * Lists all namespaces with their (unqualified) table names.
     *
     * @return map of namespace name to table names
     */
    @Override
    public Map<String, List<String>> getNameSpaceTables() {
        Map<String, List<String>> result = new HashMap<>();
        // try-with-resources replaces the manual finally/close dance
        try (Admin admin = conn.getAdmin()) {
            for (NamespaceDescriptor ns : admin.listNamespaceDescriptors()) {
                String nsName = ns.getName();
                List<String> tblList = new ArrayList<>();
                for (TableName name : admin.listTableNamesByNamespace(nsName)) {
                    String tableName = name.getNameAsString();
                    // Non-default namespaces are prefixed, e.g. "hbase:meta" — strip it.
                    tableName = tableName.contains(":") ? tableName.split(":")[1] : tableName;
                    tblList.add(tableName);
                }
                result.put(nsName, tblList);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     * Lists the enabled, non-system tables of a namespace, with the namespace
     * prefix stripped.
     *
     * @param nameSpace namespace to list
     * @return unqualified table names (empty on failure)
     */
    @Override
    public List<String> getTablenamesOfDB(String nameSpace) {
        List<String> list = new ArrayList<>();
        // try-with-resources replaces the manual finally/close dance
        try (Admin admin = conn.getAdmin()) {
            for (TableName name : admin.listTableNamesByNamespace(nameSpace)) {
                // skip disabled tables and system tables
                if (checkTableEnabled(admin, name) && !name.isSystemTable()) {
                    String tableName = name.getNameAsString();
                    // Non-default namespaces are prefixed, e.g. "hbase:meta" — strip it.
                    tableName = tableName.contains(":") ? tableName.split(":")[1] : tableName;
                    list.add(tableName);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }
    /**
     * Probes a table's status.
     *
     * @param tableName bare table name (namespace is prepended)
     * @return 1 when the table exists and is enabled, 0 when it exists but is
     *         disabled, -1 when it does not exist (or the probe failed)
     */
    public int getTableStatus(String tableName) {
        String fullName = getFullTableName(tableName);
        // try-with-resources replaces the manual finally/close dance
        try (Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf(fullName);
            if (admin.tableExists(tn)) {
                // table exists — distinguish disabled (0) from enabled (1)
                return admin.isTableDisabled(tn) ? 0 : 1;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // -1: table does not exist
        return -1;
    }
    /**
     * Flattens HBase results into row maps keyed by "family.qualifier", with
     * the row key stored under the synthetic column {@link #pk}.
     *
     * @param results rows to extract
     * @param b       {@code true} to include the "geom" column, {@code false} to drop it
     * @return map with "data" (list of row maps), "fields" (all column names
     *         seen) and "pk" (the row-key column name)
     */
    public Map<String, Object> extractResults(List<Result> results, boolean b) {
        Map<String, Object> rowDatas = new HashMap<>();
        List<Map<String, String>> data = new ArrayList<>();
        Set<String> fields = new HashSet<>();
        try {
            for (Result r : results) {
                // Iterating raw Cells directly yields garbled text, so walk the
                // family -> (qualifier -> value) map instead.
                Map<String, String> row = new HashMap<>();
                row.put(pk, Bytes.toString(r.getRow()));
                NavigableMap<byte[], NavigableMap<byte[], byte[]>> families = r.getNoVersionMap();
                for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> fam : families.entrySet()) {
                    String familyName = Bytes.toString(fam.getKey());
                    for (Map.Entry<byte[], byte[]> col : fam.getValue().entrySet()) {
                        String qualifierName = Bytes.toString(col.getKey());
                        if (!b && "geom".equals(qualifierName)) {
                            continue; // caller asked to omit the geometry column
                        }
                        row.put(familyName + "." + qualifierName, Bytes.toString(col.getValue()));
                    }
                }
                fields.addAll(row.keySet());
                data.add(row);
            }
            rowDatas.put("data", data);
            rowDatas.put("fields", fields);
            rowDatas.put("pk", pk);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return rowDatas;
    }
    /**
     * Rough cluster storage usage: total storefile size divided by the
     * theoretical capacity (max region size * region count), returned as a
     * percentage rounded to two decimals.
     *
     * @return usage percentage, e.g. 12.34
     */
    @Override
    public double usedRate() {
        double rate = 0.0;
        // try-with-resources replaces the manual finally/close dance
        try (Admin admin = conn.getAdmin()) {
            // Theoretical capacity: max bytes per region times region count.
            long regionSize = admin.getConfiguration()
                    .getLong("hbase.hregion.max.filesize", 10737418240L); // bytes
            ClusterStatus clusterStatus = admin.getClusterStatus();
            int regionsCount = clusterStatus.getRegionsCount();
            final long totalRegionSizes = regionSize * regionsCount;
            // Sum storefile sizes (reported in MB) over all live servers.
            double totalHFileSizes = 0;
            for (ServerName liveServer : clusterStatus.getServers()) {
                ServerLoad serverLoad = clusterStatus.getLoad(liveServer);
                if (serverLoad != null) {
                    totalHFileSizes += serverLoad.getStorefileSizeInMB();
                }
            }
            rate = (totalHFileSizes * 1024 * 1024) / totalRegionSizes;
        } catch (IOException e) {
            e.printStackTrace();
        }
        // fix: Double.parseDouble(String.format("%.2f", ...)) crashes in
        // locales that use a comma decimal separator; round numerically instead.
        return Math.round(rate * 100 * 100) / 100.0;
    }
    /**
     * Total storefile size of the whole cluster, converted to {@code unit}.
     */
    @Override
    public double storeSizeOfDB(SizeUnitEnum unit) {
        return storeSizeOfDB(null, unit);
    }
    /**
     * Total storefile size converted to {@code unit}. A {@code null} namespace
     * sums the whole cluster; otherwise only the regions backing that
     * namespace's tables are counted.
     *
     * @param nameSpace namespace to restrict to, or {@code null} for the whole cluster
     * @param unit      unit of the returned size
     * @return total storefile size expressed in {@code unit}
     */
    @Override
    public double storeSizeOfDB(String nameSpace, SizeUnitEnum unit) {
        //Accumulated storefile size; servers report it in MB.
        double totalHFileSizes = 0.00;
        Admin admin = null;
        try {
            admin = conn.getAdmin();
            ClusterStatus clusterStatus = admin.getClusterStatus();
            Collection<ServerName> liveServers = clusterStatus.getServers();
            if(nameSpace == null){
                //Whole cluster: sum the storefile size of every live server.
                for (ServerName server : liveServers) {
                    //load report of this live server
                    ServerLoad serverLoad = clusterStatus.getLoad(server);
                    if(serverLoad != null){
                        int storefileSizeInMB = serverLoad.getStorefileSizeInMB();
                        totalHFileSizes += (storefileSizeInMB);
                    }
                }
            }else if(isNamespaceExist(nameSpace)){
                //Namespace only: sum the storefile size of every region backing its tables.
                TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
                List<HRegionInfo> totalRegions = new ArrayList<>();
                for (TableName tableName : tableNames) {
                    //regions this table is spread across
                    List<HRegionInfo> currentRegions = admin.getTableRegions(tableName);
                    totalRegions.addAll(currentRegions);
                }
                for (ServerName server : liveServers) {
                    //load report of this live server
                    ServerLoad serverLoad = clusterStatus.getLoad(server);
                    //all regions hosted by this server
                    //NOTE(review): byte[]-keyed lookup below assumes this map uses a
                    //byte-array comparator rather than identity — confirm.
                    Map<byte[], RegionLoad> regionLoads = serverLoad.getRegionsLoad();
                    //pick out the regions that belong to the namespace's tables
                    for (HRegionInfo tableRegion : totalRegions) {
                        //accumulate this region's storefile size, when hosted here
                        RegionLoad rl = regionLoads.get(tableRegion.getRegionName());
                        if(rl != null){
                            totalHFileSizes += (rl.getStorefileSizeMB());
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            if(admin != null){
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return FileSizeUtil.valueOf(totalHFileSizes*1024*1024, unit);
    }
    /**
     * Total storefile size of specific tables within a namespace, converted to
     * {@code unit}. Returns 0 when the namespace is missing or {@code tables}
     * is {@code null}.
     *
     * @param nameSpace namespace the tables belong to
     * @param tables    bare or already-qualified table names
     * @param unit      unit of the returned size
     * @return total storefile size expressed in {@code unit}
     */
    @Override
    public double storeSizeOfTbl(String nameSpace,String[] tables, SizeUnitEnum unit) {
        //Accumulated storefile size; servers report it in MB.
        double totalHFileSizes = 0.00;
        Admin admin =null;
        try {
            boolean flag = isNamespaceExist(nameSpace);
            if(flag && tables != null){
                admin = conn.getAdmin();
                ClusterStatus clusterStatus = admin.getClusterStatus();
                Collection<ServerName> liveServers = clusterStatus.getServers();
                List<HRegionInfo> totalRegions = new ArrayList<>();
                for (String table : tables) {
                    //qualify the table name, then find the regions it spans
                    String fullTableName = nameSpace+":"+table;
                    if(table.contains(":")){
                        fullTableName = table;
                    }
                    TableName currentTable = TableName.valueOf(fullTableName);
                    List<HRegionInfo> currentRegions = admin.getTableRegions(currentTable);
                    totalRegions.addAll(currentRegions);
                }
                for (ServerName server : liveServers) {
                    //load report of this live server
                    ServerLoad serverLoad = clusterStatus.getLoad(server);
                    //all regions hosted by this server
                    //NOTE(review): byte[]-keyed lookup below assumes this map uses a
                    //byte-array comparator rather than identity — confirm.
                    Map<byte[], RegionLoad> regionLoads = serverLoad.getRegionsLoad();
                    //pick out the regions that belong to the requested tables
                    for (HRegionInfo tableRegion : totalRegions) {
                        //accumulate this region's storefile size, when hosted here
                        RegionLoad rl = regionLoads.get(tableRegion.getRegionName());
                        if(rl != null){
                            totalHFileSizes += (rl.getStorefileSizeMB());
                        }
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("===================="+e.getMessage());
            e.printStackTrace();
        }finally{
            if(admin != null){
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return FileSizeUtil.valueOf(totalHFileSizes*1024*1024, unit);
    }
    @Override
    public int countTables(String nameSpace) {
        // Number of tables in the given namespace.
        return getTablenamesOfDB(nameSpace).size();
    }
    @Override
    public List<String> getTables(String nameSpace) {
        // Names of all tables in the given namespace.
        return getTablenamesOfDB(nameSpace);
    }
    @Override
    public List<Result> queryByRandomField(String tableName, String familyName, String qualifier,
                                           String fieldValue, Integer pageSize, Integer pageNum) {
        // Paged fuzzy (substring) query: matches `fieldValue` against either the
        // rowkey (when `qualifier` equals the pk) or a single column value.
        // Returns at most `pageSize` rows of page `pageNum` (1-based).
        if (pageSize == null || pageSize < 1) {
            pageSize = 10;
        }
        if (pageNum == null || pageNum < 1) {
            pageNum = 1;
        }
        List<Result> list = new ArrayList<>();
        tableName = getFullTableName(tableName);
        Admin admin = null;
        Table table = null;
        ResultScanner scanner = null;
        try {
            TableName tblName = TableName.valueOf(tableName);
            // FIX: the Admin was previously obtained inline (conn.getAdmin())
            // and never closed — a connection resource leak on every call.
            admin = conn.getAdmin();
            if (checkTableEnabled(admin, tblName)) {
                table = conn.getTable(tblName);
                Scan scan = new Scan();
                Filter columnFilter;
                if (pk.equals(qualifier)) {
                    // Substring match on the rowkey.
                    columnFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                            new SubstringComparator(fieldValue));
                } else {
                    // Substring match on one column (non-rowkey).
                    columnFilter = new SingleColumnValueFilter(
                            Bytes.toBytes(familyName),
                            Bytes.toBytes(qualifier),
                            CompareFilter.CompareOp.EQUAL,
                            new SubstringComparator(fieldValue));
                }
                // PageFilter limits rows per region server, so a scan can still
                // yield more than pageSize rows; trimmed in the loop below.
                Filter pageFilter = new PageFilter(pageSize);
                List<Filter> filters = new ArrayList<>();
                filters.add(columnFilter);
                filters.add(pageFilter);
                scan.setFilter(new FilterList(filters));
                scanner = table.getScanner(scan);
                // Skip ahead to the requested page by re-scanning from just past
                // the last row of each previous page (O(pageNum) scans).
                if (pageNum > 1) {
                    for (int i = 1; i < pageNum; i++) {
                        byte[] startRow = new byte[]{};
                        for (Result result : scanner) {
                            startRow = result.getRow();
                        }
                        // FIX: intermediate scanners were leaked before.
                        scanner.close();
                        startRow = Bytes.add(startRow, new byte[]{0});
                        scan.setStartRow(startRow);
                        scanner = table.getScanner(scan);
                    }
                }
                int resultSize = pageSize;
                for (Result rs : scanner) {
                    if (resultSize > 0) {
                        list.add(rs);
                        resultSize = resultSize - 1;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // FIX: the final ResultScanner was never closed either.
            if (scanner != null) {
                scanner.close();
            }
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }

    @Override
    public long totalCountOfFuzzyQuery(String tableName, String familyName,
                                       String qualifier, String fieldValue) {
        // Total row count for a fuzzy (substring) match, used alongside the
        // paged query to report the full result size.
        String fullName = getFullTableName(tableName);
        Filter filter;
        if (pk.equals(qualifier)) {
            // Substring match against the rowkey.
            filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(fieldValue));
        } else {
            // Substring match against one column (non-rowkey).
            filter = new SingleColumnValueFilter(
                    Bytes.toBytes(familyName),
                    Bytes.toBytes(qualifier),
                    CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(fieldValue));
        }
        Scan scan = new Scan();
        scan.setFilter(filter);
        // Counting is delegated to the coprocessor-backed helper.
        return totalCount(fullName, scan);
    }

    @Override
    public Map<String, String> getColumnNames(String tableName) {
        // Convenience overload: sample rows with default paging.
        return getColumnNames(tableName, null, null);
    }
    private Map<String, String> getColumnNames(String tableName, Integer pageSize, Integer pageNum) {
        // Samples some rows from the table and collects the distinct column
        // (qualifier) names found in them. Names become map keys; values are "".
        Map<String, String> columnNames = new HashMap<>();
        List<Result> sampleRows = getAllRows(tableName, pageSize, pageNum);
        Object fields = extractResults(sampleRows, true).get("fields");
        if (fields != null) {
            // extractResults stores the column names under "fields" as a Set.
            for (String field : (Set<String>) fields) {
                columnNames.put(field, "");
            }
        }
        return columnNames;
    }
}

 

依赖:

<properties>
        <geomesa.version>2.2.1</geomesa.version>
        <gt.version>20.0</gt.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.abi.version>2.11</scala.abi.version>
        <scala.version>2.11.7</scala.version>
        <!-- environment specific variables-->
        <zookeeper.version>3.4.9</zookeeper.version>
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.hadoop.version>2.6.0</hbase.hadoop.version>
        <kafka.version>1.0.0</kafka.version>
        <hbase.version>1.3.1</hbase.version>
        <spark.version>2.4.0</spark.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.33.Final</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
            <version>3.6.2.Final</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!--<dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-hbase-tools_2.11</artifactId>
            <version>${geomesa.version}</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-hbase-datastore_${scala.abi.version}</artifactId>
            <version>${geomesa.version}</version>
        </dependency>-->
        <dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-hbase-spark-runtime_${scala.abi.version}</artifactId>
            <version>${geomesa.version}</version>
        </dependency>

        <dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-spark-core_2.11</artifactId>
            <version>${geomesa.version}</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-spark-jts_2.11</artifactId>
            <version>${geomesa.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-index-api_2.11</artifactId>
            <version>${geomesa.version}</version>
        </dependency>-->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>
        <!--<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-annotations</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-opengis</artifactId>
            <version>${gt.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-data</artifactId>
            <version>${gt.version}</version>
        </dependency>-->
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-epsg-hsql</artifactId>
            <version>${gt.version}</version>
        </dependency>
       <!--
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.abi.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.abi.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.abi.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <!-- <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_${scala.abi.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>-->
    </dependencies>

版权声明:本文为koukan3原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。