014.Elasticsearch分布式原理

1.集群发现机制 通常我们在每台机器部署并启动一个ES进程,怎么让多台机器上的多个ES进程,互相发现对方,然后完美的组成一个ES集群呢?
默认情况下,ES进程会绑定在自己的回环地址上,也就是127.0.0.1,然后扫描本机上的9300~9305端口号,尝试跟这些端口上启动的其他ES进程进行通信,然后组成一个集群。这对于在本机上搭建ES集群的开发环境是很方便的。但是对于生产环境下的集群是不行的,需要将每台ES进程绑定在一个非回环的IP地址上,才能跟其他节点进行通信,同时需要使用集群发现机制(discovery)来跟其他节点上的ES node进行通信,同时discovery机制也负责ES集群的Master选举。
ES node中有Master Node和Data Node两种角色。
ES 是一种p2p,也就是点对点(Peer to Peer)的分布式系统架构,不是Hadoop生态普遍采用的那种Master-Slave主从架构的分布式系统。集群中的每个node是直接跟其他节点进行通信的,几乎所有的API操作,比如index,delete,search等都不是Client跟Master通信,而是Client跟任何一个node进行通信,那个node再将请求转发给对应的node来进行执行。
两个角色,Master Node,Data Node。正常情况下,就只有一个Master Node。master node的责任就是负责维护整个集群的状态信息,也就是一些集群元数据信息,同时在新node加入集群或者从集群中下线时,或者是创建或删除了一个索引后,重新分配shard。包括每次集群状态如果有改变的化,那么master都会负责将集群状态同步给所有的node。
Master Node负责接收所有的集群状态变化相关的信息,然后将改变后的最新集群状态推动给集群中所有的Data Node,集群中所有的node都有一份完整的集群状态。只不过Master Node负责维护而已。其他的node,除了master之外的Data Node,就是负责数据的读写。
如果要让多个Node组成一个es集群,首先第一个要设置的参数,就是cluster.name,多个node的cluster.name一样,才满足组成一个集群的基本条件。cluster.name的默认值是my-application,在生产环境中,一定要修改这个值,否则可能会导致未知的node无端加入集群,造成集群运行异常。
而ES中默认的discovery机制,就是zen discovery机制,zen discovery机制提供了unicast discovery集群发现机制,集群发现时的节点间通信是依赖的Transport Module,也就是ES底层的网络通信模块和协议。
ES默认配置是使用unicast集群发现机制,从而让经过特殊配置的节点可以组成一个集群,而不是随便哪个节点都可以组成一个集群。但是默认配置下,unicast是本机,也就是localhost,因此只能在一台机器上启动多个node来组成一个集群。虽然ES还是会提供multicast plugin作为一个发现机制,但是已经不建议在生产环境中使用了。虽然我们可能想要multicast的简单性,就是所有的node可以再接收到一条multicast ping之后就立即自动加入集群。但是multicast机制有很多的问题,而且很脆弱,比如网络有轻微的调整,就可能导致节点无法发现对方。因此现在建议在生产环境中用unicast机制,提供一个ES种子节点作为中转路由节点就可以了。
另外还需要在集群中规划出专门的Master Eligible Node和Data node,一个节点只要它是Master Eligible Node,才有可能被选举为真正的Master Node,选举出真正的Master Node之后,其他的Master Eligible Node,将在Master Node故障之后,通过选举,从中再产生一个新的Master Node,同时,所有的非Master Node,都是Data Node,也就是说,Master Eligible Node只是有机会成为Master Node,只要你不是Master Node,你就是Data Node,而不是Master Eligible Node的Data Node是没有升级成为Master Node的资格的。
如果是一个小集群,10个以内的节点,那就所有节点都可以作为Master Eligible Node以及Data node即可,超过10个node的集群再单独拆分Master Eligible Node和Data Node。

# 设置为Master Eligible Node node.master: true # 设置为Data Node node.data: true

默认情况下,ES会将自己绑定到127.0.0.1上,对于运行一个单节点的开发模式下的ES是ok的。但是为了让节点间可以互相通信以组成一个集群,需要让节点绑定到一个IP地址上
network.host: 192.168.0.1

只要不是本地回环地址,ES就会认为我们从开发模式迁移到生产模式,同时会启用一系列的集群检测
  • ping:ping是一个node用discovery机制来发现其他node的一个过程
  • unicast:unicast discovery集群发现机制是要求配置一个主机列表,这些主机作为Gossip通信协议的路由器,如果通过hostname来指定,那么在ping的时候会被解析为IP地址
discovery.zen.ping.unicast.hosts: ["host1", "host2"]

