elasticsearch 源码 创建索引

elasticsearch 源码 创建索引

创建索引过程,当我们客户端提交一个创建索引请求时,之前提到了es的transport模块,在处理请求时,会将请求分发到对应的TransportRequestHandler,而创建索引的入口就是TransportHandler对象,这个对象对应的类是TransportCreateIndexAction的内部类,而TransportCreateIndexAction的这个内部类继承自父类TransportMasterNodeOperationAction。

//TransportMasterNodeOperationAction.java
private class TransportHandler extends BaseTransportRequestHandler<Request> {

    @Override
    public Request newInstance() {
        return newRequest();
    }

    @Override
    public String executor() {
        return ThreadPool.Names.SAME;
    }  // 使用的是same,请求不是交由线程池执行

    @Override
    public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
        // we just send back a response, no need to fork a listener
        request.listenerThreaded(false);
        execute(request, new ActionListener<Response>() {
            @Override
            public void onResponse(Response response) {
                try {
                    channel.sendResponse(response);
                } catch (Throwable e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Throwable e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception e1) {
                    logger.warn("Failed to send response", e1);
                }
            }
        });
    }
}

服务端在接收到创建索引请求时,会调用TransportHandler的messageReceived方法,进而调用execute方法。可以看到ActionListener 封装了response,让调用者可以执行调用OnResponse方法来响应,或者调用OnFailure来响应失败。而后

//TransportMasterNodeOperationAction.java
public void execute(Request request, ActionListener<Response> listener) {
    request.listenerThreaded(true);
    super.execute(request, listener);
}

//TransportAction.java
public void execute(Request request, ActionListener<Response> listener) {
    if (request.listenerThreaded()) {
        //对ActionListener 进行封装ThreadedActionListener    封装线程池
        listener = new ThreadedActionListener<Response>(threadPool, listener, logger);
    }
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }
    try {
        doExecute(request, listener);
    } catch (Throwable e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}

方法调用,对ActionListener进行了封装,得到一个ThreadedActionListener对象,这个对象继承自ActionListener,不过他的onResponse方法和onFailure方法中响应处理Response的逻辑是由generic类型的线程池来执行。

 public void onResponse(final Response response) {
        try {
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        listener.onResponse(response);
                    } catch (Throwable e) {
                        listener.onFailure(e);
                    }
                }
            });
        } catch (EsRejectedExecutionException ex) {
            logger.debug("Can not run threaded action, exectuion rejected 
            try {
                listener.onResponse(response);
            } catch (Throwable e) {
                listener.onFailure(e);
            }
        }
    }

    @Override
    public void onFailure(final Throwable e) {
        try {
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    listener.onFailure(e);
                }
            });
        } catch (EsRejectedExecutionException ex) {
            listener.onFailure(e);
        }
    }

上边调用了doExecute方法,最终会调用innerExecute方法。这个方法比较长,不贴代码了说下具体逻辑
1.获取集群信息ClusterState,进而获取到所有的节点信息
2.判断本节点是否是master节点,若是mater节点则使用线程池调用masterOperation方法,使用的是same类型线程池,否则执行3
3.不是master节点,判断是否有master节点,若是master节点为null,则30s后再次执行innerExecute方法,或者集群状态更新时调用innerExecute方法,从1开始,只能重新执行一次,若重新执行依然没有master节点,返回失败响应。若是master节点不为null执行4
4.转发请求到master节点,并new TransportResponseHandler,在接收到master的响应是,再转发响应到客户端。master节点接收到请求后,从头开始执行,不出意外的话,会执行到步骤2,调用masterOperation方法。

masterOperation方法,将request中的信息生成CreateIndexClusterStateUpdateRequest对象,然后调用了MetaDataCreateIndexService中的createIndex方法。主要进行以下操作

1.构建索引的setting
2.根据索引名称获取锁,若是获取成功则调用重载的createIndex方法,否则new 一个Runnable,其中使用tryAcquire(long timeout, TimeUnit unit)方法来获取锁,等待时间是请求的timeout时间。
若是在等待时间内获取到锁,则调用重载的createIndex方法。

在createIndex方法中将索引创建任务封装成一个集群状态更新任务,提交到一个优先级线程池中取执行,索引的创建流程如下:
1)验证索引名称,判断路由表中是否含有此索引,元数据中是否含有次索引,索引名称是否合法等
2)根据索引名称查找templates,并排序,解析request中的mappings,和customs(暂时不知道这个是什么)
3)合并mapping,mapping 包含以下几部分

1.request的mapping
2.匹配到的templates
3.home/config/mappings/indexName 下的mapping
4.home/config/mappings/_default下的mapping
5._default_ 的mapping type (也就是索引类型 也叫mappings类型)
优先级从高到底

3)合并custom,将template中的custom与request中的合并
4)合并索引的setting,主要包含以下几部分

