LQH入职第12天--启动服务

1、启动服务的主要代码

func StartServer() { lis, _ := net.Listen("tcp", "127.0.0.1:8090") //创建一个grpc服务器对象 gRpcServer := grpc.NewServer() pb.RegisterHelloServiceServer(gRpcServer, &impl.HelloServiceServer{}) //开启服务端 gRpcServer.Serve(lis) }

2、创建服务对象grpc.NewServer() (1)NewServer的内部机构
创建服务对象主要做一下几件事:
(1)接受入参参数切片,并把默认参数全部赋值到入参上 (2)构造 Server 实例
(3)判断是否 tracing (链路跟踪),IsOn(返回 channelz 数据收集是否打开)
(4)返回 server 实例
NewServer 源码如下:
// NewServer creates a gRPC server which has no service registered and has not started to accept requests yet. //NewServer创建一个grpc服务器,该服务器没有注册服务,还不能开始接收请求 func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o.apply(&opts) } s := &Server{ lis:make(map[net.Listener]bool), opts:opts, conns:make(map[transport.ServerTransport]bool), m:make(map[string]*service), quit:grpcsync.NewEvent(),//退出事件 done:grpcsync.NewEvent(),//完成事件 czData: new(channelzData), } s.cv = sync.NewCond(&s.mu) if EnableTracing { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server")) } if channelz.IsOn() { s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") } return s }

saver传入切片参数(ServerOption 入参)
type serverOptions struct { credscredentials.TransportCredentials//cred证书 codecbaseCodec //序列化和反序列化 cpCompressor //压缩接口 dcDecompressor //解压缩接口 unaryIntUnaryServerInterceptor //一元拦截器 streamIntStreamServerInterceptor //流拦截器 inTapHandletap.ServerInHandle //见下面文档 statsHandlerstats.Handler //见下面文档 maxConcurrentStreamsuint32 //http2中最大的并发流个数 maxReceiveMessageSize int //最大接受消息大小 maxSendMessageSizeint //最大发送消息大小 unknownStreamDesc*StreamDesc keepaliveParamskeepalive.ServerParameters //长连接的server参数 keepalivePolicykeepalive.EnforcementPolicy //初始化stream的Window大小,下限值是64K,上限2^31 initialWindowSizeint32 //初始化conn大小,一个conn会有多个stream,等于上面的值 * 16 ,http2的限 制是大于0,默认一个连接有100个流,超过了就被拒绝 initialConnWindowSize int32 writeBufferSizeint //写缓冲大小 readBufferSizeint //读缓冲大小 connectionTimeouttime.Duration //连接超时时间 maxHeaderListSize*uint32 //最大头部大小 }

NewServer返回的Server结构体
// Server is a gRPC server to serve RPC requests. type Server struct { opts serverOptions //上面介绍的就是 musync.Mutex // guards following lismap[net.Listener]bool //服务端的监听地址 //server transport是所有grpc服务器端传输实现的通用接口。 connsmap[transport.ServerTransport]bool servebool//表示服务是否开启,在Serve()方法中赋值为true drainbool//在调用GracefulStop(优雅的停止服务)方法被赋值为true cv*sync.Cond// 当连接关闭以正常停止时发出信号 mmap[string]*service // service name -> service info events trace.EventLog //跟踪事件日志 quit*grpcsync.Event//同步退出事件 done*grpcsync.Event //同步完成事件 channelzRemoveOncesync.Once serveWGsync.WaitGroup //counts active Serve goroutines for GracefulStop channelzID int64 // channelz unique identification number czData*channelzData //存储一些conn的自增id数据 }

(2)NewServer实战分析
a、拦截器参数:grpc.UnaryInterceptor()是要拦截的信息
grpc.UnaryInterceptor( grpc_middleware.ChainUnaryServer(监控grpc_prometheus.UnaryServerInterceptor, grpc_validator.UnaryServerInterceptor(), ), ),

拦截器的意义
当进行grpc调用的时候,并不希望客户端与服务端建立连接后直接就进入对应的方法体内。比如需要验证签名来确认客户端的身份,再执行相应的方法。这个时候就可以哟拿到Interceptor。
golang grpc的拦截器(Interceptor)为UnaryServerInterceptor,为一个指向函数的指针。
UnaryServerInterceptor在服务端对于一次RPC调用进行拦截。UnaryServerInterceptor是一个函数指针,当客户端进行grpc调用的时候,首先并不执行用户调用的方法,先执行UnaryServerInterceptor所指的函数,随后再进入真正要执行的函数。
例子:
//需要验证签名 var interceptor grpc.UnaryServerInterceptor interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //对req中的签名进行验证 //如果失败直接返回Nil,err=验证签名失败 //handler是客户端原来打算调用的方法,如果验证成功,执行真正的方法 return handler(ctx, req) } //服务端初始化程序 lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) if err != nil { panic(err) return } var opts []grpc.ServerOption//grpc为使用的第三方的grpc包 opts = append(opts, grpc.UnaryInterceptor(interceptor)) server := grpc.NewServer(opts...) chatprt.RegisterChatServer()//填入相关参数 server.Serve(lis)

拦截器参数grpc_middleware.ChainUnaryServer
grpc_middleware.ChainUnaryServer()函数用于添加拦截器。
go-grpc-middleware包封装了认证(auth), 日志( logging), 消息(message), 验证(validation), 重试(retries)监控(retries)等拦截器。
grpc.StreamInterceptor中添加流式RPC的拦截器。 grpc.UnaryInterceptor中添加简单RPC的拦截器。

在如下链接中详细添加了认证、日志、消息、验证等到拦截器中:http://www.gzywkj.com/post/13028.html
学习链接:
https://learnku.com/articles/35336
https://blog.csdn.net/loveteda/article/details/89959983
b、建立长链接keepalive
grpc.KeepaliveParams( keepalive.ServerParameters{ MaxConnectionIdle: time.Hour, Time:time.Minute, }, ), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ PermitWithoutStream: true, }),