简单来说,如果要让多个节点发现对方并且组成一个集群,那么就得有一个中间的公共节点,当不同的节点发送请求到这些公共节点,通过这些公共节点交换各自的信息,进而让所有的node感知到其他的node存在,并且进行通信,最后组成一个集群。这就是基于gossip流言式通信协议的unicast集群发现机制。
当一个node与discovery.zen.ping.unicast.hosts中的一个成员通信之后,就会接收到一份完整的集群状态,接着这个node再跟master通信,并且加入集群中。这就意味着,discovery.zen.ping.unicast.hosts是不需要列出集群中的所有节点的,只要提供少数几个node,比如3个,让新的node可以连接上即可,如果我们给集群中分配了几个节点作为专门的master节点,那么这里配置那些master节点即可,这个配置中也可以指定端口:
discovery.zen.ping.unicast.hosts: ["host1", "host2:9201"]

为集群选举出一个master是很重要的,ES集群会自动完成这个操作,node.master设置为false的节点是无法称为Master的,discovery.zen.minimum_master_nodes参数用于设置对于一个ES集群来说,必须拥有的最少的正常在线的Master Eligible Node的个数,否则会发生"集群脑裂"现象,假如在集群中设置了3个Master Eligible Node,那么这个值应该为(master_eligible_nodes / 2) + 1,即2
discovery.zen.minimum_master_nodes: 2

1.1 集群脑裂 discovery.zen.minimum_master_nodes参数对于集群的可靠性来说,是非常重要的。这个设置可以预防脑裂问题,也就是预防一个集群中存在两个master。
这个参数的作用,就是告诉ES直到有足够的master候选节点时,才可以选举出一个master,否则就不要选举出一个master。这个参数必须被设置为集群中master候选节点的quorum数量,也就是大多数。quorum的算法,就是:master候选节点数量 / 2 + 1,比如我们有10个节点,都能维护数据,也可以是master候选节点,那么quorum就是10 / 2 + 1 = 6,如果我们有三个master候选节点,还有100个数据节点,那么quorum就是3 / 2 + 1 = 2,Elasticsearch要求最少有3个节点,如果我们有2个节点,都可以是master候选节点,那么quorum是2 / 2 + 1 = 2。此时就有问题了,因为如果一个Master挂掉了,那么剩下一个master候选节点,是无法满足quorum数量的,也就无法选举出新的master,集群就彻底挂掉了。此时就只能将这个参数设置为1,如果发生了网络分区,那么两个分区中都会有一个Master,还是无法避免集群脑裂
那么这个是参数是如何避免脑裂问题的产生的呢?比如我们有3个节点,quorum是2,现在网络故障,1个节点在一个网络区域,另外2个节点在另外一个网络区域,不同的网络区域内无法通信。这个时候有两种情况:
  • 如果master是单独的那个节点,另外2个节点是master候选节点,那么此时那个单独的master节点因为没有指定数量的候选master node在自己当前所在的集群内,因此就会取消当前master的角色,尝试重新选举,但是无法选举成功。然后另外一个网络区域内的node因为无法连接到master,就会发起重新选举,因为有两个master候选节点,满足了quorum,因此可以成功选举出一个master。此时集群中就会还是只有一个master。
  • 如果master和另外一个node在一个网络区域内,然后一个node单独在一个网络区域内。那么此时那个单独的node因为连接不上master,会尝试发起选举,但是因为master候选节点数量不到quorum,因此无法选举出master。而另外一个网络区域内,原先的那个master还会继续工作。这也可以保证集群内只有一个master节点。
在ES集群是可以动态增加和下线节点的,所以可能随时会改变quorum。所以这个参数也是可以通过API随时修改的,特别是在节点上线和下线的时候,都需要作出对应的修改。而且一旦修改过后,这个配置就会持久化保存下来。
PUT /_cluster/settings { "persistent" : { "discovery.zen.minimum_master_nodes": 2 } }

