背景
- 在大数据架构的业务场景中,需要把实时数据(基本上是业务数据)写入ES;这些数据经过封装后作为规则引擎的变量对外提供,是变量系统的一部分;
- 架构中的数据流来源于Maxwell,由Spark Streaming做数据流处理,落库时使用RestHighLevelClient以同步提交Bulk的方式写入;本文不涉及增删改,主要介绍客户端以及查询的封装,供后续变量系统的应用部分使用;
客户端
streaming直接使用
/**
 * Builds a new [[RestHighLevelClient]] for direct use inside the streaming job.
 *
 * The underlying REST client builder receives three HTTP endpoints, each
 * declared with the placeholder host name "host" on port 9200.
 *
 * @return a newly constructed client; nothing in this method closes it
 */
def createESClientNew(): RestHighLevelClient = {
  val scheme = "http"
  val port = 9200
  // Three node entries, mirroring the cluster layout of the original snippet.
  val nodes = Array(
    new HttpHost("host", port, scheme),
    new HttpHost("host", port, scheme),
    new HttpHost("host", port, scheme)
  )
  val restBuilder = RestClient.builder(nodes: _*)
  new RestHighLevelClient(restBuilder)
}
集群节点之间使用,序列化
/**
 * Serializable base type for objects that hand out a {@link RestHighLevelClient}.
 *
 * This base implementation carries no client of its own; subclasses are
 * expected to override {@link #getClient()}.
 */
public class BaseEsClientSerializable implements Serializable {

    /**
     * Returns the client exposed by this holder.
     *
     * @return always {@code null} in this base class
     */
    public RestHighLevelClient getClient() {
        return null;
    }
}
/**
 * Serializable holder that exposes one shared {@link RestHighLevelClient}.
 *
 * The client lives in a static field, so it is created when this class is
 * initialized and is not part of the serialized form of any instance.
 */
public class EsClientSerializable extends BaseEsClientSerializable {

    /** Shared client, built once during class initialization. */
    public static RestHighLevelClient client = buildClient();

    /** Creates the client against the single placeholder endpoint "host":9200 over HTTP. */
    private static RestHighLevelClient buildClient() {
        HttpHost node = new HttpHost("host", 9200, "http");
        return new RestHighLevelClient(RestClient.builder(node));
    }

    /**
     * @return the shared static {@link #client}
     */
    @Override
    public RestHighLevelClient getClient() {
        return client;
    }
}
配置连接池查询
/**
 * Commons-pool factory that creates and destroys {@link RestHighLevelClient}
 * instances for an object pool.
 */
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> {

    /**
     * Creates a new client and wraps it for the pool.
     *
     * Any failure while constructing the client is propagated to the caller
     * instead of being swallowed, so the pool never receives a wrapper around
     * a {@code null} client.
     *
     * @return a pooled wrapper around a newly built client
     * @throws Exception if the client cannot be constructed
     */
    @Override
    public PooledObject<RestHighLevelClient> makeObject() throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("host", 9200, "http")));
        return new DefaultPooledObject<>(client);
    }

    /**
     * Closes the client held by the given pooled wrapper.
     *
     * @param pooledObject wrapper whose client should be closed
     * @throws Exception if closing the client fails
     */
    @Override
    public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        RestHighLevelClient highLevelClient = pooledObject.getObject();
        // Defensive: a wrapper without a client has nothing to release.
        if (highLevelClient != null) {
            highLevelClient.close();
        }
    }

    /**
     * Reports every pooled client as valid; no health check is performed.
     *
     * @param pooledObject wrapper being validated (not inspected)
     * @return always {@code true}
     */
    @Override
    public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
        return true;
    }

    /**
     * Activation hook; only prints a marker line to standard output.
     *
     * @param pooledObject wrapper being activated (not inspected)
     */
    @Override
    public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        System.out.println("activateObject");
    }

    /**
     * Passivation hook; only prints a marker line to standard output.
     *
     * @param pooledObject wrapper being passivated (not inspected)
     */
    @Override
    public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        System.out.println("passivateObject");
    }
}
/**
 * Static access point to a shared pool of {@link RestHighLevelClient} instances.
 */
public class ElasticSearchPoolUtil {

    // Pool configuration; optional, since the defaults would also work.
    private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

    // Cap the pool at 8 clients (the original author notes this equals the default).
    static {
        poolConfig.setMaxTotal(8);
    }

    // Factory that knows how to create and destroy the pooled clients.
    private static final EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();

    // The pool itself, assembled from the factory and the configuration above.
    private static final GenericObjectPool<RestHighLevelClient> clientPool =
            new GenericObjectPool<>(esClientPoolFactory, poolConfig);