grpc.KeepaliveParams()设置长链接的参数
grpc.KeepaliveEnforcementPolicy()设置长链接ping的参数
ServerParameters结构体
type ServerParameters struct { // MaxConnectionIdle 某一个客户端空闲了多少时间,发送一个GoAway(断开请求) MaxConnectionIdle time.Duration // MaxConnectionAge 任何一个链接,链接超过多少时间,发送一个GoAway MaxConnectionAge time.Duration // MaxConnectionAgeGrace 是对MaxConnectionAge的补充,在强制关闭之前,允许挂起多长时间(保持连接多长时间) MaxConnectionAgeGrace time.Duration //Time 如果客户端空闲多少时间就发起ping一下,保持建立连接 Time time.Duration // Timeout 如果客户端已经断开了,等待ping后回应的ack多少时间 Timeout time.Duration }

EnforcementPolicy结构体
// MinTime 一个客户端ping超过多长时间,就断开连接 MinTime time.Duration // PermitWithoutStream 当没有数据流时是否允许ping PermitWithoutStream bool

【LQH入职第12天--启动服务】https://www.cnblogs.com/hatlonely/p/11945203.html
c、消息大小设置
grpc.MaxRecvMsgSize(10*1024*1024), grpc.MaxSendMsgSize(10*1024*1024),

maxReceiveMessageSize int //最大接受消息大小
maxSendMessageSize int //最大发送消息大小
3、注册信号,使程序可信号中断
signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) go func{ 开协程处理数据 } sign := <-signalChan

4、注册实现类到Server上
func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) { s.RegisterService(&_HelloService_serviceDesc, srv) }

这个方法的入参有两个,一个是 NewServer 创建的 grpcServer 实例,一个是 HelloService 的实现类,然后调用
grpcServer 的 RegisterService 的方法。
RegisterService 方法如下,registerservice 将服务及其实现注册到 grpc 服务器。它是从 idl (接口描述语言 Interface Description Lanauage)的代码中调用的。这必须是在调用 SERVE 方法之前调用。s.register(sd, ss) 方法最终是吧服务的名称的和服务的描述信息注册到上面 Server 中的 map[string]*service
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { grpclog.Fatalf("grpc: Server.RegisterService found") } //注册服务 s.register(sd, ss) } //删除了非核心代码 func (s *Server) register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() //构造服务的实例 srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata:sd.Metadata, } //放入方法 for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } //放入流 for i := range sd.Streams { d := &sd.Streams[i] srv.sd[d.StreamName] = d } s.m[sd.ServiceName] = srv }

https://learnku.com/articles/35336
5、注册反射服务 (1)反射服务的注册
reflection.Register(s)

