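Aggregation concepts
Aggregations are comparable to grouping (GROUP BY) in a relational database, but considerably more powerful.
The three families of aggregations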
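Bucketing (bucket aggregations)
- Date Histogram Aggregation: groups by date interval; for example, with a weekly interval each week becomes its own bucket
- Histogram Aggregation: groups by numeric interval, analogous to the date histogram
- Terms Aggregation: groups by term; documents whose term matches exactly fall into the same bucket
- Range Aggregation: groups numeric or date values into ranges defined by a start and an end
- Missing Aggregation: counts the documents in which a field is missing, including fields whose value is null
- Filter Aggregation: aggregates over the documents that pass a filter condition
Each bucket is associated with a key and a document criterion. A bucket aggregation returns a list of buckets, each holding the set of documents that satisfy its criterion; in other words, it groups the data in some way. For example, a Terms aggregation over the values 1, 2, 3, 1, 3 yields three buckets: 1, 2 and 3.
ES grouping is therefore quite flexible: MySQL's GROUP BY corresponds most directly to a Terms aggregation, while ES also groups by interval and by range out of the box.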
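To make the bucket idea concrete, here is a minimal plain-Java sketch of what a Terms aggregation computes for the values 1, 2, 3, 1, 3. It is only an illustration and not Elasticsearch code; the class and method names are made up for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TermsBucketSketch {
    // Groups values by exact match and counts the entries in each group,
    // which corresponds to the key and doc_count of each terms bucket.
    // Buckets are kept sorted by key here; ES orders them by doc_count by default.
    static Map<Integer, Long> countByTerm(List<Integer> values) {
        Map<Integer, Long> buckets = new TreeMap<>();
        for (Integer value : values) {
            buckets.merge(value, 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // prints {1=2, 2=1, 3=2}
        System.out.println(countByTerm(List.of(1, 2, 3, 1, 3)));
    }
}
```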
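Pipeline (pipeline aggregations)
A pipeline aggregation performs a second round of aggregation on the output or metrics of other aggregations. For example, once a Terms aggregation has produced buckets 1, 2 and 3, we can aggregate again on other attributes within each bucket, much like grouping by multiple columns in a database. (Strictly speaking, nesting one aggregation inside another is a sub-aggregation, while pipeline aggregations such as bucket_sort post-process the buckets.) Multi-field grouping can reportedly also be done with a script, which requires extra configuration, but it did not work when I tried it; if anyone has a good solution, please share it. What I use here is aggregations nested inside aggregations, or alternatively a grouping field concatenated in advance while cleaning the data.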
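The second option, concatenating the grouping fields while cleaning the data, can be sketched in plain Java as follows. The field names pickupTime and productGroupId are borrowed from the example later in this post; the `|` separator and the class name are assumptions made for illustration, and in practice the combined value would be stored in its own keyword field so that a single Terms aggregation can group on it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompositeGroupKeySketch {
    // Builds the combined grouping value that would be written at indexing time.
    static String groupKey(Map<String, Object> doc) {
        return doc.get("pickupTime") + "|" + doc.get("productGroupId");
    }

    // Counts documents per combined key, like a terms aggregation on that field.
    static Map<String, Long> countByGroupKey(List<Map<String, Object>> docs) {
        Map<String, Long> buckets = new LinkedHashMap<>();
        for (Map<String, Object> doc : docs) {
            buckets.merge(groupKey(doc), 1L, Long::sum);
        }
        return buckets;
    }
}
```

The trade-off is that the grouping combination has to be known when the data is indexed, whereas nested aggregations can be chosen at query time.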
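Metric (metric aggregations)
Metric aggregations resemble statistical functions such as COUNT(), SUM() and MAX().
- Avg Aggregation: average
- Max Aggregation: maximum
- Min Aggregation: minimum
- Percentiles Aggregation: percentiles
- Stats Aggregation: returns avg, max, min, sum and count together
- Sum Aggregation: sum
- Top hits Aggregation: the top matching documents
- Value Count Aggregation: number of values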
Terms Aggregation
GET pre_package/_search
{
  "aggs": {
    "Group": {
      "terms": {
        "field": "productGroupId",
        "size": 10
      }
    }
  }
}
This queries the documents in the pre_package index and aggregates them by productGroupId under the name Group, returning only the top 10 buckets.
Range Aggregation
GET pre_package/_search
{
  "aggs": {
    "Group": {
      "range": {
        "field": "isPrePackage",
        "ranges": [
          { "to": 1 },
          { "from": 1, "to": 2 },
          { "from": 2 }
        ]
      }
    }
  }
}
This splits the documents into three ranges by isPrePackage: less than 1, from 1 up to but not including 2, and 2 or greater.
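The boundary rule of the range aggregation (from is inclusive, to is exclusive) is easy to get wrong, so here is a small plain-Java sketch of how a value is assigned to one of the three ranges above. The returned labels imitate the default bucket keys ES generates for numeric ranges; the class and method names are hypothetical.

```java
final class RangeBucketSketch {
    // Returns the label of the bucket a value falls into for the ranges
    // {to: 1}, {from: 1, to: 2}, {from: 2}.
    // "from" is inclusive and "to" is exclusive, so 1 goes into the middle
    // bucket and 2 goes into the last one.
    static String bucketFor(double value) {
        if (value < 1) {
            return "*-1.0";
        }
        if (value < 2) {
            return "1.0-2.0";
        }
        return "2.0-*";
    }
}
```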
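Date Range Aggregation
GET pre_package/_search
{
  "aggs": {
    "Group": {
      "date_range": {
        "field": "update_date",
        "ranges": [
          { "to": "2020-05-01 00:00:00" },
          { "from": "2020-05-02 00:00:00", "to": "2020-08-01 00:00:00" },
          { "from": "2020-08-02 00:00:00" }
        ]
      }
    }
  }
}
Aggregation over time ranges. As with range, from is inclusive and to is exclusive, so the ranges as written leave out documents dated 2020-05-01 and 2020-08-01; use the same boundary value for adjacent ranges if no gap is intended. The date strings are parsed with the format of the update_date mapping unless a format parameter is given.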
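To check which timestamps the three date ranges in the query above actually cover, the boundary logic can be mimicked in plain Java. This is only a sketch of the from-inclusive, to-exclusive rule, not ES code; the class name and the convention of returning -1 for uncovered timestamps are assumptions for this example.

```java
import java.time.LocalDateTime;

final class DateRangeGapSketch {
    static final LocalDateTime MAY_1 = LocalDateTime.of(2020, 5, 1, 0, 0);
    static final LocalDateTime MAY_2 = LocalDateTime.of(2020, 5, 2, 0, 0);
    static final LocalDateTime AUG_1 = LocalDateTime.of(2020, 8, 1, 0, 0);
    static final LocalDateTime AUG_2 = LocalDateTime.of(2020, 8, 2, 0, 0);

    // Returns the index of the matching range (0, 1 or 2), or -1 when the
    // timestamp falls into one of the gaps between the ranges.
    static int rangeIndex(LocalDateTime t) {
        if (t.isBefore(MAY_1)) {
            return 0; // {to: 2020-05-01 00:00:00}
        }
        if (!t.isBefore(MAY_2) && t.isBefore(AUG_1)) {
            return 1; // {from: 2020-05-02 00:00:00, to: 2020-08-01 00:00:00}
        }
        if (!t.isBefore(AUG_2)) {
            return 2; // {from: 2020-08-02 00:00:00}
        }
        return -1; // 2020-05-01 or 2020-08-01: not covered by any range
    }
}
```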
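Filter Aggregation
GET pre_package/_search
{
  "aggs": {
    "flight_Miles": {
      "filter": {
        "term": {
          "cargoOwnerName": "联合利华测试"
        }
      }
    }
  }
}
Aggregates over the documents that pass the filter condition; the result is a single bucket with its document count. Note that a term filter compares against the exact indexed value, so cargoOwnerName should be a keyword field (or its .keyword sub-field should be used) for this to match.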
Missing Aggregation
GET pre_package/_search
{
  "aggs": {
    "without_age": {
      "missing": {
        "field": "orderTypeName"
      }
    }
  }
}
Counts the documents in which the field is missing, including documents where its value is null.
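As a rough plain-Java picture of what the missing aggregation counts, documents can be modelled as maps; a document counts as missing when the key is absent or mapped to null. The class and method names are made up for this sketch.

```java
import java.util.List;
import java.util.Map;

final class MissingCountSketch {
    // Counts documents where the field is absent or explicitly null.
    // Map.get returns null in both cases, so one check covers them.
    static long countMissing(List<Map<String, Object>> docs, String field) {
        return docs.stream()
                .filter(doc -> doc.get(field) == null)
                .count();
    }
}
```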
Histogram Aggregation
GET pre_package/_search
{
  "aggs": {
    "test": {
      "histogram": {
        "field": "id",
        "interval": 100
      }
    }
  }
}
Histogram aggregation: counts documents per fixed-width interval, here in steps of 100 over the id field.
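The bucket a value lands in is found by rounding it down to the start of its interval. A minimal sketch of that rounding, assuming no offset is configured (class and method names are hypothetical):

```java
final class HistogramKeySketch {
    // Rounds a value down to the start of its interval,
    // e.g. with interval 100 the value 250 belongs to the bucket keyed 200.
    static double bucketKey(double value, double interval) {
        return Math.floor(value / interval) * interval;
    }
}
```

With interval 100, ids 0 to 99 share the bucket keyed 0, ids 100 to 199 the bucket keyed 100, and so on.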
ElasticsearchRestTemplate helper class
package com.xxl.job.executor.utils;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.xxl.job.executor.es.EsSortPage;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @description: ES helper class (ElasticsearchRestTemplate)
* @Author:
* @NAME: ElasticsearchRestEsUtils
* @date: 2022/7/1 14:17
*/
@Component
public class ElasticsearchRestEsUtils {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
/**
* @description: Create the index (with its mapping) if it does not exist yet
* @author:
* @date 2022/7/1 14:07
* @param: [cla]
* @return: boolean
*/
public boolean createIndexOps(Class<?> cla) {
// Index operations bound to the entity class
IndexOperations ops = elasticsearchTemplate.indexOps(cla);
if (!ops.exists()) {
ops.create();
ops.putMapping(ops.createMapping());
ops.refresh();
}
return true;
}
/**
* @description: Delete the index
* @author:
* @date 2022/7/1 15:23
* @param: [cla]
* @return: void
*/
public void deleteIndex(Class<?> cla) {
// Index-level operations go through the template's indexOps(), which returns an IndexOperations object;
// the repository (DAO) methods do not cover index management
boolean delete = elasticsearchTemplate.indexOps(cla).delete();
System.out.println("delete = " + delete);
}
/**
* @description: Paged query over all documents
* @author:
* @date 2022/7/1 15:37
* @param: [esSortPage]
* @return: java.util.List<?>
*/
public List<?> findByPageable(EsSortPage esSortPage) {
// Sort direction (1 = descending, anything else = ascending) and sort field
Sort.Direction direction = esSortPage.getSort() == 1 ? Sort.Direction.DESC : Sort.Direction.ASC;
Sort sort = Sort.by(direction, esSortPage.getSortField());
int currentPage = esSortPage.getCurrentPage();
int pageSize = esSortPage.getPageSize();
// Paging; note that PageRequest page numbers are zero-based
PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
ElasticsearchRepository<?, Long> dao = esSortPage.getDao();
// Run the paged query and copy the page content into a new list
Page<?> daoAll = dao.findAll(pageRequest);
return new ArrayList<Object>(daoAll.getContent());
}
/**
* @description: Paged query built from the supplied conditions; each condition selects an exact match or an analyzed (fuzzy) match according to its type
* @author:
* @date 2022/7/3 11:07
* @param: [termQueryReq]
* @return: java.util.List<?>
*/
public List<?> findByTermQuery(TermQueryReq termQueryReq) {
// Sort order: 1 = descending, anything else = ascending
SortOrder order = termQueryReq.getSort() == 1 ? SortOrder.DESC : SortOrder.ASC;
FieldSortBuilder balance = new FieldSortBuilder(termQueryReq.getSortField()).order(order);
// Paging parameters (PageRequest page numbers are zero-based)
int currentPage = termQueryReq.getCurrentPage();
int pageSize = termQueryReq.getPageSize();
// Build the query
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
List<QueryCondition> mapList = termQueryReq.getMapList();
for (QueryCondition queryCondition : mapList) {
if (queryCondition.getFiltration() == 1) { // range filter
RangeQueryBuilder date = QueryBuilders.rangeQuery(queryCondition.getKey()).gte(queryCondition.getValue()).lte(queryCondition.getEndValue());
boolQueryBuilder.filter(date);
} else if (queryCondition.getExtractive() == 1) {
if (queryCondition.getValue() != null) {
// All such conditions must match (AND). Text fields are analyzed, so a minimum_should_match percentage is set: the higher it is, the stricter the match
boolQueryBuilder.must(QueryBuilders.matchQuery(queryCondition.getKey(), queryCondition.getValue()).minimumShouldMatch(termQueryReq.getScore()));
}
} else if (queryCondition.getValue() instanceof Integer[]) {
// Any one of the values may match (OR). The should clauses go into their own bool query,
// because should clauses placed next to must/filter clauses are optional by default
BoolQueryBuilder anyOf = QueryBuilders.boolQuery();
for (Integer integer : (Integer[]) queryCondition.getValue()) {
anyOf.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
}
boolQueryBuilder.must(anyOf);
}
}
// Paging
Pageable pageable = PageRequest.of(currentPage, pageSize);
// Run the query
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
.withPageable(pageable)
.withSort(balance)
.build();
List<Object> resList = new ArrayList<>();
SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
for (SearchHit<?> searchHit : search) {
resList.add(searchHit.getContent());
}
return resList;
}
/**
* @description: Aggregation search, similar to GROUP BY: aggregates on the field named by termQueryReq.getAggregate()
* @author:
* @date 2022/7/3 11:32
* @param: [termQueryReq]
* @return: java.util.List<?>
*/
public List<Map<String, Object>> findPolymerization(TermQueryReq termQueryReq) {
NativeSearchQuery query = new NativeSearchQueryBuilder()
.addAggregation(AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword"))
.build();
SearchHits<?> searchHits = elasticsearchTemplate.search(query, termQueryReq.getCls());
// Extract the aggregation result
Aggregations aggregations = searchHits.getAggregations();
Terms terms = (Terms) aggregations.asMap().get("count");
List<Map<String, Object>> mapList = new ArrayList<>();
for (Terms.Bucket bucket : terms.getBuckets()) {
Map<String, Object> map = new HashMap<>();
String keyAsString = bucket.getKeyAsString(); // value of the aggregated field
long docCount = bucket.getDocCount(); // number of documents with that value
map.put("keyAsString", keyAsString);
map.put("docCount", docCount);
mapList.add(map);
}
return mapList;
}
/**
* @description: Nested aggregation: counts the documents sharing the same termQueryReq.getAggregate() value, then computes the average of termQueryReq.getNes() per bucket, with ordering
* @author:
* @date 2022/7/3 11:36
* @param: [termQueryReq]
* @return: java.util.List<?>
*/
public List<Map<String, Object>> findNest(TermQueryReq termQueryReq) {
// Build the aggregations
TermsAggregationBuilder stateAgg = AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword");
AvgAggregationBuilder balanceAgg = AggregationBuilders.avg("avg_" + termQueryReq.getNes()).field(termQueryReq.getNes());
// Nest the average inside the terms aggregation
stateAgg.subAggregation(balanceAgg);
// Order the buckets by the average: sort == 1 means descending, as in the other methods
// (the second argument of BucketOrder.aggregation is "asc")
boolean asc = termQueryReq.getSort() != 1;
stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), asc));
NativeSearchQuery build = new NativeSearchQueryBuilder()
.addAggregation(stateAgg)
.build();
// Run the query
SearchHits<?> searchHits = elasticsearchTemplate.search(build, termQueryReq.getCls());
// Extract the aggregation result
Aggregations aggregations = searchHits.getAggregations();
Terms terms = (Terms) aggregations.asMap().get("count");
List<Map<String, Object>> mapList = new ArrayList<>();
for (Terms.Bucket bucket : terms.getBuckets()) {
// state : count : avg
Map<String, Object> map = new HashMap<>();
ParsedAvg avg = bucket.getAggregations().get("avg_" + termQueryReq.getNes());
map.put("state", bucket.getKeyAsString());
map.put("count", bucket.getDocCount());
map.put("avg", avg.getValueAsString());
mapList.add(map);
}
return mapList;
}
/**
* @description: Filtered search (range filter on a field)
* @author:
* @date 2022/7/3 13:15
* @param: [termQueryReq]
* @return: java.util.List<?>
*/
public List<?> findFiltration(TermQueryReq termQueryReq) {
// Build the condition
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
RangeQueryBuilder balance = QueryBuilders.rangeQuery("balance").gte(20000).lte(30000);
boolQueryBuilder.filter(balance);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
.build();
List<Object> resList = new ArrayList<>();
SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
for (SearchHit<?> searchHit : search) {
resList.add(searchHit.getContent());
}
return resList;
}
/**
* @description: Check whether the index already exists
* @author:
* @date 2022/7/3 13:21
* @param: [indexName]
* @return: boolean
*/
public boolean indexExist(String indexName) {
if (StringUtils.isBlank(indexName)) {
return false;
}
IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
return elasticsearchTemplate.indexOps(indexCoordinates).exists();
}
/**
* @description: Delete an index by name
* @author:
* @date 2022/7/3 13:24
* @param: [index]
* @return: void
*/
public void indexDelete(String index) {
elasticsearchTemplate.indexOps(IndexCoordinates.of(index)).delete();
}
/**
* @description: Add an alias to an index
* @author:
* @date 2022/7/3 13:25
* @param: [indexName, aliasName]
* @return: boolean
*/
public boolean indexAddAlias(String indexName, String aliasName) {
if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) {
return false;
}
// Wrap the index name
IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
// Only proceed if the index exists
if (elasticsearchTemplate.indexOps(indexCoordinates).exists()) {
// Alias definition
AliasQuery query = new AliasQuery(aliasName);
// Add the alias to the index
return elasticsearchTemplate.indexOps(indexCoordinates).addAlias(query);
}
return false;
}
/**
* @description: Remove an alias from an index
* @author:
* @date 2022/7/3 13:25
* @param: [indexName, aliasName]
* @return: boolean
*/
public boolean indexRemoveAlias(String indexName, String aliasName) {
if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) {
return false;
}
// Wrap the index name
IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
// Only proceed if the index exists
if (elasticsearchTemplate.indexOps(indexCoordinates).exists()) {
// Alias definition
AliasQuery query = new AliasQuery(aliasName);
// Remove the alias from the index
return elasticsearchTemplate.indexOps(indexCoordinates).removeAlias(query);
}
return false;
}
/**
* @description: Add a document to the index
* @author:
* @date 2022/7/3 13:26
* @param: [t]
* @return: void
*/
public <T> void save(T t) {
// Save into the index mapped to the entity class
elasticsearchTemplate.save(t);
}
/**
* @description: Bulk insert into the named index
* @author:
* @date 2022/7/3 13:26
* @param: [queries, index]
* @return: void
*/
public void bulkIndex(List<IndexQuery> queries, String index) {
// Wrap the index name
IndexCoordinates indexCoordinates = IndexCoordinates.of(index);
// Bulk insert. Keep each request below 100 MB, the default size limit for an ES HTTP request; raising that limit may hurt performance
elasticsearchTemplate.bulkIndex(queries, indexCoordinates);
}
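/**
* @description: Bulk insert into the index of the given entity class, creating the index first if necessary
* @author:
* @date 2022/7/3 13:26
* @param: [queries, cls]
* @return: void
*/
public void bulkCls(List<IndexQuery> queries, Class<?> cls) {
// Make sure the index exists, then bulk-insert. Keep each request below 100 MB, the default size limit for an ES HTTP request
createIndexOps(cls);
elasticsearchTemplate.bulkIndex(queries, cls);
}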
/**
* Delete the documents in the given index that match a condition
*
* @param c entity class of the index
* @param fieldName field to match on
* @param val value the field must equal
* @param index index name
*/
public void delete(Class<?> c, String fieldName, Object val, String index) {
// Exact (term) match on the field
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(fieldName, val);
NativeSearchQuery nativeSearchQuery = new NativeSearchQuery(termQueryBuilder);
// Delete the matching documents
elasticsearchTemplate.delete(nativeSearchQuery, c, IndexCoordinates.of(index));
}
/**
* Delete a document by id
*
* @param id document id
* @param index index name
*/
public void deleteById(Object id, String index) {
if (null != id && StringUtils.isNotBlank(index)) {
// Delete the document with this id from the index
elasticsearchTemplate.delete(id.toString(), IndexCoordinates.of(index));
}
}
/**
* Update a document by id; if it does not exist, create it
*
* @param t entity to store
* @param id document id
* @param index index name
* @param <T> entity type
*/
public <T> void update(T t, Integer id, String index) {
// Check whether the document already exists
Object data = elasticsearchTemplate.get(id.toString(), t.getClass(), IndexCoordinates.of(index));
if (data != null) {
// Exists: partial update with the entity serialized to JSON
UpdateQuery build = UpdateQuery.builder(id.toString()).withDocument(Document.parse(JSON.toJSONString(t))).build();
elasticsearchTemplate.update(build, IndexCoordinates.of(index));
} else {
// Does not exist: create it in the same index
elasticsearchTemplate.save(t, IndexCoordinates.of(index));
}
}
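/**
* Build the IndexQuery used to push one object to ES
*
* @param obj object to index; it (or a superclass) is expected to declare an "id" field
* @return the IndexQuery, with its id set when the "id" field is found and non-null
*/
public IndexQuery assembleDataEs(Object obj) {
IndexQuery indexQuery = new IndexQuery();
try {
Field field = getDeclareField(obj, "id");
if (field != null) {
field.setAccessible(true);
Object id = field.get(obj);
if (id != null) {
indexQuery.setId(id.toString());
}
}
} catch (IllegalAccessException e) {
e.printStackTrace();
}
indexQuery.setSource(JSON.toJSONString(obj));
return indexQuery;
}
/**
* Look up a declared field, searching superclasses as well
*
* @param o object whose class hierarchy is searched
* @param fieldName name of the field
* @return the field, or null if no class in the hierarchy declares it
*/
public static Field getDeclareField(Object o, String fieldName) {
// Stop at the top of the hierarchy (clazz == null) so that a missing field yields null instead of a NullPointerException
for (Class<?> clazz = o.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
return clazz.getDeclaredField(fieldName);
} catch (NoSuchFieldException e) {
// Not declared here; try the superclass
}
}
return null;
}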
}
RestHighLevelClient helper class
package com.xxl.job.executor.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
//import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* @description: ES helper class (RestHighLevelClient)
* @Author:
* @NAME: EsUtils
* @date: 2022/7/1 13:39
*/
@Slf4j
@Component
public class RestHighLeveEsUtils {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* @description: Create an index
* @author:
* @date 2022/7/1 14:02
* @param: [indexName]
* @return: boolean
*/
public boolean createIndex(String indexName) {
try {
// 1. Build the create-index request
CreateIndexRequest request = new CreateIndexRequest(indexName);
// 2. Execute it and read the response
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// The index was created if the cluster acknowledged the request
return createIndexResponse.isAcknowledged();
} catch (IOException e) {
log.error("Failed to create index " + indexName, e);
return false;
}
}
/**
* @description: Check whether an index exists
* @author:
* @date 2022/7/1 14:03
* @param: [indexName]
* @return: boolean
*/
public boolean isIndexExists(String indexName) {
boolean exists = true;
try {
GetIndexRequest request = new GetIndexRequest(indexName);
exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
exists = false;
}
return exists;
}
/**
* @description: Delete an index
* @author:
* @date 2022/7/1 14:03
* @param: [indexName]
* @return: boolean
*/
public boolean delIndex(String indexName) {
boolean exists = true;
try {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse delete = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
exists = delete.isAcknowledged();
} catch (IOException e) {
exists = false;
}
return exists;
}
/**
* @description: Update a document
* @author:
* @date 2022/7/1 14:28
* @param: [indexName, obj, id]
* @return: boolean
*/
public boolean updateDocument(String indexName, Object obj, String id) {
try {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
updateRequest.timeout("1s");
updateRequest.doc(JSON.toJSONString(obj), XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
// status() returns a RestStatus enum, so compare against the enum constant rather than the string "OK"
return updateResponse.status() == org.elasticsearch.rest.RestStatus.OK;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
/**
* @description: Delete a document
* @author:
* @date 2022/7/1 14:28
* @param: [indexName, id]
* @return: boolean
*/
public boolean deleteRequest(String indexName, String id) {
try {
DeleteRequest request = new DeleteRequest(indexName, id);
request.timeout("1s");
DeleteResponse delete = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
// status() returns a RestStatus enum; OK means the document was deleted, NOT_FOUND means it did not exist
return delete.status() == org.elasticsearch.rest.RestStatus.OK;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
/**
* @description: Fetch a document by id
* @author:
* @date 2022/7/1 14:29
* @param: [indexName, id]
* @return: java.util.Map
*/
public Map getDocument(String indexName, String id) {
Map strToMap = null;
try {
GetRequest request = new GetRequest(indexName, id);
GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
strToMap = JSONObject.parseObject(getResponse.getSourceAsString());
} catch (IOException e) {
e.printStackTrace();
}
return strToMap;
}
/**
* @description: Create and configure a BulkProcessor
* @author: River
* @date 2022/7/1 14:30
* @param: [client]
* @return: org.elasticsearch.action.bulk.BulkProcessor
*/
private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor bulkProcessor = null;
try {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
log.info("Try to insert data number : " + request.numberOfActions());
}
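@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// A bulk call can complete while individual items fail, so check before reporting success
if (response.hasFailures()) {
log.error("Bulk " + executionId + " finished with failures: " + response.buildFailureMessage());
} else {
log.info("Successfully inserted " + request.numberOfActions() + " documents, id: " + executionId);
}
}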
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
// Flush after this many actions
builder.setBulkActions(1000);
// Flush once the buffered requests reach this size
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
// Maximum number of concurrent bulk requests
builder.setConcurrentRequests(2);
// Flush at least this often
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
// Retry rejected requests up to 3 times, 1 second apart
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
bulkProcessor = builder.build();
} catch (Exception e) {
// Building failed, so there is no processor to close; the shared client stays open for other callers
log.error("Failed to create BulkProcessor", e);
}
return bulkProcessor;
}
/**
* @description: Bulk insert
* @author:
* @date 2022/7/1 14:31
* @param: [objectArrayList, indexName, value]
* @return: boolean
*/
public boolean bulkRequest(ArrayList<Map<String, Object>> objectArrayList, String indexName, String value) {
boolean success = true;
BulkProcessor bulkProcessor = getBulkProcessor(restHighLevelClient);
try {
for (Map<String, Object> doc : objectArrayList) {
// "value" names the map entry whose value is used as the document id
bulkProcessor.add(new IndexRequest(indexName)
.id(doc.get(value).toString())
.source(JSON.toJSONString(doc), XContentType.JSON));
}
// Send whatever is still buffered to ES
bulkProcessor.flush();
} catch (Exception e) {
log.error(e.getMessage());
success = false;
} finally {
try {
// Wait for in-flight requests; false means the wait timed out
boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
log.info(String.valueOf(terminatedFlag));
success = success && terminatedFlag;
} catch (Exception e) {
log.error(e.getMessage());
success = false;
}
}
return success;
}
/**
* @description: Create a document
* @author:
* @date 2022/7/3 16:03
* @param: [indexName, obj, id]
* @return: boolean
*/
public boolean addDocument(String indexName, Object obj, String id) {
try {
// Build the request; equivalent to PUT /{index}/_doc/{id}
IndexRequest request = new IndexRequest(indexName);
request.id(id);
request.timeout(TimeValue.timeValueDays(1));
// Put the data into the request as JSON
request.source(JSON.toJSONString(obj), XContentType.JSON);
// Send the request and read the response
IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
// status() returns a RestStatus enum; CREATED means a new document was written
// (overwriting an existing id yields OK and is reported as false here)
return indexResponse.status() == org.elasticsearch.rest.RestStatus.CREATED;
} catch (IOException e) {
return false;
}
}
/**
* @description: Check whether a document exists
* @author:
* @date 2022/7/3 16:03
* @param: [indexName, id]
* @return: boolean
*/
public boolean isExists(String indexName, String id) {
// Default to false so that a failed request is not reported as "exists"
boolean exists = false;
try {
GetRequest request = new GetRequest(indexName, id);
// Skip fetching _source and stored fields; only existence matters
request.fetchSourceContext(new FetchSourceContext(false));
request.storedFields("_none_");
exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return exists;
}
/**
* @description: Fuzzy (match) query
* @author:
* @date 2022/7/3 16:05
* @param: [indexName, key, value]
* @return: java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
*/
public List<Map<String, Object>> searchMatch(String indexName, String key, String value) throws IOException {
List<Map<String, Object>> map = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(indexName);
// Build the search: a match query with automatic fuzziness
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder(key, value);
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
sourceBuilder.query(matchQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit documentFields : searchResponse.getHits().getHits()) {
map.add(documentFields.getSourceAsMap());
}
return map;
}
/**
* @description: Exact (term) query
* @author:
* @date 2022/7/3 16:05
* @param: [indexName, key, value]
* @return: java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
*/
public List<Map<String, Object>> searchQuery(String indexName, String key, String value) throws IOException {
List<Map<String, Object>> map = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(indexName);
// Build the search: an exact term query
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(key, value);
sourceBuilder.query(termQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit documentFields : searchResponse.getHits().getHits()) {
map.add(documentFields.getSourceAsMap());
}
return map;
}
/**
* @description: Update the timeliness field of every document matching a condition
* @author:
* @date 2022/7/3 16:05
* @param: [catalogId, timeliness, indexName, catalogName]
* @return: void
*/
public void EsupdateTimeliness(String catalogId,String timeliness,String indexName,String catalogName){
// Build the condition
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery(catalogName, catalogId));
// Query first; if nothing matches, the update is skipped
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQueryBuilder);
SearchRequest request = new SearchRequest(indexName);
request.source(sourceBuilder);
// Update-by-query request; the new value is passed as a script parameter instead of being concatenated into the script source
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName)
.setQuery(boolQueryBuilder)
.setScript(new Script(org.elasticsearch.script.ScriptType.INLINE, "painless",
"ctx._source.timeliness = params.timeliness",
java.util.Collections.<String, Object>singletonMap("timeliness", timeliness)));
try {
SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
long docNum = search.getHits().getTotalHits().value;
if (docNum != 0) {
restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT).getUpdated();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private SearchResponse getCommodityList(String index,TermQueryReq req) {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Sort by id
sourceBuilder.sort("id", SortOrder.ASC);
// Query conditions
BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
List<QueryCondition> mapList = req.getMapList();
// Fields to return in _source
List<String> fields = new ArrayList<>();
for (QueryCondition queryCondition : mapList) {
fields.add(queryCondition.getKey());
if (queryCondition.getExtractive() == 1) { // AND
if (queryCondition.getValue() != null) {
mustQuery.must(QueryBuilders.termQuery(queryCondition.getKey(), queryCondition.getValue()));
}
} else if (queryCondition.getValue() instanceof Integer[]) { // OR: any one of the values
// The should clauses get their own bool query, because should clauses next to must clauses are optional by default
BoolQueryBuilder anyOf = QueryBuilders.boolQuery();
for (Integer integer : (Integer[]) queryCondition.getValue()) {
anyOf.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
}
mustQuery.must(anyOf);
}
}
sourceBuilder.fetchSource(new FetchSourceContext(true, fields.toArray(new String[0]), Strings.EMPTY_ARRAY));
// Paging, assuming getCurrentPage() is 1-based
sourceBuilder.query(mustQuery);
sourceBuilder.from((req.getCurrentPage() - 1) * req.getPageSize());
sourceBuilder.size(req.getPageSize());
sourceBuilder.trackTotalHits(true);
searchRequest.source(sourceBuilder);
// Run the search
try {
return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
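Second-level aggregation
According to posts online, since ES 5.0 a second-level aggregation done with a script requires extra configuration, but that did not work when I tried it; if anyone has a good solution, please share it. Here I use aggregations nested inside aggregations (with a bucket_sort pipeline aggregation for paging); the alternative is to concatenate the grouping fields into one field while cleaning the data.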
PageInfo<DespatchRes> pageInfo = new PageInfo<>();
List<DespatchRes> relist = Lists.newArrayList();
// Offset of the first bucket on the requested page (pageNum is 1-based)
int from = (req.getPageNum() - 1) * req.getPageSize();
// Number of buckets per page
int limit = req.getPageSize();
// Inner terms aggregation: buckets by productGroupId
TermsAggregationBuilder stateTime = AggregationBuilders.terms("stateTime").field("productGroupId").size(Integer.MAX_VALUE);
// Outer terms aggregation: buckets by pickupTime
TermsAggregationBuilder stateAgg = AggregationBuilders.terms("GroupId").field("pickupTime").size(Integer.MAX_VALUE);
stateAgg.order(BucketOrder.count(true)); // order buckets by document count, ascending
// Take the first document of each bucket as the display row
stateAgg.subAggregation(AggregationBuilders.topHits("Group").size(1));
// Page through the buckets with a bucket_sort pipeline aggregation
stateAgg.subAggregation(new BucketSortPipelineAggregationBuilder("pageInfo", null).from(from).size(limit));
// Metric aggregation: sum of isPrePackage within each bucket (the non-pre-packaged count)
stateAgg.subAggregation(AggregationBuilders.sum("perSum").field("isPrePackage"));
// Nest the productGroupId buckets inside the pickupTime buckets (the second-level aggregation)
stateAgg.subAggregation(stateTime);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// Time range
if (StringUtils.isNotBlank(req.getPickupStartTime()) && StringUtils.isNotBlank(req.getPickupEndTime())){
RangeQueryBuilder date1 = QueryBuilders.rangeQuery("pickupTime").from(req.getPickupStartTime()).to(req.getPickupEndTime());
boolQueryBuilder.filter(date1);
}
boolQueryBuilder.must(QueryBuilders.matchQuery("sts", 0));
boolQueryBuilder.must(QueryBuilders.matchQuery("tenantId", user.getTenantId()));
// Optional must condition
if (ObjectUtils.isNotEmpty(req.getCargoOwnerId())){
boolQueryBuilder.must(QueryBuilders.matchQuery("cargoOwnerId", req.getCargoOwnerId()));
}
// "Any of these values" conditions. The query already has must clauses, so plain should
// clauses would be optional; each list therefore becomes a terms filter instead
if (ObjectUtils.isNotEmpty(req.getWarehouseId())){
boolQueryBuilder.filter(QueryBuilders.termsQuery("warehouseId", req.getWarehouseId()));
}
if (ObjectUtils.isNotEmpty(req.getWarehouseCodeId())){
boolQueryBuilder.filter(QueryBuilders.termsQuery("referenceThree", req.getWarehouseCodeId()));
}
if (ObjectUtils.isNotEmpty(req.getOrderTypeId())){
boolQueryBuilder.filter(QueryBuilders.termsQuery("orderTypeId", req.getOrderTypeId()));
}
// Build the search query
NativeSearchQuery build = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
.withTrackTotalHits(true)
.addAggregation(stateAgg)
.build();
// Run the query
SearchHits<DespatchEs> searchHits = elasticsearchTemplate.search(build, DespatchEs.class);
Aggregations aggregations = searchHits.getAggregations();
Terms terms = (Terms) aggregations.asMap().get("GroupId");
for (Terms.Bucket bucket : terms.getBuckets()) {
DespatchRes rs = new DespatchRes();
// Document count of the bucket
rs.setNumber(bucket.getDocCount());
Aggregations bucketAggregations = bucket.getAggregations();
// The top_hits document that represents the group
TopHits ns = (TopHits) bucketAggregations.asMap().get("Group");
SearchHit[] hits = ns.getHits().getHits();
for (SearchHit hit : hits) {
rs.setId(Long.parseLong(hit.getId()));
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
rs.setPickupTime(sourceAsMap.get("pickupTime") == null ? "" :sourceAsMap.get("pickupTime").toString());
rs.setProductGroupDetail(sourceAsMap.get("productGroupDetail") == null ? "" : sourceAsMap.get("productGroupDetail").toString());
rs.setProductGroupId(sourceAsMap.get("productGroupId") == null ? "":sourceAsMap.get("productGroupId").toString());
rs.setReferenceThree(sourceAsMap.get("referenceThree") == null ? "":sourceAsMap.get("referenceThree").toString());
}
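// Non-pre-packaged count
Sum perSum = bucketAggregations.get("perSum");
// Total minus non-pre-packaged = pre-packaged
double v = bucket.getDocCount() - perSum.getValue();
rs.setIsPrePackageNumber(v);
relist.add(rs);
}
pageInfo.setList(relist);
// Note: after bucket_sort only the current page of buckets is returned, so this is the page's bucket count rather than the overall total
pageInfo.setTotal(terms.getBuckets().size());
pageInfo.setPageNum(req.getPageNum());
pageInfo.setPageSize(req.getPageSize());
return ResultRes.success(pageInfo);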
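The from and size values handed to bucket_sort follow ordinary offset paging: with a 1-based page number, the offset is (pageNum - 1) * pageSize and the size is simply pageSize. A minimal plain-Java sketch of that arithmetic, applied to an in-memory list standing in for the buckets (class and method names are hypothetical):

```java
import java.util.List;

final class BucketPagingSketch {
    // Offset of the first bucket on a 1-based page.
    static int offset(int pageNum, int pageSize) {
        return (pageNum - 1) * pageSize;
    }

    // Returns the buckets of one page, or an empty list past the last page.
    static <T> List<T> page(List<T> buckets, int pageNum, int pageSize) {
        int from = Math.min(offset(pageNum, pageSize), buckets.size());
        int to = Math.min(from + pageSize, buckets.size());
        return buckets.subList(from, to);
    }
}
```

For example, with five buckets and a page size of 2, page 2 holds the third and fourth buckets and page 3 holds only the fifth.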