1、启动服务的主要代码
func StartServer() {
lis, _ := net.Listen("tcp", "127.0.0.1:8090")
//创建一个grpc服务器对象
gRpcServer := grpc.NewServer()
pb.RegisterHelloServiceServer(gRpcServer, &impl.HelloServiceServer{})
//开启服务端
gRpcServer.Serve(lis)
}
2、创建服务对象grpc.NewServer() (1)NewServer的内部机构
创建服务对象主要做一下几件事:
(1)接受入参参数切片,并把默认参数全部赋值到入参上 (2)构造 Server 实例NewServer 源码如下:
(3)判断是否 tracing (链路跟踪),IsOn(返回 channelz 数据收集是否打开)
(4)返回 server 实例
// NewServer creates a gRPC server which has no service registered and has not started to accept requests yet.
//NewServer创建一个grpc服务器,该服务器没有注册服务,还不能开始接收请求
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis:make(map[net.Listener]bool),
opts:opts,
conns:make(map[transport.ServerTransport]bool),
m:make(map[string]*service),
quit:grpcsync.NewEvent(),//退出事件
done:grpcsync.NewEvent(),//完成事件
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server"))
}
if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
return s
}
saver传入切片参数(ServerOption 入参)
type serverOptions struct {
credscredentials.TransportCredentials//cred证书
codecbaseCodec //序列化和反序列化
cpCompressor //压缩接口
dcDecompressor //解压缩接口
unaryIntUnaryServerInterceptor //一元拦截器
streamIntStreamServerInterceptor //流拦截器
inTapHandletap.ServerInHandle //见下面文档
statsHandlerstats.Handler //见下面文档
maxConcurrentStreamsuint32 //http2中最大的并发流个数
maxReceiveMessageSize int //最大接受消息大小
maxSendMessageSizeint //最大发送消息大小
unknownStreamDesc*StreamDesc
keepaliveParamskeepalive.ServerParameters //长连接的server参数
keepalivePolicykeepalive.EnforcementPolicy
//初始化stream的Window大小,下限值是64K,上限2^31
initialWindowSizeint32
//初始化conn大小,一个conn会有多个stream,等于上面的值 * 16 ,http2的限 制是大于0,默认一个连接有100个流,超过了就被拒绝
initialConnWindowSize int32
writeBufferSizeint //写缓冲大小
readBufferSizeint //读缓冲大小
connectionTimeouttime.Duration //连接超时时间
maxHeaderListSize*uint32 //最大头部大小
}
NewServer返回的Server结构体
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions //上面介绍的就是
musync.Mutex // guards following
lismap[net.Listener]bool //服务端的监听地址
//server transport是所有grpc服务器端传输实现的通用接口。
connsmap[transport.ServerTransport]bool
servebool//表示服务是否开启,在Serve()方法中赋值为true
drainbool//在调用GracefulStop(优雅的停止服务)方法被赋值为true
cv*sync.Cond// 当连接关闭以正常停止时发出信号
mmap[string]*service // service name -> service info
events trace.EventLog //跟踪事件日志
quit*grpcsync.Event//同步退出事件
done*grpcsync.Event //同步完成事件
channelzRemoveOncesync.Once
serveWGsync.WaitGroup //counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
czData*channelzData //存储一些conn的自增id数据
}
(2)NewServer实战分析
a、拦截器参数:grpc.UnaryInterceptor()是要拦截的信息
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(监控grpc_prometheus.UnaryServerInterceptor,
grpc_validator.UnaryServerInterceptor(),
),
),
拦截器的意义
当进行grpc调用的时候,并不希望客户端与服务端建立连接后直接就进入对应的方法体内。比如需要例子:验证签名
来确认客户端的身份,再执行相应的方法。这个时候就可以哟拿到Interceptor。
golang grpc的拦截器(Interceptor)为UnaryServerInterceptor,为一个指向函数的指针。
UnaryServerInterceptor在服务端对于一次RPC调用进行拦截。UnaryServerInterceptor是一个函数指针,当客户端进行grpc调用的时候,首先并不执行用户调用的方法,先执行UnaryServerInterceptor所指的函数,随后再进入真正要执行的函数。
//需要验证签名
var interceptor grpc.UnaryServerInterceptor
interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
//对req中的签名进行验证
//如果失败直接返回Nil,err=验证签名失败
//handler是客户端原来打算调用的方法,如果验证成功,执行真正的方法
return handler(ctx, req)
}
//服务端初始化程序
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
panic(err)
return
}
var opts []grpc.ServerOption//grpc为使用的第三方的grpc包
opts = append(opts, grpc.UnaryInterceptor(interceptor))
server := grpc.NewServer(opts...)
chatprt.RegisterChatServer()//填入相关参数
server.Serve(lis)
拦截器参数grpc_middleware.ChainUnaryServer
grpc_middleware.ChainUnaryServer()函数用于添加拦截器。
go-grpc-middleware包
封装了认证(auth)
, 日志( logging)
, 消息(message)
, 验证(validation)
, 重试(retries)
和监控(retries)
等拦截器。grpc.StreamInterceptor中添加流式RPC的拦截器。
grpc.UnaryInterceptor中添加简单RPC的拦截器。
在如下链接中详细添加了认证、日志、消息、验证等到拦截器中:http://www.gzywkj.com/post/13028.html
学习链接:
https://learnku.com/articles/35336
https://blog.csdn.net/loveteda/article/details/89959983
b、建立长链接keepalive
grpc.KeepaliveParams(
keepalive.ServerParameters{
MaxConnectionIdle: time.Hour,
Time:time.Minute,
},
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
PermitWithoutStream: true,
}),
grpc.KeepaliveParams()设置长链接的参数
grpc.KeepaliveEnforcementPolicy()设置长链接ping的参数
ServerParameters结构体
type ServerParameters struct {
// MaxConnectionIdle 某一个客户端空闲了多少时间,发送一个GoAway(断开请求)
MaxConnectionIdle time.Duration
// MaxConnectionAge 任何一个链接,链接超过多少时间,发送一个GoAway
MaxConnectionAge time.Duration
// MaxConnectionAgeGrace 是对MaxConnectionAge的补充,在强制关闭之前,允许挂起多长时间(保持连接多长时间)
MaxConnectionAgeGrace time.Duration
//Time 如果客户端空闲多少时间就发起ping一下,保持建立连接
Time time.Duration
// Timeout 如果客户端已经断开了,等待ping后回应的ack多少时间
Timeout time.Duration
}
EnforcementPolicy结构体
// MinTime 一个客户端ping超过多长时间,就断开连接
MinTime time.Duration
// PermitWithoutStream 当没有数据流时是否允许ping
PermitWithoutStream bool
【LQH入职第12天--启动服务】https://www.cnblogs.com/hatlonely/p/11945203.html
c、消息大小设置
grpc.MaxRecvMsgSize(10*1024*1024),
grpc.MaxSendMsgSize(10*1024*1024),
maxReceiveMessageSize int //最大接受消息大小3、注册信号,使程序可信号中断
maxSendMessageSize int //最大发送消息大小
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
go func{
开协程处理数据
}
sign := <-signalChan
4、注册实现类到Server上
func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) {
s.RegisterService(&_HelloService_serviceDesc, srv)
}
这个方法的入参有两个,一个是 NewServer 创建的 grpcServer 实例,一个是 HelloService 的实现类,然后调用RegisterService 方法如下,registerservice 将服务及其实现注册到 grpc 服务器。它是从 idl (接口描述语言 Interface Description Lanauage)的代码中调用的。这必须是在调用 SERVE 方法之前调用。
grpcServer 的 RegisterService 的方法。
s.register(sd, ss)
方法最终是吧服务的名称的和服务的描述信息注册到上面 Server 中的 map[string]*service
中func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
grpclog.Fatalf("grpc: Server.RegisterService found")
}
//注册服务
s.register(sd, ss)
}
//删除了非核心代码
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
//构造服务的实例
srv := &service{
server: ss, md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc), mdata:sd.Metadata,
}
//放入方法
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
//放入流
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv
}
https://learnku.com/articles/35336
5、注册反射服务 (1)反射服务的注册
reflection.Register(s)
reflection包中只有一个Register函数,用于将grpc.Server注册到反射服务中。reflection包文档给出了简单的使用方法:Protobuf本身
具有反射功能,可以在运行时获取对象的Proto文件。gRPC同样也提供了一个名为reflection的反射包
,用于为gRPC服务提供查询
。gRPC官方提供了一个C++实现的grpc_cli工具,可以用于查询gRPC列表或调用gRPC方法。但是C++版本的grpc_cli安装比较复杂,我们推荐用纯Go语言实现的grpcurl工具。本节将简要介绍grpcurl工具的用法。
import (
"google.golang.org/grpc/reflection"
)func main() {
s := grpc.NewServer()
pb.RegisterYourOwnServer(s, &server{})// Register reflection service on gRPC server.
reflection.Register(s)s.Serve(lis)
}
如果启动了gprc反射服务,那么就可以通过reflection包提供的反射服务查询gRPC服务或调用gRPC方法。
(2)反射服务的使用
grpcurl是Go语言开源社区开发的工具,需要手工安装:
$ go get github.com/fullstorydev/grpcurl $ go install获取服务列表:grpcurl ip/域名:端口 list
github.com/fullstorydev/grpcurl/cmd/grpcurl
eg 、grpcurl localhost:1234 list
获取服务的方法列表:grpcurl ip/域名:端口 describe 服务名
eg、
grpcurl -plaintext localhost:1234 describe HelloService.HelloService
学习链接:https://chai2010.gitbooks.io/advanced-go-programming-book/content/ch4-rpc/ch4-08-grpcurl.html
6、启动监控 (1)监控实现
func StartMetric(ctx context.Context, server *grpc.Server, metricPort int) {
grpc_prometheus.Register(server)
grpc_prometheus.EnableHandlingTimeHistogram()
if metricPort != 0 {
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":"+strconv.Itoa(metricPort), nil)
}
}
grpc_prometheus.Register():注册prometheus监控到服务上
grpc_prometheus.EnableHandlingTimeHistogram():
// 启用Prometheus时间直方图记录,RPC调用的耗时会被记录。Prometheus持有、查询Histogram指标的成本比较高
// 生成的指标都是面向gRPC协议的、通用的,不牵涉Istio的逻辑。指标名以grpc_开头
https://blog.gmem.cc/interaction-between-istio-mixer-and-envoy
(2)http中Handler 和 HandlerFunc的区别
http.Handle(pattern string, handler Handler)
,http.Handle(pattern string, handler Handler) 接收两个参数,一个是路由匹配的字符串
,另外一个是 Handler 类型的值
。http.HandleFunc
该方法接收两个参数,一个是路由匹配的字符串
,另外一个是 func(ResponseWriter, *Request) 类型的函数
。例子:
package mainimport (
"fmt"
"net/http"
)type HelloHandler struct{}
func (h HelloHandler) ServeHTTP (w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello Handler!")
}func hello (w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello!")
}func main() {
server := http.Server{
Addr: "127.0.0.1:8080",
}
helloHandler := HelloHandler{}
http.Handle("/hello1", helloHandler)
http.HandleFunc("/hello2", hello)
server.ListenAndServe()
}
https://www.jianshu.com/p/3b5c4fc0695c
(3)http.ListenAndServe()监听服务端口
ListenAndServe()源码:
func (srv *Server) ListenAndServe() error {
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
到此大致梳理完了 http.ListenAndServe()的基本逻辑。回到文字开头的代码 http.ListenAndServe(":12345", nil)https://blog.csdn.net/natpan/article/details/84027821
1、执行ListenAndServe(":12345",nil)后,会创建一个Server类型数据。
2、通过“:12345” 来初始化 Server的 Addr成员。Handler成员为nil
3、调用Server类型的成员函数ListenAndServe()。并通过未导出的tcpKeepAliveListener
类型做了接口转换。
tcpKeepAliveListener类型即*net.TCPListener,用于超时时间的设定
4、调用Server类型的Serve方法,实现接口的监听 及并发处理请求。
即httpListenAndServe()函数 封装了 底层TCP通信的实现逻辑。
(4)golang编写Prometheus Exporter(监控)
例子:
package mainimport (
"log"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func main() {
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(":8080", nil))
}
这个代码中我们仅仅通过http模块指定了一个路径,并将client_golang库中的
promhttp.Handler()
作为处理函数传递进去后,就可以获取指标信息了,两行代码实现了一个exporter。这里内部其实是使用了一个默认的收集器
将通过NewGoCollector采集
当前Go运行时
的相关信息
比如go堆栈使用
,goroutine的数据
等等。 通过访问http://localhost:8080/metrics即可查看详细的指标参数。学习链接:https://blog.csdn.net/u014029783/article/details/80001251
7、启动协程,运行服务
go func() {
if err := s.Serve(lis);
err != nil {
panic(err)
} else {
fmt.Println("grpc service start at port (%v)", port)
xzap.Sugar(ctx).Infof("grpc server at port (%v)", port)
}
}()
Server () 方法就正式开始监听客户端的连接,并开启协程处理客户端连接,方法核心步骤如下
(1)加锁,初始化一些参数
(2)defer 处理最后的资源情况
(3)for 循环接受客户端的连接,每一个客户端的连接,开启一个协程处理
server源码如下:
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.serve = true
s.serveWG.Add(1)
//优雅的停止服务
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
<-s.done.Done()
}
}()
//包装连接对象,并声明为true,代表有效
ls := &listenSocket{Listener: lis}
s.lis[ls] = true
s.mu.Unlock()
//清理资源
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
//监听客户端连接
for {
rawConn, err := lis.Accept()
s.serveWG.Add(1)
//处理客户端请求
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
学习链接:https://learnku.com/articles/35336
8、rpc-gateway使用(提供http服务)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mux := runtime.NewServeMux(
runtime.WithIncomingHeaderMatcher(HttpHeaderMatcher),
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}),
)
opts := []grpc.DialOption{grpc.WithInsecure()}
registers := []func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error{
//注册htt链接的断点服务
Register**Endpoint,
}
for _, r := range registers {
err := r(ctx, mux, endPoint, opts)
if err != nil {
panic(err)
}
}
err2 := http.ListenAndServe(fmt.Sprintf(":%v", configs.FlagGwPort), mux)
xzap.Sugar(ctx).Infof("grpc gate way start at port (%v) error (%s)", configs.FlagGwPort, err2)
(1)runtime.NewServeMux()使用
runtime.NewServeMux
:返回一个新的ServeMux,它的内部映射是空的;ServeMux是grpc-gateway的一个请求多路复用器
。它将http请求与模式匹配,并调用相应的处理程序。(2)grpc.WithTransportCredentials()使用
grpc.WithTransportCredentials
:配置一个连接级别的安全
凭据(例:TLS、SSL),返回值为type DialOption(3)runtime.WithMarshalerOption()使用
runtime.WithMarshalerOption()是
json默认封装
参数(4)安全检查(检测头文件是否匹配)
func HttpHeaderMatcher(key string) (string, bool) {
key = textproto.CanonicalMIMEHeaderKey(key)
httpDataHeaderPrefix := "Api-Auth-"
matchKey, matcher := runtime.DefaultHeaderMatcher(key)
if !matcher {
if strings.HasPrefix(key, httpDataHeaderPrefix) {
return key[len(httpDataHeaderPrefix):], true
} else {
return "", false
} }
return matchKey, matcher
}
textproto.CanonicalMIMEHeaderKey()格式化MIME头
标准化 MIME 头: Accept-Encoding 字符连接的单词首字母大写 ,其余小写func CanonicalMIMEHeaderKey(s string) stringtextproto.CanonicalMIMEHeaderKey("content-type") // Content-Type
??注:消息的头字段就是Content-Type
https://www.cnblogs.com/liuzhiyun/p/9808997.html
runtime.DefaultHeaderMatcher()默认匹配头文件
strings.HasPrefix()函数用来检测字符串是否以指定的前缀开头。
(5)设置链接选项
opts := []grpc.DialOption{grpc.WithInsecure()}
WithInsecure
返回一个 DialOption,并且它最终会通过读取设置的值
来禁用安全传输(6)读取函数函数组,并遍历调用函数
registers := []func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error{
Register**1HandlerFromEndpoint,
docUnderstanding.Register**2HandlerFromEndpoint,
docUnderstanding.Register**3HandlerFromEndpoint,
docUnderstanding.Register**4HandlerFromEndpoint,
}
for _, r := range registers {
err := r(ctx, mux, endPoint, opts)
if err != nil {
panic(err)
}
}
??注:这种实现的前提是函数的参数都完全相同。
(7)监听端口
http.ListenAndServe()
9、退出服务,关闭协程 (1)退出服务
s.GracefulStop()
//主要做的工作是断开连接,等所有连接都断开才退出服务。
(2)关闭所有协程
cancel()
学习连接:https://learnku.com/articles/35336