1.request中的indexSetting
2.templates中的setting
3.ES setting中的配置的默认值
4.默认值
优先级从高到低

5)使用索引名称和索引配置来创建索引,调用InternalIndicesService中的createIndex方法,初始化索引相关的各个模块

public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticsearchException {
    if (!lifecycle.started()) {  //判断生命周期 是否是started
        throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed");
    }
    Index index = new Index(sIndexName);// new index
    if (indicesInjectors.containsKey(index.name())) {
        throw new IndexAlreadyExistsException(index);
    }

    indicesLifecycle.beforeIndexCreated(index);// 调用生命周期对象中的beforeIndexCreated方法

    logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));

    Settings indexSettings = settingsBuilder()
            .put(this.settings)
            .put(settings)
            .classLoader(settings.getClassLoader())
            .build();

    ModulesBuilder modules = new ModulesBuilder();
    modules.add(new IndexNameModule(index));//索引名称
    modules.add(new LocalNodeIdModule(localNodeId)); //就用来记录localNodeId
    modules.add(new IndexSettingsModule(index, indexSettings)); //索引setting   inject IndexSettingsService  和setting
    modules.add(new IndexPluginsModule(indexSettings, pluginsService));  // 索引plugin 注入
    modules.add(new IndexStoreModule(indexSettings));  // 根据配置 注入不同的store对象
    /*
     默认engine
     public static final Class<? extends Module> DEFAULT_INDEX_ENGINE = InternalIndexEngineModule.class;
     public static final Class<? extends Module> DEFAULT_ENGINE = InternalEngineModule.class;
     */
    modules.add(new IndexEngineModule(indexSettings));
    /*
    解析 模块  各种关键字处理   pattern_replace
    CharFiltersBindings   字符处理
    TokenFiltersBindings    词元处理
    TokenizersBindings  分词处理
    AnalyzersBindings   analyzer 分析器   封装了以上处理单元
    https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-analyzers.html
     */
    modules.add(new AnalysisModule(indexSettings, indicesAnalysisService));
    //我们要修改打分机制,就需要自定义similarity   BM25Similarity
    modules.add(new SimilarityModule(indexSettings));
    /*
    索引相关 cache
     */
    modules.add(new IndexCacheModule(indexSettings));
    // fieldData  doc_value  聚合功能
    modules.add(new IndexFieldDataModule(indexSettings));
    // 编解码模块   PostingsFormatProvider   DocValuesFormatProvider
    //PostingsFormat covers the inverted index, including all fields, terms, documents,
    // positions, payloads and offsets. Queries use this API to find their matching documents.
    modules.add(new CodecModule(indexSettings));

    //mapping
    modules.add(new MapperServiceModule());
    //查询解析
    modules.add(new IndexQueryParserModule(indexSettings));
    //别名
    modules.add(new IndexAliasesServiceModule());
    // gateway  full restart 索引恢复
    modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class)));
    // 索引服务  包含了上边的各个服务模块,直接使用indexService
    modules.add(new IndexModule(indexSettings));

    Injector indexInjector;
    try {
        // 创建一个子注入器
        indexInjector = modules.createChildInjector(injector);
    } catch (CreationException e) {
        throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e));
    } catch (Throwable e) {
        throw new IndexCreationException(index, e);
    }
    // 映射关系
    indicesInjectors.put(index.name(), indexInjector);

     索引服务  包含了上边的各个服务模块,直接使用indexService
    IndexService indexService = indexInjector.getInstance(IndexService.class);

    // 生命周期
    indicesLifecycle.afterIndexCreated(indexService);

    indices = newMapBuilder(indices).put(index.name(), indexService).immutableMap();

    return indexService;
}

6)将mappings信息添加到MapperService中,然后根据MapperService生成MappingMetaData元数据。每个type对应一个MappingMetaData。
7)根据索引名称,索引setting,mapping元数据MappingMetaData,custom,和请求中的索引状态来构建一个索引元数据IndexMetaData。将索引元数据添加到元数据中MetaData
8)若是request中有索引对应的block信息,则添加到ClusterBlocks中,若是request的state等于close(索引关闭),则添加此block到ClusterBlocks中。block用来定义索引的访问权限

write read metadata 三种类型阻塞
                        block
index read-only  / write  metadata
index read  / read
index write /write
index metadata /metadata
index close /read write

9)更新metaData 和 block 到ClusterState 中
10)若是request的state等于Open,则构建routingTable,进行分片分配,这个过程由ShardsAllocator接口的实现类和AllocationDecider抽象类的子类来完成。
11)对比创建以后的集群状态和前的状态,有变化则集群状态的version,routingTable的version与metadata的version都会加1,构建一个新的ClusterState,发布集群状态到所有节点,将集群状态更新事件作为参数调用各个集群状态更新的listener。等到收到所有节点的ack信息,则释放锁,返回Response

1.源码阅读笔记


版权声明:本文为mhtian2015原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。