Protobuf本身具有反射功能,可以在运行时获取对象的Proto文件。gRPC同样也提供了一个名为reflection的反射包,用于为gRPC服务提供查询。gRPC官方提供了一个C++实现的grpc_cli工具,可以用于查询gRPC列表或调用gRPC方法。但是C++版本的grpc_cli安装比较复杂,我们推荐用纯Go语言实现的grpcurl工具。本节将简要介绍grpcurl工具的用法。
reflection包中只有一个Register函数,用于将grpc.Server注册到反射服务中。reflection包文档给出了简单的使用方法:
import ( "google.golang.org/grpc/reflection" )func main() { s := grpc.NewServer() pb.RegisterYourOwnServer(s, &server{})// Register reflection service on gRPC server. reflection.Register(s)s.Serve(lis) }

如果启动了gprc反射服务,那么就可以通过reflection包提供的反射服务查询gRPC服务或调用gRPC方法。
(2)反射服务的使用
grpcurl是Go语言开源社区开发的工具,需要手工安装:
$ go get github.com/fullstorydev/grpcurl $ go install
github.com/fullstorydev/grpcurl/cmd/grpcurl
获取服务列表:grpcurl ip/域名:端口 list
eg 、grpcurl localhost:1234 list
获取服务的方法列表:grpcurl ip/域名:端口 describe 服务名
eg、
grpcurl -plaintext localhost:1234 describe HelloService.HelloService
学习链接:https://chai2010.gitbooks.io/advanced-go-programming-book/content/ch4-rpc/ch4-08-grpcurl.html
6、启动监控 (1)监控实现
func StartMetric(ctx context.Context, server *grpc.Server, metricPort int) { grpc_prometheus.Register(server) grpc_prometheus.EnableHandlingTimeHistogram() if metricPort != 0 { http.Handle("/metrics", promhttp.Handler()) go http.ListenAndServe(":"+strconv.Itoa(metricPort), nil) } }

grpc_prometheus.Register():注册prometheus监控到服务上
grpc_prometheus.EnableHandlingTimeHistogram():
// 启用Prometheus时间直方图记录,RPC调用的耗时会被记录。Prometheus持有、查询Histogram指标的成本比较高
// 生成的指标都是面向gRPC协议的、通用的,不牵涉Istio的逻辑。指标名以grpc_开头
https://blog.gmem.cc/interaction-between-istio-mixer-and-envoy
(2)http中Handler 和 HandlerFunc的区别
http.Handle(pattern string, handler Handler),http.Handle(pattern string, handler Handler) 接收两个参数,一个是路由匹配的字符串,另外一个是 Handler 类型的值
http.HandleFunc该方法接收两个参数,一个是路由匹配的字符串,另外一个是 func(ResponseWriter, *Request) 类型的函数
例子:
package mainimport ( "fmt" "net/http" )type HelloHandler struct{} func (h HelloHandler) ServeHTTP (w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello Handler!") }func hello (w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello!") }func main() { server := http.Server{ Addr: "127.0.0.1:8080", } helloHandler := HelloHandler{} http.Handle("/hello1", helloHandler) http.HandleFunc("/hello2", hello) server.ListenAndServe() }

