elasticsearch 源码 创建索引
创建索引过程,当我们客户端提交一个创建索引请求时,之前提到了es的transport模块,在处理请求时,会将请求分发到对应的TransportRequestHandler,而创建索引的入口就是TransportHandler对象,这个对象对应的类是TransportCreateIndexAction的内部类,而TransportCreateIndexAction的这个内部类继承自父类TransportMasterNodeOperationAction。
//TransportMasterNodeOperationAction.java
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
} // 使用的是same,请求不是交由线程池执行
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// we just send back a response, no need to fork a listener
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response", e1);
}
}
});
}
}
服务端在接收到创建索引请求时,会调用TransportHandler的messageReceived方法,进而调用execute方法。可以看到ActionListener 封装了response,让调用者可以执行调用OnResponse方法来响应,或者调用OnFailure来响应失败。而后
//TransportMasterNodeOperationAction.java
public void execute(Request request, ActionListener<Response> listener) {
request.listenerThreaded(true);
super.execute(request, listener);
}
//TransportAction.java
public void execute(Request request, ActionListener<Response> listener) {
if (request.listenerThreaded()) {
//对ActionListener 进行封装ThreadedActionListener 封装线程池
listener = new ThreadedActionListener<Response>(threadPool, listener, logger);
}
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
try {
doExecute(request, listener);
} catch (Throwable e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
方法调用,对ActionListener进行了封装,得到一个ThreadedActionListener对象,这个对象继承自ActionListener,不过他的onResponse方法和onFailure方法中响应处理Response的逻辑是由generic类型的线程池来执行。
public void onResponse(final Response response) {
try {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
listener.onResponse(response);
} catch (Throwable e) {
listener.onFailure(e);
}
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run threaded action, exectuion rejected
try {
listener.onResponse(response);
} catch (Throwable e) {
listener.onFailure(e);
}
}
}
@Override
public void onFailure(final Throwable e) {
try {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
listener.onFailure(e);
}
});
} catch (EsRejectedExecutionException ex) {
listener.onFailure(e);
}
}
上边调用了doExecute方法,最终会调用innerExecute方法。这个方法比较长,不贴代码了说下具体逻辑
1.获取集群信息ClusterState,进而获取到所有的节点信息
2.判断本节点是否是master节点,若是mater节点则使用线程池调用masterOperation方法,使用的是same类型线程池,否则执行3
3.不是master节点,判断是否有master节点,若是master节点为null,则30s后再次执行innerExecute方法,或者集群状态更新时调用innerExecute方法,从1开始,只能重新执行一次,若重新执行依然没有master节点,返回失败响应。若是master节点不为null执行4
4.转发请求到master节点,并new TransportResponseHandler,在接收到master的响应是,再转发响应到客户端。master节点接收到请求后,从头开始执行,不出意外的话,会执行到步骤2,调用masterOperation方法。
masterOperation方法,将request中的信息生成CreateIndexClusterStateUpdateRequest对象,然后调用了MetaDataCreateIndexService中的createIndex方法。主要进行以下操作
1.构建索引的setting
2.根据索引名称获取锁,若是获取成功则调用重载的createIndex方法,否则new 一个Runnable,其中使用tryAcquire(long timeout, TimeUnit unit)方法来获取锁,等待时间是请求的timeout时间。
若是在等待时间内获取到锁,则调用重载的createIndex方法。
在createIndex方法中将索引创建任务封装成一个集群状态更新任务,提交到一个优先级线程池中取执行,索引的创建流程如下:
1)验证索引名称,判断路由表中是否含有此索引,元数据中是否含有次索引,索引名称是否合法等
2)根据索引名称查找templates,并排序,解析request中的mappings,和customs(暂时不知道这个是什么)
3)合并mapping,mapping 包含以下几部分
1.request的mapping
2.匹配到的templates
3.home/config/mappings/indexName 下的mapping
4.home/config/mappings/_default下的mapping
5._default_ 的mapping type (也就是索引类型 也叫mappings类型)
优先级从高到底
3)合并custom,将template中的custom与request中的合并
4)合并索引的setting,主要包含以下几部分
1.request中的indexSetting
2.templates中的setting
3.ES setting中的配置的默认值
4.默认值
优先级从高到低
5)使用索引名称和索引配置来创建索引,调用InternalIndicesService中的createIndex方法,初始化索引相关的各个模块
public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticsearchException {
if (!lifecycle.started()) { //判断生命周期 是否是started
throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed");
}
Index index = new Index(sIndexName);// new index
if (indicesInjectors.containsKey(index.name())) {
throw new IndexAlreadyExistsException(index);
}
indicesLifecycle.beforeIndexCreated(index);// 调用生命周期对象中的beforeIndexCreated方法
logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));
Settings indexSettings = settingsBuilder()
.put(this.settings)
.put(settings)
.classLoader(settings.getClassLoader())
.build();
ModulesBuilder modules = new ModulesBuilder();
modules.add(new IndexNameModule(index));//索引名称
modules.add(new LocalNodeIdModule(localNodeId)); //就用来记录localNodeId
modules.add(new IndexSettingsModule(index, indexSettings)); //索引setting inject IndexSettingsService 和setting
modules.add(new IndexPluginsModule(indexSettings, pluginsService)); // 索引plugin 注入
modules.add(new IndexStoreModule(indexSettings)); // 根据配置 注入不同的store对象
/*
默认engine
public static final Class<? extends Module> DEFAULT_INDEX_ENGINE = InternalIndexEngineModule.class;
public static final Class<? extends Module> DEFAULT_ENGINE = InternalEngineModule.class;
*/
modules.add(new IndexEngineModule(indexSettings));
/*
解析 模块 各种关键字处理 pattern_replace
CharFiltersBindings 字符处理
TokenFiltersBindings 词元处理
TokenizersBindings 分词处理
AnalyzersBindings analyzer 分析器 封装了以上处理单元
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-analyzers.html
*/
modules.add(new AnalysisModule(indexSettings, indicesAnalysisService));
//我们要修改打分机制,就需要自定义similarity BM25Similarity
modules.add(new SimilarityModule(indexSettings));
/*
索引相关 cache
*/
modules.add(new IndexCacheModule(indexSettings));
// fieldData doc_value 聚合功能
modules.add(new IndexFieldDataModule(indexSettings));
// 编解码模块 PostingsFormatProvider DocValuesFormatProvider
//PostingsFormat covers the inverted index, including all fields, terms, documents,
// positions, payloads and offsets. Queries use this API to find their matching documents.
modules.add(new CodecModule(indexSettings));
//mapping
modules.add(new MapperServiceModule());
//查询解析
modules.add(new IndexQueryParserModule(indexSettings));
//别名
modules.add(new IndexAliasesServiceModule());
// gateway full restart 索引恢复
modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class)));
// 索引服务 包含了上边的各个服务模块,直接使用indexService
modules.add(new IndexModule(indexSettings));
Injector indexInjector;
try {
// 创建一个子注入器
indexInjector = modules.createChildInjector(injector);
} catch (CreationException e) {
throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e));
} catch (Throwable e) {
throw new IndexCreationException(index, e);
}
// 映射关系
indicesInjectors.put(index.name(), indexInjector);
索引服务 包含了上边的各个服务模块,直接使用indexService
IndexService indexService = indexInjector.getInstance(IndexService.class);
// 生命周期
indicesLifecycle.afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.name(), indexService).immutableMap();
return indexService;
}
6)将mappings信息添加到MapperService中,然后根据MapperService生成MappingMetaData元数据。每个type对应一个MappingMetaData。
7)根据索引名称,索引setting,mapping元数据MappingMetaData,custom,和请求中的索引状态来构建一个索引元数据IndexMetaData。将索引元数据添加到元数据中MetaData
8)若是request中有索引对应的block信息,则添加到ClusterBlocks中,若是request的state等于close(索引关闭),则添加此block到ClusterBlocks中。block用来定义索引的访问权限
write read metadata 三种类型阻塞
block
index read-only / write metadata
index read / read
index write /write
index metadata /metadata
index close /read write
9)更新metaData 和 block 到ClusterState 中
10)若是request的state等于Open,则构建routingTable,进行分片分配,这个过程由ShardsAllocator接口的实现类和AllocationDecider抽象类的子类来完成。
11)对比创建以后的集群状态和前的状态,有变化则集群状态的version,routingTable的version与metadata的version都会加1,构建一个新的ClusterState,发布集群状态到所有节点,将集群状态更新事件作为参数调用各个集群状态更新的listener。等到收到所有节点的ack信息,则释放锁,返回Response
注
1.源码阅读笔记