InfluxDB集群 -- 移除MetaNode源码分析

influxdb集群中,client在node1上执行remove metaNode node3的命令:

influxd_ctl remove-meta node3:8091

整体流程如下:
  • node1收到CLI命令,向自己8084端口发送GET /remove-meta的请求,URL查询参数: metahttp=node3:8091、force=false(或true);
  • node1向node3发送GET http://node3:8091/ping,探测node3是否存活;
  • node1执行metaclient.DeleteMetaNode():Raft中删除该节点信息;
  • node1向node3发送GET http://node3:8091/remove-meta,node3做一些清理操作;
InfluxDB集群 -- 移除MetaNode源码分析
文章图片

CLI命令处理 命令行代码入口:
// cmd/influxd_ctl/cli/cli.go

// Run switches on cmd and hands the "remove-meta" command to do_remove_meta.
// The rest of the method is elided in this excerpt (marked "....").
func (c *CommandLine) Run() error {
	....
	switch cmd {
	case "remove-meta":
		return do_remove_meta(c)
	}
}

向自己的8084发送GET /remove-meta:
// do_remove_meta parses the remove-meta arguments, builds the request URL via
// c.getURL and issues an HTTP GET for it. The handling of the response is
// elided in this excerpt (marked ".....").
func do_remove_meta(c *CommandLine) error {
	// Read the command-line arguments with a flag set.
	fs := flag.NewFlagSet("", flag.ExitOnError)
	o := RemoveMetaOptions{}
	fs.BoolVar(&o.Force, "force", false, "force remove meta node")
	fs.Parse(c.CMD[1:])

	// The last positional argument is used as the HTTP address of the meta node to remove.
	// NOTE(review): this index panics when no positional argument is supplied — confirm callers validate.
	httpAddr := fs.Args()[len(fs.Args())-1]
	force := "false"
	if o.Force {
		force = "true"
	}

	// Send remove-meta with the target node's address and the force flag as
	// parameters. The article states the request goes to this node's own port
	// 8084; the address itself comes from getURL, which is not shown here.
	url := c.getURL("remove-meta", map[string]string{"metahttp": httpAddr, "force": force})
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	.....
}

admin_cluster监听8084端口,负责集群的管理功能,/remove-meta的handler:
// services/admin_cluster/handler.go func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": switch r.URL.Path { case "/remove-meta": h.WrapHandler("remove-meta", h.removeMetaNode).ServeHTTP(w, r) } } }

具体的处理:
  • ping一下node3,看是否存活;若已不存活且未指定force,则返回错误;
  • metaClient.DeleteMetaNode()删除集群中该节点的信息;
  • 向node3发送/remove-meta;
func (h *handler) removeMetaNode(w http.ResponseWriter, r *http.Request) { ps := r.URL.Query() var metahttp string metahttp = ps["metahttp"][0] var force bool if len(ps["force"]) == 1 && ps["force"][0] == "true" { force = true } metaLive := true // 先ping一下看移除的节点否活着 // ping meta server first url := "http://" + metahttp + "/ping" resp, err := http.Get(url) if err != nil { metaLive = false h.logger.Info("ping meta server failed", zap.Error(err)) } if !force && !metaLive { h.sendResponse(fmt.Errorf("Meta node %s could not be connected", metahttp), w) return } err = func() error { // remove meta node and leave from raft cluster nodeInfo := h.MetaClient.MetaNodeByAddr(metahttp) if nodeInfo == nil { return fmt.Errorf("Meta node %s does not exist in cluster", metahttp) } // 删除该meta节点的元信息(Raft) if err := h.MetaClient.DeleteMetaNode(nodeInfo.ID); err != nil { return err } // 向该节点的8091发送remote-meta // delete directory url := "http://" + metahttp + "/remove-meta" resp, err := http.Get(url) if err != nil { return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %v", err) } body, err := ioutil.ReadAll(resp.Body) return nil }() h.sendResponse(err, w) }

node1向node3发送/ping node1发送GET http://node3:8091/ping;
node3的处理:
//services/httpd/handler.go // servePing returns a simple response to let the client know the server is running. func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { atomic.AddInt64(&h.stats.PingRequests, 1) h.writeHeader(w, http.StatusNoContent) }

node1删除raft中该节点信息 通过metaClient.DeleteMetaNode()删除集群中该节点的信息;
将删除节点命令封装成1个Command_DeleteMetaNodeCommand,然后retryUntilExec()执行:
// services/meta/client.go func (c *Client) DeleteMetaNode(id uint64) error { cmd := &internal.DeleteMetaNodeCommand{ ID: proto.Uint64(id), } return c.retryUntilExec(internal.Command_DeleteMetaNodeCommand, internal.E_DeleteMetaNodeCommand_Command, cmd) }

retryUntilExec()函数执行逻辑
  • 向metaServer发送POST /execute执行Command;
  • 若该metaServer是Follower,则将command转发给Leader;
  • 上述过程重试10次,直到成功;
Leader处理POST /execute请求,处理DeleteMetaNodeCommand:
// services/meta/handler.go

// ServeHTTP hands every POST request to serveExec, wrapped by WrapHandler
// under the name "execute". Other methods are not handled in this excerpt.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "POST":
		h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)
	}
}