https://www.jianshu.com/p/3b5c4fc0695c
(3)http.ListenAndServe()监听服务端口
ListenAndServe()源码:
func (srv *Server) ListenAndServe() error { addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp", addr) if err != nil { return err } return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) }

到此大致梳理完了 http.ListenAndServe()的基本逻辑。回到文字开头的代码 http.ListenAndServe(":12345", nil)
1、执行ListenAndServe(":12345",nil)后,会创建一个Server类型数据。
2、通过“:12345” 来初始化 Server的 Addr成员。Handler成员为nil
3、调用Server类型的成员函数ListenAndServe()。并通过未导出的tcpKeepAliveListener
类型做了接口转换。
tcpKeepAliveListener类型即*net.TCPListener,用于超时时间的设定
4、调用Server类型的Serve方法,实现接口的监听 及并发处理请求。
即httpListenAndServe()函数 封装了 底层TCP通信的实现逻辑。
https://blog.csdn.net/natpan/article/details/84027821
(4)golang编写Prometheus Exporter(监控)
例子:
package mainimport ( "log" "net/http" "github.com/prometheus/client_golang/prometheus/promhttp" ) func main() { http.Handle("/metrics", promhttp.Handler()) log.Fatal(http.ListenAndServe(":8080", nil)) }

这个代码中我们仅仅通过http模块指定了一个路径,并将client_golang库中的promhttp.Handler()作为处理函数传递进去后,就可以获取指标信息了,两行代码实现了一个exporter。这里内部其实是使用了一个默认的收集器将通过NewGoCollector采集当前Go运行时相关信息比如go堆栈使用,goroutine的数据等等。 通过访问http://localhost:8080/metrics即可查看详细的指标参数。
学习链接:https://blog.csdn.net/u014029783/article/details/80001251
7、启动协程,运行服务
go func() { if err := s.Serve(lis); err != nil { panic(err) } else { fmt.Println("grpc service start at port (%v)", port) xzap.Sugar(ctx).Infof("grpc server at port (%v)", port) } }()

Server () 方法就正式开始监听客户端的连接,并开启协程处理客户端连接,方法核心步骤如下
(1)加锁,初始化一些参数
(2)defer 处理最后的资源情况
(3)for 循环接受客户端的连接,每一个客户端的连接,开启一个协程处理
server源码如下:
func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() s.serve = true s.serveWG.Add(1) //优雅的停止服务 defer func() { s.serveWG.Done() if s.quit.HasFired() { <-s.done.Done() } }() //包装连接对象,并声明为true,代表有效 ls := &listenSocket{Listener: lis} s.lis[ls] = true s.mu.Unlock() //清理资源 defer func() { s.mu.Lock() if s.lis != nil && s.lis[ls] { ls.Close() delete(s.lis, ls) } s.mu.Unlock() }() //监听客户端连接 for { rawConn, err := lis.Accept() s.serveWG.Add(1) //处理客户端请求 go func() { s.handleRawConn(rawConn) s.serveWG.Done() }() } }

学习链接:https://learnku.com/articles/35336
8、rpc-gateway使用(提供http服务)
ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() mux := runtime.NewServeMux( runtime.WithIncomingHeaderMatcher(HttpHeaderMatcher), runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}), ) opts := []grpc.DialOption{grpc.WithInsecure()} registers := []func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error{ //注册htt链接的断点服务 Register**Endpoint, } for _, r := range registers { err := r(ctx, mux, endPoint, opts) if err != nil { panic(err) } } err2 := http.ListenAndServe(fmt.Sprintf(":%v", configs.FlagGwPort), mux) xzap.Sugar(ctx).Infof("grpc gate way start at port (%v) error (%s)", configs.FlagGwPort, err2)

(1)runtime.NewServeMux()使用
runtime.NewServeMux:返回一个新的ServeMux,它的内部映射是空的;ServeMux是grpc-gateway的一个请求多路复用器。它将http请求与模式匹配,并调用相应的处理程序。
(2)grpc.WithTransportCredentials()使用
grpc.WithTransportCredentials:配置一个连接级别的安全凭据(例:TLS、SSL),返回值为type DialOption
(3)runtime.WithMarshalerOption()使用
runtime.WithMarshalerOption()是json默认封装参数
(4)安全检查(检测头文件是否匹配)
func HttpHeaderMatcher(key string) (string, bool) { key = textproto.CanonicalMIMEHeaderKey(key) httpDataHeaderPrefix := "Api-Auth-" matchKey, matcher := runtime.DefaultHeaderMatcher(key) if !matcher { if strings.HasPrefix(key, httpDataHeaderPrefix) { return key[len(httpDataHeaderPrefix):], true } else { return "", false } } return matchKey, matcher }

textproto.CanonicalMIMEHeaderKey()格式化MIME头
标准化 MIME 头: Accept-Encoding 字符连接的单词首字母大写 ,其余小写func CanonicalMIMEHeaderKey(s string) stringtextproto.CanonicalMIMEHeaderKey("content-type") // Content-Type

??注:消息的头字段就是Content-Type
https://www.cnblogs.com/liuzhiyun/p/9808997.html
runtime.DefaultHeaderMatcher()默认匹配头文件
strings.HasPrefix()函数用来检测字符串是否以指定的前缀开头。
(5)设置链接选项
opts := []grpc.DialOption{grpc.WithInsecure()}
WithInsecure 返回一个 DialOption,并且它最终会通过读取设置的值来禁用安全传输
(6)读取函数函数组,并遍历调用函数
registers := []func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error{ Register**1HandlerFromEndpoint, docUnderstanding.Register**2HandlerFromEndpoint, docUnderstanding.Register**3HandlerFromEndpoint, docUnderstanding.Register**4HandlerFromEndpoint, } for _, r := range registers { err := r(ctx, mux, endPoint, opts) if err != nil { panic(err) } }

??注:这种实现的前提是函数的参数都完全相同。
(7)监听端口
http.ListenAndServe()

9、退出服务,关闭协程 (1)退出服务
s.GracefulStop()
//主要做的工作是断开连接,等所有连接都断开才退出服务。
(2)关闭所有协程
cancel()
学习连接:https://learnku.com/articles/35336

    推荐阅读