此外还有一些其他的关于集群发现机制相关的配置,以下将列出上述讨论的参数以及一些其他的参数做总结:
#设置集群为single-node模式,这样的话,ES将不会再从外部去发现其他节点,默认不配资,代表可以发现其他节点 discovery.type: single-node discovery.zen.ping.unicast.hosts: ["host1", "host2"] #主机名被DNS解析为IP地址的超时时间 discovery.zen.ping.unicast.resolve_timeout: 5s #在决定开始选举或加入现有集群之前节点将等待多长时间 discovery.zen.ping_timeout: 3s #节点决定加入现有集群后,向Master发送加入请求,超时时间默认discovery.zen.ping_timeout时间的20倍 discovery.zen.join_timeout: 20 #Master发送修改集群状态的消息给其他节点,如果在此时间内还没有至少discovery.zen.minimum_master_nodes个备用Master节点回复,则此次修改集群状态的动作不会发生 discovery.zen.commit_timeout: 30s #满足上面的条件后,Master发送修改集群状态的消息给全部Node,然后全部Node开始修改自身的集群状态信息,Master只有会等待所有的Node响应,最多等待此处的时间,然后才会开始下一次状态修改的流程 discovery.zen.publish_timeout: 30s #设置集群中备用master的quorum,如果集群中备用的Master的个数少于此配置,将引发集群脑裂的现象 discovery.zen.minimum_master_nodes: 2 #选举Master的时候,是否忽略node.master=false的节点发送的ping消息,默认false discovery.zen.master_election.ignore_non_master_pings: false #当集群中没有存活的Master后,禁用外部请求允许的操作,write:外部请求只能读,不能写,all:外部请求不能读写ES集群 discovery.zen.no_master_block: write #Master与Data Node互相发送ping消息的时间间隔 ping_interval: 1s #Master与Data Node互相发送ping消息超时时间 ping_timeout: 30s #Master与Data Node互相发送ping消息失败后的重试次数 ping_retries: 3

2. 分片机制 014.Elasticsearch分布式原理
文章图片
  • 一个index有多个主分片,每个主分片都是一个最小的工作单元,承载部分数据,每条数据只能存在于一个主分片及其副本中,每个主分片都有完整的建立索引和处理请求的能力,ES-6.X默认一个index有5个主分片,每个主分片有1个副本,而ES-7.X默认一个index有1个主分片,一个主分片有一个副本,主分片的副本数可以动态的修改,主分片的数量则需要在创建index的时候设置好,否则会使用默认值,修改主分片的个数代价很大,需要重建索引,操作复杂,本文不做讨论
  • ES集群会自动地、尽量均匀的把一个index的所有分片分布到集群的不同节点上,增减节点的时候,ES也自动地做分片的负载均衡
  • 主分片和其副本不能同时存在于一个节点上,否则这个index的健康状态就不是"green"
  • 查看index的分片数
    GET /index_name/_settings{ "index_name" : { "settings" : { "index" : { "creation_date" : "1593417909285", "number_of_shards" : "5", "number_of_replicas" : "1", "uuid" : "-nelhb1LRX6z7tXk647yGw", "version" : { "created" : "6060099" }, "provided_name" : "shop" } } } }

  • 设置index的分片数
    PUT index_name { "settings": { "index": { "number_of_shards": 1, "number_of_replicas": 1 } } }

  • 修改副本数
    PUT /index_name/_settings { "number_of_replicas": 2 }

  • 查看索引分片信息
    GET /index_name/_search_shards

  • 手动移动分片
    POST /_cluster/reroute { "commands": [ { "move": { "index": "index_name", "shard": 0, "from_node": "node_name1", "to_node": "node_name2" } } ] }

3. 集群扩容
  • 垂直扩容:当集群性能达到瓶颈后,采购更强大的服务器来替换原来的服务器,成本高昂,一般不采用这种方式
  • 水平扩容:集群性能达到瓶颈后,添加更多的ES节点,是业界常用的方案
  • 扩容极限:假设ES集群中的索引是有3个shard、每个shard有一个副本,那么扩容的极限就是6个ES node,每个node上有1个shard或者1个副本,这样每个shard(主分片或者副本)可以占用单台服务器的所有资源,性能最好
  • 如果超过扩容极限:假设ES集群中的索引是有3个shard、每个shard有一个副本,那么6个ES node已经到达扩容极限了,要想再扩容,就要动态修改副本个数,比如将副本数由1修改为3后,再扩容6个ES node,将提升1倍集群的吞吐量(原来读请求只能由1个shard或者1个副本处理,现在可以由1个shard或者3个副本处理)
4. 读写请求路由原理 4.1 Document路由原理
  • 客户端对一条document进行操作(CRUD),传给ES集群的除了数据本身,还会传一个routing number,这个routing number默认是这个document的"_id",可以手动指定也可以自动生成
  • 这条document位于哪个分片上,由以下公式决定:hash(routing number) % number_of_shards,计算结果就是数据所在主分片的编号,这样就找到了数据在哪个分片上
  • 可以手动指定routing number的值
    POST /index/type/id?routing=your_routing_number

    手动设置routing number的值这个功能很重要,假如想让业务上某一类数据都保存在同一个主分片上以提高批量读取的性能,那么将这些数据设置为相同的routing_number即可
  • 这就解释了为什么无法修改一个索引的主分片个数,一旦修改,那么根据新的主分片个数计算得到的主分片编号就不对了,也就无法找到数据了
  • 根据routing number查看document在哪个主分片上
    GET /index/_search_shards?routing=your_routing_number

