gRPC-go 连接管理

gRPC-go 连接管理
(金庆的专栏 2017.12)
【gRPC-go 连接管理】把 example greeter 改一下,处理 SayHello() 请求时,不仅仅返回本次请求者的名字,
还返回上次请求的名字,如:

λ go run greeter_client/main.go 2017/12/25 17:59:13 Greeting: Hello 'world' (prev '') 2017/12/25 17:59:15 Greeting: Hello 'world2' (prev 'world')

先将客户端单次请求改为多次请求:
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name}) log.Printf("Greeting: %s", r.Message) time.Sleep(2 * time.Second)r, err = c.SayHello(context.Background(), &pb.HelloRequest{Name: name + "2"}) log.Printf("Greeting: %s", r.Message) ...

服务器需要为每个连接保存各自的数据。连接创建时初始化数据,连接断开时清理数据。
这里利用了连接统计的接口,不知道是否是最适当的实现方式?
服务器创建时添加 StatsHandler 选项,输入一个 stats.Handler 的实现。
-s := grpc.NewServer() +s := grpc.NewServer(grpc.StatsHandler(&statshandler{}))

statshandler 需实现4个方法,只用到2个连接相关的方法,TagConn() 和 HandleConn(),
另外2个 TagRPC() 和 HandleRPC() 用于RPC统计, 实现为空。
type statshandler struct{}// TagConn 用来给连接打个标签,以此来标识连接(实在是找不出还有什么办法来标识连接). // 这个标签是个指针,可保证每个连接唯一。 // 将该指针添加到上下文中去,键为 connCtxKey{}. func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { return context.WithValue(ctx, connCtxKey{}, info) }// TagRPC 为空. func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { return ctx }// HandleConn 会在连接开始和结束时被调用,分别会输入不同的状态. func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) { tag, ok := getConnTagFromContext(ctx) if !ok { log.Fatal("can not get conn tag") }connsMutex.Lock() defer connsMutex.Unlock()switch s.(type) { case *stats.ConnBegin: conns[tag] = "" log.Printf("begin conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns)) case *stats.ConnEnd: delete(conns, tag) log.Printf("end conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns)) default: log.Printf("illegal ConnStats type\n") } }// HandleRPC 为空. func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) { }

用一个map来管理所有连接,以连接的标签(是个指针)为键,值为上次请求者的名字。
因为有多线程访问,所有加个 Mutex 来保护。
连接结束时,将从 conns 中删除连接相关的数据。
var connsMutex sync.Mutex var conns map[*stats.ConnTagInfo]string = make(map[*stats.ConnTagInfo]string)

getConnTagFromContext() 从上下文中取连接标签:
type connCtxKey struct{}func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) { tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo) return tag, ok }

最后将 SayHello() 改为记录请求者名字,并返回上次请求者的名字。
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { tag, _ := getConnTagFromContext(ctx) log.Printf("SayHello(), conn tag = (%p)%#v\n", tag, tag)connsMutex.Lock() defer connsMutex.Unlock() prev := conns[tag] conns[tag] = in.Namereturn &pb.HelloReply{Message: fmt.Sprintf("Hello '%s' (prev '%s')", in.Name, prev)}, nil }

测试多个客户端连接,可以看到每个客户端有自己的状态,互不影响。
E:\Git\grpc-go\examples\helloworld (master) λ go run greeter_server/main.go 2017/12/25 18:39:03 start 2017/12/25 18:39:11 begin conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1 2017/12/25 18:39:11 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)} 2017/12/25 18:39:13 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)} 2017/12/25 18:39:13 begin conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 2 2017/12/25 18:39:13 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)} 2017/12/25 18:39:15 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)} 2017/12/25 18:39:15 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)} 2017/12/25 18:39:17 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)} 2017/12/25 18:39:17 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)} 2017/12/25 18:39:19 end conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1 2017/12/25 18:39:19 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)} 2017/12/25 18:39:21 end conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 0

    推荐阅读