[istio source code analysis][galley] The downstream of galley (mcp)

1. Preface

Please credit the original source when reposting; respect the author's work!
Source location: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)
1. [istio source code analysis][galley] The upstream of galley (source)
2. [istio source code analysis][galley] The runtime of galley
3. [istio source code analysis][galley] The downstream of galley (mcp)
The previous article, [istio source code analysis][galley] The runtime of galley, analyzed the component that sits in the middle of galley's pipeline, connecting its upstream and downstream. This article analyzes the downstream side of that component: the MCP server takes over here, and every connected MCP client (for example pilot) receives the resulting configuration.
2. server
Let's first look at how the server side is initialized.
// galley/pkg/server/components/processing.go
func NewProcessing(a *settings.Args) *Processing {
    d := snapshot.New(groups.IndexFunction)
    return &Processing{
        args:         a,
        distributor:  d,
        configzTopic: configz.CreateTopic(d),
    }
}
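NewProcessing builds the distributor with snapshot.New(groups.IndexFunction); the group index function decides which snapshot group an incoming watch request belongs to. Below is a minimal illustrative stand-in (not galley's actual groups.IndexFunction), assuming the index function signature (collection string, node *mcp.SinkNode) string, which matches how Cache.Watch calls c.groupIndex later in this article.

package example

import (
    mcp "istio.io/api/mcp/v1alpha1"
    "istio.io/istio/pkg/mcp/snapshot"
)

// newDistributor shows the shape of the snapshot cache construction.
func newDistributor() *snapshot.Cache {
    indexFn := func(collection string, node *mcp.SinkNode) string {
        // Hypothetical: put every sink into a single group, similar in
        // spirit to galley's default group.
        return "default"
    }
    return snapshot.New(indexFn)
}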

p.distributor is an instance of snapshot (snapshot is analyzed later). Next, look at the Start() method.
func (p *Processing) Start() (err error) {
    // TODO: cleanup...
    types := p.getMCPTypes()
    processorCfg := runtime.Config{
        DomainSuffix:             p.args.DomainSuffix,
        Mesh:                     mesh,
        Schema:                   types,
        SynthesizeServiceEntries: p.args.EnableServiceDiscovery,
    }
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)

    grpcOptions := p.getServerGrpcOptions()

    p.stopCh = make(chan struct{})
    var checker source.AuthChecker = server.NewAllowAllChecker()
    ...
    grpc.EnableTracing = p.args.EnableGRPCTracing
    p.grpcServer = grpc.NewServer(grpcOptions...)

    p.reporter = mcpMetricReporter("galley")

    options := &source.Options{
        Watcher:            p.distributor,
        Reporter:           p.reporter,
        CollectionsOptions: source.CollectionOptionsFromSlice(types.Collections()),
        ConnRateLimiter:    mcprate.NewRateLimiter(time.Second, 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
    }

    md := grpcMetadata.MD{
        versionMetadataKey: []string{version.Info.Version},
    }
    if err := parseSinkMeta(p.args.SinkMeta, md); err != nil {
        return err
    }
    ...
    serverOptions := &source.ServerOptions{
        AuthChecker: checker,
        RateLimiter: rate.NewLimiter(rate.Every(time.Second), 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
        Metadata:    md,
    }
    p.mcpSource = source.NewServer(options, serverOptions)
    ...
}

Pay attention to the following:
1. The Watcher in options is p.distributor.
2. p.mcpSource = source.NewServer(options, serverOptions) creates the MCP server.
// pkg/mcp/source/server_source.go
func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {
    s := &Server{
        src:         New(srcOptions),
        authCheck:   serverOptions.AuthChecker,
        rateLimiter: serverOptions.RateLimiter,
        metadata:    serverOptions.Metadata,
    }
    return s
}

// pkg/mcp/source/source.go
func New(options *Options) *Source {
    s := &Source{
        watcher:        options.Watcher,
        collections:    options.CollectionsOptions,
        reporter:       options.Reporter,
        requestLimiter: options.ConnRateLimiter,
    }
    return s
}

As you can see, the fields of the server-side src object all come from Processing's options.
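For EstablishResourceStream (next section) to be reachable, the created p.mcpSource still has to be registered with the gRPC server. A minimal sketch of that wiring is shown below; it assumes the generated RegisterResourceSourceServer helper from istio.io/api/mcp/v1alpha1 and an illustrative listen address, while the real Start() additionally configures auth, keepalive and metrics options.

package example

import (
    "net"

    "google.golang.org/grpc"

    mcp "istio.io/api/mcp/v1alpha1"
    "istio.io/istio/pkg/mcp/source"
)

// serveMCP registers the MCP source server and starts serving.
func serveMCP(options *source.Options, serverOptions *source.ServerOptions) error {
    grpcServer := grpc.NewServer()
    mcpSource := source.NewServer(options, serverOptions)

    // Expose ResourceSource.EstablishResourceStream to MCP sinks such as pilot.
    mcp.RegisterResourceSourceServer(grpcServer, mcpSource)

    l, err := net.Listen("tcp", ":9901") // illustrative address
    if err != nil {
        return err
    }
    return grpcServer.Serve(l)
}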
2.1 EstablishResourceStream
// pkg/mcp/source/server_source.go
func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {
    ...
    err := s.src.ProcessStream(stream)
    code := status.Code(err)
    if code == codes.OK || code == codes.Canceled || err == io.EOF {
        return nil
    }
    return err
}

The method to focus on here is ProcessStream.
2.2 ProcessStream
ProcessStream does three things:
1. Establishes a connection for the stream via newConnection.
2. Receives requests asynchronously and passes them on for processing through the channel con.requestC.
3. Loops over requests: it takes a request from con.requestC and handles it with processClientRequest, and it reads the responses destined for the client from con.queue. As you would expect, processClientRequest assembles responses and places them into con.queue.
func (s *Source) ProcessStream(stream Stream) error {
    // establish a connection for this client
    con := s.newConnection(stream)
    defer s.closeConnection(con)
    // receive requests asynchronously
    go con.receive()

    for {
        select {
        case <-con.queue.Ready():
            collection, item, ok := con.queue.Dequeue()
            if !ok {
                break
            }
            resp := item.(*WatchResponse)
            w, ok := con.watches[collection]
            if !ok {
                scope.Errorf("unknown collection in dequeued watch response: %v", collection)
                break // bug?
            }
            // the response may have been cleared before we got to it
            if resp != nil {
                if err := con.pushServerResponse(w, resp); err != nil {
                    return err
                }
            }
        case req, more := <-con.requestC:
            // requests arrive here; receive() puts them into con.requestC
            if !more {
                return con.reqError
            }
            if con.limiter != nil {
                if err := con.limiter.Wait(stream.Context()); err != nil {
                    return err
                }
            }
            // handle the request
            if err := con.processClientRequest(req); err != nil {
                return err
            }
        case <-con.queue.Done():
            // the queue has been closed
            scope.Debugf("MCP: connection %v: stream done", con)
            return status.Error(codes.Unavailable, "server canceled watch")
        }
    }
}

First, look at how the connection is established.
// pkg/mcp/source/source.go
func (s *Source) newConnection(stream Stream) *connection {
    peerAddr := "0.0.0.0"
    peerInfo, ok := peer.FromContext(stream.Context())
    if ok {
        peerAddr = peerInfo.Addr.String()
    } else {
        scope.Warnf("No peer info found on the incoming stream.")
        peerInfo = nil
    }

    con := &connection{
        stream:   stream,
        peerAddr: peerAddr,
        requestC: make(chan *mcp.RequestResources),
        watches:  make(map[string]*watch),
        watcher:  s.watcher,
        id:       atomic.AddInt64(&s.nextStreamID, 1),
        reporter: s.reporter,
        limiter:  s.requestLimiter.Create(),
        queue:    internal.NewUniqueScheduledQueue(len(s.collections)),
    }

    // create a watch for every collection
    collections := make([]string, 0, len(s.collections))
    for i := range s.collections {
        collection := s.collections[i]
        w := &watch{
            ackedVersionMap: make(map[string]string),
            incremental:     collection.Incremental,
        }
        con.watches[collection.Name] = w
        collections = append(collections, collection.Name)
    }
    ...
    return con
}

As you can see, the main job is to create a watch object for every collection. Now look at how the receive function is implemented.
func (con *connection) receive() {
    defer close(con.requestC)
    for {
        // receive a message from the stream
        req, err := con.stream.Recv()
        if err != nil {
            if err == io.EOF {
                scope.Infof("MCP: connection %v: TERMINATED %q", con, err)
                return
            }
            con.reporter.RecordRecvError(err, status.Code(err))
            scope.Errorf("MCP: connection %v: TERMINATED with errors: %v", con, err)
            con.reqError = err
            return
        }
        select {
        // hand the request to the processing loop via con.requestC
        case con.requestC <- req:
        case <-con.queue.Done():
            scope.Debugf("MCP: connection %v: stream done", con)
            return
        case <-con.stream.Context().Done():
            scope.Debugf("MCP: connection %v: stream done, err=%v", con, con.stream.Context().Err())
            return
        }
    }
}

After a request is received from the client it is put into the channel con.requestC. So, back in ProcessStream, when a request arrives on con.requestC it is handled by processClientRequest.
2.3 processClientRequest
func (con *connection) processClientRequest(req *mcp.RequestResources) error {
    if isTriggerResponse(req) {
        return nil
    }
    collection := req.Collection

    con.reporter.RecordRequestSize(collection, con.id, internal.ProtoSize(req))

    // look up the watch for this collection
    w, ok := con.watches[collection]
    if !ok {
        return status.Errorf(codes.InvalidArgument, "unsupported collection %q", collection)
    }

    if req.ResponseNonce == "" || w.pending.GetNonce() == req.ResponseNonce {
        versionInfo := ""

        if w.pending == nil {
            // initial watch request (the client's first message for this collection)
            scope.Infof("MCP: connection %v: inc=%v WATCH for %v", con, req.Incremental, collection)
        } else {
            // ACK or NACK of a previously pushed response (the client's second message)
            versionInfo = w.pending.SystemVersionInfo
            if req.ErrorDetail != nil {
                scope.Warnf("MCP: connection %v: NACK collection=%v version=%q with nonce=%q error=%#v inc=%v", // nolint: lll
                    con, collection, req.ResponseNonce, versionInfo, req.ErrorDetail, req.Incremental)
                con.reporter.RecordRequestNack(collection, con.id, codes.Code(req.ErrorDetail.Code))
            } else {
                scope.Infof("MCP: connection %v ACK collection=%v with version=%q nonce=%q inc=%v",
                    con, collection, versionInfo, req.ResponseNonce, req.Incremental)
                con.reporter.RecordRequestAck(collection, con.id)
                internal.UpdateResourceVersionTracking(w.ackedVersionMap, w.pending)
            }
            // clear the pending request after we finished processing the corresponding response.
            w.pending = nil
        }

        if w.cancel != nil {
            w.cancel()
        }

        sr := &Request{
            SinkNode:    req.SinkNode,
            Collection:  collection,
            VersionInfo: versionInfo,
            incremental: req.Incremental,
        }
        // con.watcher is the snapshot cache; its Watch method assembles the
        // response and calls queueResponse to enqueue it
        w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)
    } else {
        ...
    }
    return nil
}

func (con *connection) queueResponse(resp *WatchResponse) {
    if resp == nil {
        con.queue.Close()
    } else {
        con.queue.Enqueue(resp.Collection, resp)
    }
}

For background on MCP, see https://github.com/istio/api/tree/master/mcp; the figure below helps with understanding.
(Figure: mcp.png, the MCP request/response interaction)
From the figure above and from processClientRequest we can see the following (a sketch of the request shapes follows this list):
1. The first request is sent by the client; from then on, content is pushed from the server to the client.
Here we first analyze the flow in which the client sends a request, the server returns a response, and the client finally sends an ACK. Afterwards we analyze how the server actively pushes information to the client and the client returns an ACK.
2. The cancel method of the watch object for the collection is updated.
3. Pay attention to the role of the w.pending variable.
4. The response is assembled in snapshot and placed into con.queue.
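The sketch below shows the three request shapes the server distinguishes in processClientRequest. The field names come from the code above; the collection name and concrete values are made up for illustration, and ErrorDetail is only hinted at in a comment.

package example

import mcp "istio.io/api/mcp/v1alpha1"

// requestShapes illustrates an initial watch request, an ACK and a NACK.
func requestShapes() []*mcp.RequestResources {
    // 1. Initial watch request: empty ResponseNonce, so the server treats it
    //    as a new subscription for this collection (w.pending is still nil).
    initial := &mcp.RequestResources{
        Collection: "istio/networking/v1alpha3/virtualservices",
    }

    // 2. ACK: ResponseNonce echoes the nonce of the last pushed mcp.Resources
    //    and ErrorDetail is nil, so the server records the acked versions.
    ack := &mcp.RequestResources{
        Collection:    "istio/networking/v1alpha3/virtualservices",
        ResponseNonce: "1",
    }

    // 3. NACK: same nonce, but ErrorDetail is non-nil (construction elided),
    //    so the server only records the rejection and keeps the old versions.
    nack := &mcp.RequestResources{
        Collection:    "istio/networking/v1alpha3/virtualservices",
        ResponseNonce: "1",
        // ErrorDetail: set to a non-nil status on rejection
    }

    return []*mcp.RequestResources{initial, ack, nack}
}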
Let's look at snapshot's Watch method.
func (c *Cache) Watch(request *source.Request, pushResponse source.PushResponseFunc, peerAddr string) source.CancelWatchFunc { // nolint: lll
    group := c.groupIndex(request.Collection, request.SinkNode)

    c.mu.Lock()
    defer c.mu.Unlock()

    // update the status for this group
    info := c.fillStatus(group, request, peerAddr)

    collection := request.Collection

    // return an immediate response if a snapshot is available and the
    // requested version doesn't match.
    // c.snapshots is updated in SetSnapshot
    if snapshot, ok := c.snapshots[group]; ok {
        version := snapshot.Version(request.Collection)
        scope.Debugf("Found snapshot for group: %q for %v @ version: %q", group, request.Collection, version)

        if version != request.VersionInfo {
            scope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)
            response := &source.WatchResponse{
                Collection: request.Collection,
                Version:    version,
                Resources:  snapshot.Resources(request.Collection),
                Request:    request,
            }
            // enqueue the response into con.queue
            pushResponse(response)
            return nil
        }
        info.synced[request.Collection][peerAddr] = true
    }

    c.watchCount++
    watchID := c.watchCount
    ...
    info.mu.Lock()
    // register the watch so SetSnapshot can respond later
    info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}
    info.mu.Unlock()
    ...
    return cancel
}

1. If the version differs, the response is handed to pushResponse, which places it into con.queue so that it is sent to the client.
2. If there is no snapshot for the group yet, or the version has not changed, info.watches is updated instead; the server will push to the client later, from SetSnapshot. A simplified model of this decision is sketched below.
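The following self-contained sketch models this decision (it is a simplified illustration, not istio's code): answer immediately when the cached snapshot version differs from what the client already has, otherwise park the push callback until a newer snapshot is set.

package example

type toyResponse struct {
    Collection string
    Version    string
}

type toyCache struct {
    versions map[string]string             // collection -> current snapshot version
    parked   map[string]func(*toyResponse) // collection -> pending push callback
}

// watch mirrors the decision in Cache.Watch in a simplified form.
func (c *toyCache) watch(collection, clientVersion string, push func(*toyResponse)) {
    if v, ok := c.versions[collection]; ok && v != clientVersion {
        // The snapshot is already newer than what the client holds: respond now.
        push(&toyResponse{Collection: collection, Version: v})
        return
    }
    // Nothing new: remember the callback; setSnapshot will invoke it later.
    c.parked[collection] = push
}

// setSnapshot mirrors the push performed by Cache.SetSnapshot.
func (c *toyCache) setSnapshot(collection, version string) {
    c.versions[collection] = version
    if push, ok := c.parked[collection]; ok {
        push(&toyResponse{Collection: collection, Version: version})
        delete(c.parked, collection)
    }
}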
2.4 pushServerResponse
func (con *connection) pushServerResponse(w *watch, resp *WatchResponse) error {
    ...
    if incremental {
        added, removed = calculateDelta(resp.Resources, w.ackedVersionMap)
    } else {
        // resp.Resources holds the contents of the snapshot
        for _, resource := range resp.Resources {
            added = append(added, *resource)
        }
    }

    msg := &mcp.Resources{
        SystemVersionInfo: resp.Version,
        Collection:        resp.Collection,
        Resources:         added,
        RemovedResources:  removed,
        Incremental:       incremental,
    }

    // increment nonce
    con.streamNonce++
    msg.Nonce = strconv.FormatInt(con.streamNonce, 10)
    if err := con.stream.Send(msg); err != nil {
        con.reporter.RecordSendError(err, status.Code(err))
        return err
    }
    scope.Debugf("MCP: connection %v: SEND collection=%v version=%v nonce=%v inc=%v",
        con, resp.Collection, resp.Version, msg.Nonce, msg.Incremental)
    // set w.pending after a successful send to the client;
    // it is used to validate the client's subsequent ACK/NACK
    w.pending = msg
    return nil
}

1. The response is assembled into an mcp.Resources message and sent to the client.
2. After the message has been successfully sent to the client, w.pending is set; when the client later sends an ACK/NACK, the server uses it in processClientRequest for validation.
On an ACK, UpdateResourceVersionTracking(w.ackedVersionMap, w.pending) is called to update w.ackedVersionMap, which records the resources the client currently holds. A conceptual sketch of this bookkeeping follows.
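The sketch below describes what the ACK bookkeeping conceptually achieves; the real logic lives in internal.UpdateResourceVersionTracking, whose implementation is not shown in this article, so this is only an assumption-level illustration: after an ACK, ackedVersionMap reflects the resource versions the client is known to hold, which pushServerResponse later uses to compute incremental deltas.

package example

type sentResource struct {
    Name    string
    Version string
}

type sentMessage struct {
    Resources        []sentResource // resources included in the last response
    RemovedResources []string       // names removed in an incremental push
}

// updateAckedVersions records what the client holds after it ACKed pending.
func updateAckedVersions(acked map[string]string, pending *sentMessage) {
    for _, r := range pending.Resources {
        acked[r.Name] = r.Version
    }
    for _, name := range pending.RemovedResources {
        delete(acked, name)
    }
}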
2.5 Summary
Now let's walk through the whole flow.
(Figure: mcp.png, the MCP request/response interaction)
1. The first request is sent by the client.
2. The server then sends data to the client.
3. The client then sends an ACK/NACK to the server, and the server reacts accordingly; for example, on an ACK it updates w.ackedVersionMap.
After that, the server actively pushes data to the client. When does it push? This is where the analysis from [istio source code analysis][galley] The runtime of galley comes in: the upstream source hands its data as events to runtime, which processes them and passes the result to p.distributor, and from the Start() method we know that p.distributor is the snapshot cache.
func NewProcessing(a *settings.Args) *Processing {
    d := snapshot.New(groups.IndexFunction)
    return &Processing{
        args:         a,
        distributor:  d,
        configzTopic: configz.CreateTopic(d),
    }
}

func (p *Processing) Start() (err error) {
    ...
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
    ...
    return nil
}

p.distributor publishes the snapshot through SetSnapshot:
// galley/pkg/runtime/processor.go
func (p *Processor) Start() error {
    ...
    case <-p.stateStrategy.Publish:
        scope.Debug("Processor.process: publish")
        // build a snapshot of the objects currently held by the state
        s := p.state.buildSnapshot()
        // hand the snapshot to the distributor
        p.distributor.SetSnapshot(groups.Default, s)
    }
}

// pkg/mcp/snapshot/snapshot.go
func (c *Cache) SetSnapshot(group string, snapshot Snapshot) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // update the existing entry
    c.snapshots[group] = snapshot

    // trigger existing watches for which version changed
    if info, ok := c.status[group]; ok {
        info.mu.Lock()
        defer info.mu.Unlock()
        // iterate over all registered watches
        for id, watch := range info.watches {
            version := snapshot.Version(watch.request.Collection)
            if version != watch.request.VersionInfo {
                scope.Infof("SetSnapshot(): respond to watch %d for %v @ version %q",
                    id, watch.request.Collection, version)
                response := &source.WatchResponse{
                    Collection: watch.request.Collection,
                    Version:    version,
                    Resources:  snapshot.Resources(watch.request.Collection),
                    Request:    watch.request,
                }
                // push the response: it is enqueued into con.queue and then sent to the client
                watch.pushResponse(response)

                // discard the responseWatch
                delete(info.watches, id)

                scope.Debugf("SetSnapshot(): watch %d for %v @ version %q complete",
                    id, watch.request.Collection, version)
            }
        }
    }
}

Where does info.watches come from? It is populated in snapshot's Watch method, and it is updated again every time the client sends an ACK/NACK. So whenever an upstream event produces a new publish, SetSnapshot is triggered and the information is pushed to the client.
3. References
1. istio 1.3.6 source code
2. https://cloud.tencent.com/developer/article/1409159