4.2 请求转发 写请求:
  • 假设集群中有三个节点,node01、node02、node03
  • 客户端向node01发送写请求(增删改),node01此时担任一个协调节点的角色(coordinating node,请求第一次发送给谁,谁就是协调节点),node01通过路由算法得知该条文档在node03,于是node01将写请求转发到node03
  • node03上数据对应的主分片处理该条请求之后,将数据变化同步给其副本
  • node03将处理结果返回给node01
  • node01将结果返回给客户端
  • 注意:写请求只能由主分片去完成
读请求:
  • 假设集群中有三个节点,node01、node02、node03
  • 客户端向node01发送读请求,node01通过路由算法得知该数据的主分片编号,由于节点对等关系,每个节点都保存了全部的集群状态信息,所有node01就可以知道这个主分片的副本在哪些节点
  • 于是node01直接将读请求转发到主分片所在的节点或者副本所在的节点,对同一数据的多次请求会做负载均衡(随机轮询)
  • 然后node01将返回结果返回给客户端
  • 特殊情况:如果请求转发到副本时,此时数据只在有主分片中有(还没有来得及同步给副本就收到读请求了),那么此时读请求会阻塞,直到可以副本同步成功之后再从副本中读取数据
5. 写一致性 在进行增删改操作的时候,可以手动指定写一致性策略:
PUT /index_name/type_name/id?consistency=quorum

写一致性策略包括:
  • one:只要集群中有一个primary shard是可用的,就可以执行此操作
  • all:集群中所有的primary shard和replica shard都是可用的,才可以执行这个操作
  • quorum:要求所有的shard(包括主分片和副本)中大部分都是可用的,这个操作才可以执行,quorum的计算方法为:
    (number_of_shards + number_of_replicas) / 2 + 1

    当可用shard数>=上述计算时,写操作才可以执行
  • 注意:当number_of_replicas > 1时,quorum机制才生效,考虑以下特殊情况:假设number_of_shards=1,number_of_replicas=1,那么quorum=2,也就是说,只有存活的shard>=2时,写操作才可以执行,那么假如ES集群就一个node,显然是无法满足这个条件的。为了让单节点的ES也正常运行,所以当number_of_replicas > 1时,quorum机制才生效
  • 当quorum不满足要求时,ES会默认等待1min,如果1min还无法达到执行写操作的quorum要求,那么宣告写操作失败,可以手动执行这个超时时间:
# 默认单位为ms,其他单位需要手动指定,例如timeout=30s PUT /index_name/type_name/id?timeout=30

