ElasticSearch系列 - ElasticSearch读写原理分析

1. 写流程

Elasticsearch 是当前主流的搜索引擎,其具有扩展性好,查询速度快,查询结果近实时等优点,本文将对Elasticsearch的写操作进行分析。

1.1. lucene的写操作及其问题

Elasticsearch底层使用Lucene来实现doc的读写操作,Lucene通过

public long addDocument(...);
public long deleteDocuments(...);
public long updateDocument(...);

 

三个方法来实现文档的写入,更新和删除操作。但是存在如下问题

  1. 没有并发设计
    lucene只是一个搜索引擎库,并没有涉及到分布式相关的设计,因此要想使用Lucene来处理海量数据,并利用分布式的能力,就必须在其之上进行分布式的相关设计。
  2. 非实时
    将文件写入lucence后并不能立即被检索,需要等待lucene生成一个完整的segment才能被检索
  3. 数据存储不可靠
    写入lucene的数据不会立即被持久化到磁盘,如果服务器宕机,那存储在内存中的数据将会丢失
  4. 不支持部分更新
    lucene中提供的updateDocuments仅支持对文档的全量更新,对部分更新不支持

1.2. Elasticsearch的写入方案

针对Lucene的问题,ES做了如下设计

1.2.1. 分布式设计:

为了支持对海量数据的存储和查询,Elasticsearch引入分片的概念,一个索引被分成多个分片,每个分片可以有一个主分片和多个副本分片,每个分片副本都是一个具有完整功能的lucene实例。分片可以分配在不同的服务器上,同一个分片的不同副本不能分配在相同的服务器上。
在进行写操作时,ES会根据传入的_routing参数(或mapping中设置的_routing, 如果参数和设置中都没有则默认使用_id), 按照公式shard_num = hash(\routing) % num_primary_shards,计算出文档要分配到的

分片,在从集群元数据中找出对应主分片的位置,将请求路由到该分片进行文档写操作。 

1.2.2. 近实时性-refresh操作

当一个文档写入Lucene后是不能被立即查询到的,Elasticsearch提供了一个refresh操作,会定时地调用lucene的reopen(新版本为openIfChanged)为内存中新写入的数据生成一个新的segment,此时被处理的文档均可以被检索到。refresh操作的时间间隔由refresh_interval参数控制,默认为1s, 当然还可以在写入请求中带上refresh表示写入后立即refresh,另外还可以调用refresh API显式refresh。

1.2.3.  数据存储可靠性

  1. 引入translog
    当一个文档写入Lucence后是存储在内存中的,即使执行了refresh操作仍然是在文件系统缓存中,如果此时服务器宕机,那么这部分数据将会丢失。为此ES增加了translog, 当进行文档写操作时会先将文档写入Lucene,然后写入一份到translog,写入translog是落盘的(如果对可靠性要求不是很高,也可以设置异步落盘,可以提高性能,由配置index.translog.durabilityindex.translog.sync_interval控制),这样就可以防止服务器宕机后数据的丢失。由于translog是追加写入,因此性能比较好。与传统的分布式系统不同,这里是先写入Lucene再写入translog,原因是写入Lucene可能会失败,为了减少写入失败回滚的复杂度,因此先写入Lucene.
  2. flush操作
    另外每30分钟或当translog达到一定大小(由index.translog.flush_threshold_size控制,默认512mb), ES会触发一次flush操作,此时ES会先执行refresh操作将buffer中的数据生成segment,然后调用lucene的commit方法将所有内存中的segment fsync到磁盘。此时lucene中的数据就完成了持久化,会清空translog中的数据(6.x版本为了实现sequenceIDs,不删除translog) 
  3. merge操作
    由于refresh默认间隔为1s中,因此会产生大量的小segment,为此ES会运行一个任务检测当前磁盘中的segment,对符合条件的segment进行合并操作,减少lucene中的segment个数,提高查询速度,降低负载。不仅如此,merge过程也是文档删除和更新操作后,旧的doc真正被删除的时候。用户还可以手动调用_forcemerge API来主动触发merge,以减少集群的segment个数和清理已删除或更新的文档。
  4. 多副本机制
    另外ES有多副本机制,一个分片的主副分片不能分片在同一个节点上,进一步保证数据的可靠性。

1.2.4. 部分更新