    /**
     * Borrows a client from the pool.
     *
     * @return a client obtained via {@code borrowObject()}
     * @throws Exception if the pool cannot supply a client
     */
    public static RestHighLevelClient getClient() throws Exception {
        return clientPool.borrowObject();
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * A {@code null} argument is ignored rather than forwarded to the pool, so
     * that calling this from a {@code finally} block after a failed
     * {@link #getClient()} does not raise a second exception.
     *
     * @param client the client to give back; may be {@code null}
     */
    public static void returnClient(RestHighLevelClient client) {
        if (client == null) {
            return;
        }
        clientPool.returnObject(client);
    }
}
查询
- 目前封装得还不够完整,用熟之后按照业务场景继续添加就好;目的是后续查询不需要再开发,直接配置条件即可;
多条件match,过滤查询
/**
 * Searches an index with a bool query in which every key/value pair of
 * `matchs` becomes one `match` clause in the `must` list.
 *
 * @param client high-level REST client used to execute the request
 * @param index  name of the index to search
 * @param matchs field name -> text pairs; each value is read with `getString`
 * @param size   value handed to `SearchSourceBuilder.size`
 * @return the response returned by `client.search`
 */
def searchHitsByMatchs(
    client: RestHighLevelClient,
    index: String,
    matchs: JSONObject,
    size: Int
): SearchResponse = {
  val allOf = QueryBuilders.boolQuery()

  // One match clause per JSON key; all of them must hold.
  matchs.keySet().toArray().map(_.toString).foreach { field =>
    allOf.must(QueryBuilders.matchQuery(field, matchs.getString(field)))
  }

  val source = new SearchSourceBuilder().size(size).query(allOf)
  val request = new SearchRequest(index).source(source)
  client.search(request, RequestOptions.DEFAULT)
}
多条件term,过滤查询
/**
 * Searches an index with a bool query in which every key/value pair of
 * `terms` becomes one `term` clause in the `must` list.
 *
 * @param client high-level REST client used to execute the request
 * @param index  name of the index to search
 * @param terms  field name -> value pairs; each value is read with `getString`
 * @param size   value handed to `SearchSourceBuilder.size`
 * @return the response returned by `client.search`
 */
def searchHitsByTerms(
    client: RestHighLevelClient,
    index: String,
    terms: JSONObject,
    size: Int
): SearchResponse = {
  val allOf = QueryBuilders.boolQuery()

  // Walk the JSON keys and add one term clause for each of them.
  val fields = terms.keySet().iterator()
  while (fields.hasNext) {
    val field = fields.next().toString
    allOf.must(QueryBuilders.termQuery(field, terms.getString(field)))
  }

  val source = new SearchSourceBuilder().size(size).query(allOf)
  client.search(new SearchRequest(index).source(source), RequestOptions.DEFAULT)
}
多条件match查询返回count
/**
 * Counts the documents of an index that satisfy a bool query in which every
 * key/value pair of `matchs` becomes one `match` clause in the `must` list.
 *
 * @param client high-level REST client used to execute the request
 * @param index  name of the index to count in
 * @param matchs field name -> text pairs; each value is read with `getString`
 * @return the response returned by `client.count`
 */
def searchCountByMatchs(
    client: RestHighLevelClient,
    index: String,
    matchs: JSONObject
): CountResponse = {
  val allOf = QueryBuilders.boolQuery()

  // Every JSON entry contributes one mandatory match clause.
  for (key <- matchs.keySet().toArray()) {
    val field = key.toString
    allOf.must(QueryBuilders.matchQuery(field, matchs.getString(field)))
  }

  val source = new SearchSourceBuilder().query(allOf)
  val request = new CountRequest(index).source(source)
  client.count(request, RequestOptions.DEFAULT)
}
多条件match聚合查询
/**
 * Runs a single metric aggregation (sum, max, min, avg or value count) over
 * the documents matched by a bool query built from `matchs`.
 *
 * @param client          high-level REST client used to execute the request
 * @param index           name of the index to search
 * @param matchs          field name -> text pairs; each becomes a `match`
 *                        clause in the `must` list
 * @param returnFieldName name given to the aggregation in the response
 * @param sumField        document field the aggregation is computed on
 * @param aggregationType one of "SUM", "MAX", "MIN", "AVG"; any other value
 *                        selects a value-count aggregation
 * @return the response returned by `client.search`
 */
def searchSumByMatchs(
    client: RestHighLevelClient,
    index: String,
    matchs: JSONObject,
    returnFieldName: String,
    sumField: String,
    aggregationType: String
): SearchResponse = {
  val allOf = QueryBuilders.boolQuery()

  // One mandatory match clause per JSON key.
  matchs.keySet().toArray().map(_.toString).foreach { field =>
    allOf.must(QueryBuilders.matchQuery(field, matchs.getString(field)))
  }

  val source = new SearchSourceBuilder().query(allOf)

  // Attach the requested aggregation inside each arm, so no common builder
  // type has to be spelled out.
  aggregationType match {
    case "SUM" => source.aggregation(AggregationBuilders.sum(returnFieldName).field(sumField))
    case "MAX" => source.aggregation(AggregationBuilders.max(returnFieldName).field(sumField))
    case "MIN" => source.aggregation(AggregationBuilders.min(returnFieldName).field(sumField))
    case "AVG" => source.aggregation(AggregationBuilders.avg(returnFieldName).field(sumField))
    case _     => source.aggregation(AggregationBuilders.count(returnFieldName).field(sumField))
  }

  client.search(new SearchRequest(index).source(source), RequestOptions.DEFAULT)
}
数据解析
解析searchResponse为SearchHit 集合
/**
 * Extracts the array of hits from a search response.
 *
 * @param searchResponse response of a search request
 * @return the hits carried by the response
 */
def analysisSearchResponseToHits(searchResponse: SearchResponse): Array[SearchHit] = {
  val hitsSection = searchResponse.getHits
  hitsSection.getHits
}
解析CountResponse为long值
/**
 * Extracts the document count from a count response.
 *
 * @param countResponse response of a count request
 * @return the count reported by the response
 */
def analysisCountResponseToLong(countResponse: CountResponse): Long = {
  val total: Long = countResponse.getCount
  total
}
解析searchResponse为各种聚合Double值
/**
 * Reads the numeric value of a single metric aggregation from a search
 * response.
 *
 * @param searchResponse  response that carries the aggregation
 * @param returnFieldName name under which the aggregation was requested
 * @param aggregationType one of "SUM", "MAX", "MIN", "AVG"; any other value
 *                        treats the aggregation as a value count
 * @return the aggregation value as a Double
 * @throws NoSuchElementException if the response has no aggregation section
 *                                or none named `returnFieldName`
 * @throws ClassCastException     if the aggregation found is not of the kind
 *                                selected by `aggregationType`
 */
def analysisSearchResponseToAggregationFloat(
    searchResponse: SearchResponse,
    returnFieldName: String,
    aggregationType: String
): Double = {
  // Both the aggregation section and the named entry may be null; report that
  // with a descriptive error instead of a bare NullPointerException. The
  // explicit type argument keeps the generic lookup from being left to
  // inference.
  val aggregation: Aggregation = Option(searchResponse.getAggregations)
    .flatMap(aggs => Option(aggs.get[Aggregation](returnFieldName)))
    .getOrElse(
      throw new NoSuchElementException(
        s"aggregation '$returnFieldName' not found in search response"
      )
    )

  // Narrow to the parsed implementation that matches the requested kind.
  aggregationType match {
    case "SUM" => aggregation.asInstanceOf[ParsedSum].value()
    case "MAX" => aggregation.asInstanceOf[ParsedMax].value()
    case "MIN" => aggregation.asInstanceOf[ParsedMin].value()
    case "AVG" => aggregation.asInstanceOf[ParsedAvg].value()
    case _     => aggregation.asInstanceOf[ParsedValueCount].value()
  }
}
调用使用
// Obtain the shared client.
val client = EsClientSerializable.client

// Values reused by both example calls below.
val repayPlanIndex = "vs_fms_repay_plan_wide_table_new"
val principalAggName = "result_principal"
val principalAggType = "MAX"

// Example 1: searchCountByMatchs — count the documents matching one condition.
val matchs = new JSONObject()
matchs.put("cert_no.keyword", "xxx")
val countResponse = searchCountByMatchs(client, repayPlanIndex, matchs)
println(SingleResponseAnalysisUtils.analysisCountResponseToLong(countResponse))

// Example 2: searchSumByMatchs — MAX of "principal" over the same condition.
val searchResponse = searchSumByMatchs(
  client,
  repayPlanIndex,
  matchs,
  principalAggName,
  "principal",
  principalAggType
)
println(
  SingleResponseAnalysisUtils.analysisSearchResponseToAggregationFloat(
    searchResponse,
    principalAggName,
    principalAggType
  )
)
说明
- 后续会将ES实际项目中的部分逐渐更新出来;
- 包含在大数据场景下:实时、离线不同项目中的运用;
版权声明:本文为Kevin__Durant原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。