6. 乐观锁
  • 线程安全问题:多线程环境下,对文档的修改操作可能会出现线程安全问题
  • 乐观锁和悲观锁
    • 悲观锁:认为我在修改数据的同时,别人一定会修改我的数据,所以每当我要修改数据,我就先上锁
    • 乐观锁:认为我在修改数据的同时,别人一定不会修改我的数据,所以我修改数据的时候不上锁,但是我在修改的时候,要先判断一下别人有没有修改过,如果修改过了,我先更新到被人修改之后的版本,我再更新,一般情况下,ES的业务场景都是读多写少,所以ES使用基于版本号的乐观锁机制来控制并发
  • 文档的版本号
    插入一条新文档后,它的初始"_version"=1
    PUT /user/_doc/3 { "uid": "1003", "uname": "Jerry" }{ "_index" : "user", "_type" : "_doc", "_id" : "3", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 5, "_primary_term" : 1 }

    之后每次对这个文档好进行修改包括删除操作,它的"_version"都会加1:
    PUT /user/_doc/3 { "uid": "1003", "uname": "Tony" }{ "_index" : "user", "_type" : "_doc", "_id" : "3", "_version" : 2, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 6, "_primary_term" : 1 }DELETE /user/_doc/3{ "_index" : "user", "_type" : "_doc", "_id" : "3", "_version" : 3, "result" : "deleted", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 7, "_primary_term" : 1 }

    假设有两个客户端同时修改这条数据,A客户端读取到此数据时,version=1,然后A客户端开始修改此数据,同一时间,B客户端也修改此数据,修改成功之后,verison变为了2,那么A客户端再提交自己的修改就会失败,ES的机制是,修改前后的版本号要相同才可以提交,A客户端再次提交修改请求,拿到verison=2的数据并修改,才会成功,以下是一个模拟使用版本号进行并发控制的实验:
    # 插入一条数据,其初始版本为1 PUT /user/_doc/4 { "uid": "1004", "uname": "Bob" }{ "_index" : "user", "_type" : "_doc", "_id" : "4", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 8, "_primary_term" : 1 }# 客户端A修改了数据,提交的时候指定当前verison=1,与服务端版本号一致 # 所以可以修改成功,然后version变为2 PUT /user/_doc/4?version=1 { "uid": "1004", "uname": "Bob Bob" }{ "_index" : "user", "_type" : "_doc", "_id" : "4", "_version" : 2, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 9, "_primary_term" : 1 }# 客户端B修改数据,提交的时候指定当前verison=1,与服务端版本号不一致 # 修改失败 PUT /user/_doc/4?version=1 { "uid": "1004", "uname": "Bob Bob Bob" }{ "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[_doc][4]: version conflict, current version [2] is different than the one provided [1]", "index_uuid": "PIpA38NgRfyomgHHdAmHnw", "shard": "0", "index": "user" } ], "type": "version_conflict_engine_exception", "reason": "[_doc][4]: version conflict, current version [2] is different than the one provided [1]", "index_uuid": "PIpA38NgRfyomgHHdAmHnw", "shard": "0", "index": "user" }, "status": 409 }# 客户端B再次修改数据,提交的时候指定当前verison=2,与服务端版本号一致 # 修改成功,version+1 { "_index" : "user", "_type" : "_doc", "_id" : "4", "_version" : 3, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 10, "_primary_term" : 1 }

  • External Version
    ES提供了一个特性,你可以不用它提供的内部version版本号来进行并发控制,可以基于你自己维护的一个版本号来进行并发控制,例如,假如你的数据在MySQL里也有一份,然后你的应用系统本身就维护了一个版本号,这个时候,你可能想要用你自己维护的那个version来进行控制,外部version与内部version的区别是:内部version:只有当你提供的version与ES中的version一模一样的时候,才可以进行修改,只要不一样,就报错;外部version:只要你提供的version比ES中的version大,就能完成修改
    # 插入一条数据,其初始版本为1 PUT /user/_doc/5 { "uid": "1005", "uname": "Ella" }{ "_index" : "user", "_type" : "_doc", "_id" : "5", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 11, "_primary_term" : 1 }# 客户端A修改了数据,提交的时候指定外部verison=5,大于服务器版本 # 所以可以修改成功,然后version变为5 PUT /user/_doc/5?version=5&version_type=external { "uid": "1005", "uname": "Ella Ella" }{ "_index" : "user", "_type" : "_doc", "_id" : "5", "_version" : 5, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 12, "_primary_term" : 1 }# 客户端B修改数据,提交的时候指定当前verison=4,小于服务器版本 # 修改失败 PUT /user/_doc/5?version=4&version_type=external { "uid": "1005", "uname": "Ella Ella Ella" }{ "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[_doc][5]: version conflict, current version [5] is higher or equal to the one provided [4]", "index_uuid": "PIpA38NgRfyomgHHdAmHnw", "shard": "0", "index": "user" } ], "type": "version_conflict_engine_exception", "reason": "[_doc][5]: version conflict, current version [5] is higher or equal to the one provided [4]", "index_uuid": "PIpA38NgRfyomgHHdAmHnw", "shard": "0", "index": "user" }, "status": 409 }# 客户端B再次修改数据,提交的时候指定当前verison=7,大于服务器版本 # 修改成功,version变为7 PUT /user/_doc/5?version=7&version_type=external { "uid": "1005", "uname": "Ella Ella Ella" }{ "_index" : "user", "_type" : "_doc", "_id" : "5", "_version" : 7, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 13, "_primary_term" : 1 }

另外,retry_on_conflict参数可以设置重试次数:
POST /user/_doc/5?retry_on_conflict=5&version=7

当更新失败后,再次获取document数据和最新版本号,基于最新的版本号再次尝试更新,这样的重复会自动执行retry_on_conflict设置的次数,知道更新成功或者重复这么多次后还没有更新成功则返回Error
7. Lazy Delete 【014.Elasticsearch分布式原理】上面说到,删除一条文档的时候,将其version+1,实际上,删除文档并不会直接进行物理删除,而是标记为delete状态,随着数据的增加,ES会选择一个合适的时机批量删除标记为delete状态这批数据,所以在物理删除该数据之前,客户端再插入一条id相同的数据,只是将原来的数据的状态又改为update,然后version+1,其实还是修改了原数据,而非插入一条新数据

    推荐阅读