lucene支持对文档的整体更新,ES为了支持局部更新,在Lucene的Store索引中存储了一个_source字段,该字段的key值是文档ID, 内容是文档的原文。当进行更新操作时先从_source中获取原文,与更新部分合并后,再调用lucene API进行全量更新, 对于写入了ES但是还没有refresh的文档,可以从translog中获取。另外为了防止读取文档过程后执行更新前有其他线程修改了文档,ES增加了版本机制,当执行更新操作时发现当前文档的版本与预期不符,则会重新获取文档再更新。

1.3.  ES的写入流程

ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard, primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。大致流程如下图: 

写流程具体操作流程图

1.3.1. coordinating节点

ES中接收并转发请求的节点称为coordinating节点,ES中所有节点都可以接受并转发请求。当一个节点接受到写请求或更新请求后,会执行如下操作:

  1. ingest pipeline
    查看该请求是否符合某个ingest pipeline的pattern, 如果符合则执行pipeline中的逻辑,一般是对文档进行各种预处理,如格式调整,增加字段等。如果当前节点没有ingest角色,则需要将请求转发给有ingest角色的节点执行。
  2. 自动创建索引
    判断索引是否存在,如果开启了自动创建则自动创建,否则报错
  3. 设置routing
    获取请求URL或mapping中的_routing,如果没有则使用_id, 如果没有指定_id则ES会自动生成一个全局唯一ID。该_routing字段用于决定文档分配在索引的哪个shard上。
  4. 构建BulkShardRequest
    由于Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执行,因此协调节点,会将请求按照shard分开,同一个shard上的请求聚合到一起,构建BulkShardRequest
  5. 将请求发送给primary shard
    因为当前执行的是写操作,因此只能在primary上完成,所以需要把请求路由到primary shard所在节点
  6. 等待primary shard返回

1.3.2.  primary shard

Primary请求的入口是PrimaryOperationTransportHandler的MessageReceived, 当接收到请求时,执行的逻辑如下

  1. 判断操作类型
    遍历bulk请求中的各子请求,根据不同的操作类型跳转到不同的处理逻辑
  2. 将update操作转换为Index和Delete操作
    获取文档的当前内容,与update内容合并生成新文档,然后将update请求转换成index请求,此处文档设置一个version v1
  3. Parse Doc
    解析文档的各字段,并添加如_uid等ES相关的一些系统字段
  4. 更新mapping
    对于新增字段会根据dynamic mapping或dynamic template生成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常
  5. 获取sequence Id和Version
    从SequcenceNumberService获取一个sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根据当前Versoin+1用于防止并发写导致数据不一致。
  6. 写入lucene
    这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。
  7. 写入translog
    写入Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满足nosql场景的实时性。
  8. 重构bulk request
    因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进行调整,只包含index或delete操作,不需要再进行update的处理操作。
  9. flush translog
    默认情况下,translog要在此处落盘完成,如果对可靠性要求不高,可以设置translog异步,那么translog的fsync将会异步执行,但是落盘前的数据有丢失风险。
  10. 发送请求给replicas
    将构造好的bulkrequest并发发送给各replicas,等待replica返回,这里需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执行失败,则primary会给master发请求remove该shard。这里会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
  11. 等待replica响应
    当所有的replica返回请求时,更细primary shard的LocalCheckPoint。

1.3.3.  replica shard

Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,当replica shard接收到请求时执行如下流程:

  1. 判断操作类型
    replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执行对应的操作
  2. Parse Doc
  3. 更新mapping
  4. 获取sequenceId和Version 直接使用primary shard发送过来的请求中的内容即可
  5. 写如lucene
  6. write Translog
  7. Flush translog

1.4. 异常处理

  1. 如果请求在协调节点的路由阶段失败,则会等待集群状态更新,拿到更新后,进行重试,如果再次失败, 则仍旧等集群状态更新,直到超时 1 分钟为止。超时后仍失败则进行整体 请求失败处理
  2. 在主分片写入过程中,写入是阻塞的。只有写入成功,才会发起写副本请求。如果主 shard 写失败,则整个请求被认为处理失败 。 如果有部分副本写失败,则整个请求被认为处理成功。
  3. 无论主分片还是副分片,当写一个 doc 失败时,集群不会重试,而是关闭本地 shard, 然后向 Master汇报,删除是以 shard为单位的。删除后会新建一个分片,然后进行数据同步

