使用sharding时,有些语句可能被分发到多个数据库节点执行,之后将这些结果汇总加工返回到客户端。归并引擎的作用便是完成对结果的汇总加工,也叫作结果归并。
sharding结果归并从功能上分为遍历、排序、分组、分页和聚合5种类型,从结构划分可分为流式归并、内存归并和装饰者归并,流式归并和内存归并是互斥的,装饰者归并可以在流式归并和内存归并之上做进一步的处理。
流式归并是指每一次从结果集中获取到的数据,都能够通过逐条获取的方式返回正确的单条数据,它与数据库原生的返回结果集的方式最为契合。遍历、排序以及流式分组都属于流式归并的一种。
内存归并则是需要将结果集的所有数据都遍历并存储在内存中,再通过统一的分组、排序以及聚合等计算之后,再将其封装成为逐条访问的数据结果集返回。
装饰者归并是对所有的结果集归并进行统一的功能增强,目前装饰者归并有分页归并和聚合归并这2种类型。
本文接下来介绍select语句的结果归并,对于其他语句不再本文介绍范围内。
这里写目录标题
1、归并引擎
归并引擎根据select语句特点,查找合适的归并处理策略,将数据库执行结果封装到MergedResult对象中。sharding对MergedResult使用ShardingResultSet封装,ShardingResultSet实现了ResultSet,应用程序获得便是ShardingResultSet对象,ShardingResultSet将对其方法的调用都委托给了MergedResult。所以应用程序中调用ShardingResultSet,其实就是调用MergedResult。
从上面的介绍中,可以看到MergedResult的作用:持有多个数据库的结果集对象(ResultSet),当应用程序获取数据库结果时,可以在多个结果集之间自动切换,使得应用程序就像使用一个结果集对象而不是多个。
sharding提供了12个功能各不相同的MergedResult实现类,归并引擎的作用便是查找合适的MergedResult实现类,并创建MergedResult对象,将该对象返回给上层调用。
sharding提供了两种归并引擎实现类:ShardingResultMergerEngine和EncryptResultDecoratorEngine,后者在加密场景中使用,一般情况下使用的是ShardingResultMergerEngine。
2、MergedResult
先来看一下MergedResult接口:
public interface MergedResult {
/**
* Iterate next data.
*/
boolean next() throws SQLException;
/**
* Get data value.
*
* @param columnIndex column index
* @param type class type of data value
* @return data value
*/
Object getValue(int columnIndex, Class<?> type) throws SQLException;
/**
* Get calendar value.
*
* @param columnIndex column index
* @param type class type of data value
* @param calendar calendar
* @return calendar value
*/
Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;
/**
* Get InputStream.
*
* @param columnIndex column index
* @param type class type of data value
*/
InputStream getInputStream(int columnIndex, String type) throws SQLException;
/**
* Judge ResultSet is null or not.
*
* @return ResultSet is null or not
*/
boolean wasNull() throws SQLException;
}
MergedResult提供了next方法和getValue方法,next方法用于判断是否已经完成对结果集的遍历,getValue方法可以获得数据库的执行结果。
MergedResult将结果集封装,调用方调用next方法和getValue方法便可以完成对所有结果的遍历,大大简化了调用方的操作。
sharding提供了12种MergedResult实现类:
实现类 | 作用 |
---|---|
LimitDecoratorMergedResult | mysql使用,用于分页查询 |
IteratorStreamMergedResult | 对结果集执行简单遍历 |
GroupByStreamMergedResult | 用于分组查询 |
OrderByStreamMergedResult | 排序 |
GroupByMemoryMergedResult | 分组,与上面分组不同的是,该类需要在内存中完成分组 |
SingleLocalDataMergedResult | DAL语句使用,DAL表示数据库管理类的语句,比如show create table语句 |
MultipleLocalDataMergedResult | 暂没有使用场景 |
LogicTablesMergedResult | DAL语句使用 |
ShowCreateTableMergedResult | DAL语句使用 |
TransparentMergedResult | 用于update/insert/delete等语句 |
TopAndRowNumberDecoratorMergedResult | SQL Server使用,用于分页查询 |
RowNumberDecoratorMergedResult | Oracle使用,用于分页查询 |
下面对部分MergedResult实现类详细介绍。
2.1、IteratorStreamMergedResult
该实现类是遍历归并,是最简单的归并方式。sharding将多个结果集对象组织成一个链表,顺次遍历链表中的每个结果集。下面看一下它的源码。
//入参queryResults是结果集对象组成的链表
public IteratorStreamMergedResult(final List<QueryResult> queryResults) {
this.queryResults = queryResults.iterator();
setCurrentQueryResult(this.queryResults.next());
}
//next方法遍历完当前的结果集对象,便切换到链表中下一个结果集对象
@Override
public boolean next() throws SQLException {
if (getCurrentQueryResult().next()) {
return true;
}
if (!queryResults.hasNext()) {
return false;
}
//设置当前正在访问的结果集对象
setCurrentQueryResult(queryResults.next());
boolean hasNext = getCurrentQueryResult().next();
if (hasNext) {
return true;
}
while (!hasNext && queryResults.hasNext()) {
setCurrentQueryResult(queryResults.next());
hasNext = getCurrentQueryResult().next();
}
return hasNext;
}
2.2、OrderByStreamMergedResult
当SQL语句中有order by子句,且需要归并多个数据库节点返回的结果集时,sharding使用该类处理结果集。
因为每个数据库节点执行的SQL都有order by子句,都是已经排序过的数据,那么将每个结果集的当前数据值进行比较(通过实现Java的Comparable接口完成),并将其放入优先级队列。 每次获取下一条数据时,只需将队列顶端结果集的游标下移,并根据新游标重新进入优先级排序队列找到自己的位置即可。
该类使用的是流式归并的方式,每次next仅获取唯一正确的一条数据,极大的节省了内存的消耗。
下面我们看一下源码:
//queryResults:多个结果集对象组成的链表
public OrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
//获得排序字段,也就是order by子句里面的字段名
this.orderByItems = selectStatementContext.getOrderByContext().getItems();
//orderByValuesQueue就是上文提到的优先级队列,结果集放入该队列中
this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
isFirstNext = true;
}
private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
//获得各个结果集对象的第一个结果,并将它们放入到优先级队列中
for (QueryResult each : queryResults) {
OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}
@Override
public boolean next() throws SQLException {
if (orderByValuesQueue.isEmpty()) {
return false;
}
if (isFirstNext) {
isFirstNext = false;
return true;
}
OrderByValue firstOrderByValue = orderByValuesQueue.poll();
if (firstOrderByValue.next()) {
orderByValuesQueue.offer(firstOrderByValue);
}
if (orderByValuesQueue.isEmpty()) {
return false;
}
setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
return true;
}
优先级队列里面的排序方式是:
public int compareTo(final OrderByValue o) {
int i = 0;
//遍历各个排序字段
for (OrderByItem each : orderByItems) {
//调用java的Comparable接口完成排序,也对排序方向进行了判断
int result = CompareUtil.compareTo(orderValues.get(i), o.orderValues.get(i), each.getSegment().getOrderDirection(),
each.getSegment().getNullOrderDirection(), orderValuesCaseSensitive.get(i));
if (0 != result) {
return result;
}
i++;
}
return 0;
}
2.3、GroupByStreamMergedResult
处理group by子句,sharding提供了两种处理方式,一种是流式处理,一种是内存中再分组加工处理。两者的区别是,如果order by子句的字段与group by子句的字段一样,则使用流水处理,如果不一样,则使用内存再加工处理。本小节先介绍流式处理,下一小节介绍内存处理方式。
GroupByStreamMergedResult是OrderByStreamMergedResult的子类,因此GroupByStreamMergedResult使用优先级队列对每个结果集排序。每次调用next方法时,从优先级队列里面获取下一个结果作为当前结果,然后再从优先级队列里面获得下一个结果,将该结果与第一个结果比较,如果相同,则两个结果合并然后遍历下一个,如果不同,则next方法返回。
GroupByStreamMergedResult对结果合并时,也会对sum、max、min等聚合方法进行处理。
2.4、GroupByMemoryMergedResult
因为排序字段与分组字段不同,无法使用流式方式处理结果集。GroupByMemoryMergedResult的处理方式是遍历每条数据库执行结果记录,将它们放入到一个HashMap对象中,该HashMap对象的key是group by字段的值,value是对应的数据库执行结果记录,不过如果有多个记录具有相同的group by字段值,则value是一个聚合后的记录。
遍历完所有的结果后,将HashMap的value转换为一个List对象,然后对List按照order by子句进行排序。这样每次调用next方法获得下一个记录时,就从List中返回。
2.5、LimitDecoratorMergedResult
LimitDecoratorMergedResult是一个装饰器,它会对其他的MergedResult对象进行装饰。
//mergedResult是被装饰的MergedResult对象
//pagination用于表示分页,记录了分页数据
public LimitDecoratorMergedResult(final MergedResult mergedResult, final PaginationContext pagination) throws SQLException {
super(mergedResult);
this.pagination = pagination;
skipAll = skipOffset();
}
//skipOffset方法调用mergedResult的next方法,跳过指定的分页记录
private boolean skipOffset() throws SQLException {
for (int i = 0; i < pagination.getActualOffset(); i++) {
if (!getMergedResult().next()) {
return true;
}
}
rowNumber = 0;
return false;
}
@Override
public boolean next() throws SQLException {
if (skipAll) {
return false;
}
if (!pagination.getActualRowCount().isPresent()) {
return getMergedResult().next();
}
return ++rowNumber <= pagination.getActualRowCount().get() && getMergedResult().next();
}
3、引用
https://shardingsphere.apache.org/document/legacy/4.x/document/cn/features/sharding/principle/merge/