// serveExec applies the received command to the store and writes a
// protobuf-encoded internal.Response: the error text when apply fails,
// otherwise the store's current index.
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
	var resp *internal.Response
	// body is not defined in this excerpt; presumably it is the request
	// payload read earlier in the full source — TODO confirm.
	if err := h.store.apply(body); err != nil {
		resp = &internal.Response{
			OK:    proto.Bool(false),
			Error: proto.String(err.Error()),
		}
	} else {
		// NOTE(review): OK is set to false on the success path as well —
		// confirm whether clients rely on Error/Index instead of OK.
		resp = &internal.Response{
			OK:    proto.Bool(false),
			Index: proto.Uint64(h.store.index()),
		}
	}
	// NOTE(review): the Marshal error is not checked in this excerpt.
	b, err := proto.Marshal(resp)
	w.Header().Add("Content-Type", "application/octet-stream")
	w.Write(b)
}

数据更新通过store.apply()执行:
// Apply decodes the raft log entry's data into an internal.Command and
// dispatches on the command type; only the DeleteMetaNodeCommand case is shown.
// The remainder of the method is elided in this excerpt (marked "...").
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
	var cmd internal.Command
	// An entry that cannot be decoded causes a panic.
	// NOTE(review): the message says "marshal" although the failing call is Unmarshal.
	if err := proto.Unmarshal(l.Data, &cmd); err != nil {
		panic(fmt.Errorf("cannot marshal command: %x", l.Data))
	}
	err := func() interface{} {
		switch cmd.GetType() {
		case internal.Command_DeleteMetaNodeCommand:
			// s is not defined in this excerpt; presumably the owning store — TODO confirm.
			return fsm.applyDeleteMetaNodeCommand(&cmd, s)
		}
	}()
	...
}

具体到fsm.applyDeleteMetaNodeCommand():更新fsm中metaNodes的信息:
func (fsm *storeFSM) applyDeleteMetaNodeCommand(cmd *internal.Command, s *store) interface{} { ext, _ := proto.GetExtension(cmd, internal.E_DeleteMetaNodeCommand_Command) v := ext.(*internal.DeleteMetaNodeCommand) //更新other other := fsm.data.Clone() node := other.MetaNode(v.GetID()) if node == nil { return ErrNodeNotFound } //node离开 if err := s.leave(node); err != nil && err != raft.ErrNotLeader { return err } if err := other.DeleteMetaNode(v.GetID()); err != nil { return err } fsm.data = https://www.it610.com/article/other return nil }

node1向node3发送/remove-meta node3接收/remove-meta做一些资源清理操作:
// services/meta/handler.go func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": switch r.URL.Path { case "/remove-meta": h.WrapHandler("remove-meta", h.serveRemoveMeta).ServeHTTP(w, r) } } }

资源清理操作:
  • 删除本机的meta目录;
  • 新建本机的meta目录;
  • 保存本机的节点信息node.json文件;
func (h *handler) serveRemoveMeta(w http.ResponseWriter, r *http.Request) { // admin_cluster have removed meta node from data // remove meta direrr := func() error { // 删除meta目录 // remove contents in meta dir except the node file if err := os.RemoveAll(h.s.config.Dir); err != nil { return fmt.Errorf("remove meta dir failed: %v", err) } // 新建meta目录 if err := os.Mkdir(h.s.config.Dir, 755); err != nil { return fmt.Errorf("create empty meta dir failed: %v", err) } // 保存node.json // node file should be kept. Data node still needs it. if err := h.s.Node.Save(); err != nil { return fmt.Errorf("save node file failed: %v", err) } return nil }() if err != nil { h.httpError(err, w, http.StatusInternalServerError) } else { if _, err := w.Write([]byte("OK")); err != nil { h.logger.Info("Write response error", zap.Error(err)) } } }

    推荐阅读