1.5.  总结与分析

Elasticsearch建立在Lucene基础之上,底层采用lucene来实现文件的读写操作,实现了文档的存储和高效查询。然后lucene作为一个搜索库在应对海量数据的存储上仍有一些不足之处。
Elasticsearch通过引入分片概念,成功地将lucene部署到分布式系统中,增强了系统的可靠性和扩展性。
Elasticsearch通过定期refresh lucene in-momory-buffer中的数据,使得ES具有了近实时的写入和查询能力。
Elasticsearch通过引入translog,多副本,以及定期执行flush,merge等操作保证了数据可靠性和较高的存储性能。
Elasticsearch通过存储_source字段结合verison字段实现了文档的局部更新,使得ES的使用方式更加灵活多样。
Elasticsearch基于lucene,又不简单地只是lucene,它完美地将lucene与分布式系统结合,既利用了lucene的检索能力,又具有了分布式系统的众多优点。

 

2. 读流程

  1. 客户端发送请求到任意一个node,成为coordinate node
  2. coordinate node对document进行路由,将请求转发到对应的node,此时会使用round-robin随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡
  3. 接收请求的node返回document给coordinate node
  4. coordinate node返回document给客户端

在读取时,文档可能己经存在于主分片上,但还没有复制到副分片。在这种情况下,读请

求命中副分片时可能会报告文档不存在,但是命中主分片可能成功返回文挡。 es可以提高分片优先级进行查询(将该分片放在前列)

2.1. GET

GET读操作时,具体流程图

  1. sendRequest是异步
  2. 从es5.x开始,读取只从lucene中读取,不从translog读取,每次读取会判断是否需要刷盘
  3. 当一个请求返回onFailure后,取下一个分片地址

2.2. MGET

MGET读操作时,具体流程图

  1. 遍历请求,计算出每个 doc 的路由信息,得到由 shardid为 key组成的 request map
  2. 循环处理组织好的每个 shard 级请求,调用处理 GET 请求时使用 TransportSingle­ ShardAction#AsyncSingleAction 处理单个 doc 的流程。
  3. 收集 Response,全部 Response 返回后执行 finishHim(),给客户端返回结果。 回复的消息中文档顺序与请求的顺序一致。如果部分文档读取失败,则不影响其他结果,检索失败的 doc 会在回复信息中标出

 

3. Search流程

search流程分为两个步骤  query+fetch

因为不知道文档id,需要根据传入的搜索关键词,查询出符合条件的文档id(query阶段),然后根据得到的文档id,查询出文档信息(fetch阶段)

fetch可以转换为get/mget一样

search 有两种模式:精确/全文

精确:只返回有/无

全文:返回评分最高的(不保证是你想要的那个),所以需要调整评分

检索流程图

3.1. Query流程

Search有两种类型

DFS_QUERY_THEN_FETCH

QUERY_THEN_FETCH(默认)

DFS会全局评分,算分会更加准确,因为QUERY_THEN_FETCH模式下的算法是每个分片进行算分,然后汇总取topN,可能存在误差

QUERY_THEN_FETCH搜索在Query阶段的步骤

  1. 请求发到协调节点
  2. 协调节点转发查询请求到索引的每个分片上(可能是主,也可能是副本,负载均衡)
  3. 每个分片在本地执行查询,并且使用本地的Term/Document Frequency信息进行评分,添加结果到大小from+size的本地优先队列中
  4. 每个分片返回各自的优先队列中所有的文档id和排序值给协调节点,协调节点再合并这些值放入自己的优先队列中,产生一个全局排序后的列表

如果是DSF,就不会在分片本地进行算分,而是信息集中起来到进行算分

 

3.2. Fetch流程

fetch目的就是根据文档id得到完整的文档内容

fetch操作流程

  1. 协调节点向相关的node发生GET请求
  2. 分片所在节点查询出内容后返回数据
  3. 协调节点等待所有文档都返回了,返回返回给客户端

假设这里查询from=10000,size=10的数据

这里其实会创建number_of_shards * (from+ size)优先认列,每个分片会创建from+ size大小的优先队列。为了避免一些问题,应尽量控制分页深度

 

 

如果您觉得这篇文章对您有用,欢迎各位关注我的公众号【Java程序喵】


版权声明:本文为qq_33330687原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。