Apache Dubbo 有injvm方式的通信,能够避免网络带来的延迟,同时也不占用本地端口,对测试、本地验证而言,是一种比较方便的RPC通信方式。
最近看到 containerd 的代码,发现它也有类似的需求。
但使用ip端口通信,有可能会有端口冲突;使用unix socket,可能会有路径冲突。
考察了下gRPC有没有和injvm类似的,基于内存的通信方式。后来发现pipe非常好用,所以记录了下。
Golang/gRPC对网络的抽象
首先,我们先看一下gRPC一次调用的架构图。当然,这个架构图目前只关注了网络抽象分布。
文章图片
我们重点关注网络部分。
操作系统系统抽象
首先,在网络包之上,系统抽象出来了socket,代表一条虚拟连接,对于UDP,这个虚拟连接是不可靠的,对于TCP,这个链接是尽力可靠的。
对于网络编程而言,仅仅有连接是不够的,还需要告诉开发者如何创建、关闭连接。
对于服务端,系统提供了accept
方法,用来接收连接。
对于客户端,系统提供了connect
方法,用于和服务端建立连接。
Golang抽象
在Golang中,socket对等的概念叫net.Conn
,代表了一条虚拟连接。
接下来,对于服务端,accept这个行为被包装成了net.Listener
接口;对于客户端,Golang则基于connect提供了net.Dial
方法。
type Listener interface {
// 接收来自客户端的网络连接
Accept() (Conn, error)
Close() error
Addr() Addr
}
gRPC使用 那么gRPC是怎么使用Listener和Dial的呢?
对于gRPC服务端,
Serve
方法接收一个Listener,表示在这个Listener上提供服务。对于gRPC客户端,网络本质上就是一个能够连接到某个地方的东西就可以,所以只需要一个
dialer func(context.Context, string) (net.Conn, error)
函数就行了。什么是pipe 在操作系统层面,
pipe
表示一个数据管道,而这个管道两端都在本程序中,可以很好的满足我们的要求:基于内存的网络通信。Golang也基于pipe提供了
net.Pipe()
函数创建了一个双向的、基于内存通信的管道,在能力上,能够很好的满足gRPC对底层通信的要求。但是
net.Pipe
仅仅产生了两个net.Conn
,即只产生两个网络连接,没有之前提到的Listner,也没有Dial方法。【基于内存通信的gRPC调用】于是结合Golang的channel,把
net.Pipe
包装成了Listner,也提供了Dial方法:Listener.Accept()
,只需要监听一个channel,客户端连接过来的时候,把连接通过channel传递过来即可Dial方法
,调用Pipe,将一端通过channel给服务端(作为服务端连接),另一端作为客户端连接
package mainimport (
"context"
"errors"
"net"
"sync"
"sync/atomic"
)var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)type PipeListener struct {
chchan net.Conn
close chan struct{}
doneuint32
msync.Mutex
}func ListenPipe() *PipeListener {
return &PipeListener{
ch:make(chan net.Conn),
close: make(chan struct{}),
}
}// Accept 等待客户端连接
func (l *PipeListener) Accept() (c net.Conn, e error) {
select {
case c = <-l.ch:
case <-l.close:
e = ErrPipeListenerClosed
}
return
}// Close 关闭 listener.
func (l *PipeListener) Close() (e error) {
if atomic.LoadUint32(&l.done) == 0 {
l.m.Lock()
defer l.m.Unlock()
if l.done == 0 {
defer atomic.StoreUint32(&l.done, 1)
close(l.close)
return
}
}
e = ErrPipeListenerClosed
return
}// Addr 返回 listener 的地址
func (l *PipeListener) Addr() net.Addr {
return pipeAddr(0)
}
func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {
return l.DialContext(context.Background(), network, addr)
}
func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) {
// PipeListener是否已经关闭
if atomic.LoadUint32(&l.done) != 0 {
e = ErrPipeListenerClosed
return
}// 创建pipe
c0, c1 := net.Pipe()
// 等待连接传递到服务端接收
select {
case <-ctx.Done():
e = ctx.Err()
case l.ch <- c0:
conn = c1
case <-l.close:
c0.Close()
c1.Close()
e = ErrPipeListenerClosed
}
return
}type pipeAddr intfunc (pipeAddr) Network() string {
return `pipe`
}
func (pipeAddr) String() string {
return `pipe`
}
如何用pipe作为gRPC的connection 有了上面的包装,我们就可以基于此创建一个gRPC的服务器端和客户端,来进行基于内存的RPC通信了。
首先,我们简单的创建一个服务,包含了四种调用方式:
syntax = "proto3";
option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// unary调用
rpc SayHello(HelloRequest) returns (HelloReply) {}// 服务端流式调用
rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply);
// 客户端流式调用
rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply);
// 双向流式调用
rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply);
}// The request message containing the user's name.
message HelloRequest {
string name = 1;
}// The response message containing the greetings
message HelloReply {
string message = 1;
}
然后生成相关的stub代码:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
helloworld/helloworld.proto
然后开始写服务端代码,基本逻辑就是实现一个demo版本的服务端就好:
package mainimport (
"context"
"log""github.com/robberphex/grpc-in-memory/helloworld"
pb "github.com/robberphex/grpc-in-memory/helloworld"
)// helloworld.GreeterServer 的实现
type server struct {
// 为了后面代码兼容,必须聚合UnimplementedGreeterServer
// 这样以后在proto文件中新增加一个方法的时候,这段代码至少不会报错
pb.UnimplementedGreeterServer
}// unary调用的服务端代码
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}// 客户端流式调用的服务端代码
// 接收两个req,然后返回一个resp
func (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error {
req, err := streamServer.Recv()
if err != nil {
log.Printf("error receiving: %v", err)
return err
}
log.Printf("Received: %v", req.GetName())
req, err = streamServer.Recv()
if err != nil {
log.Printf("error receiving: %v", err)
return err
}
log.Printf("Received: %v", req.GetName())
streamServer.SendAndClose(&pb.HelloReply{Message: "Hello " + req.GetName()})
return nil
}// 服务端流式调用的服务端代码
// 接收一个req,然后发送两个resp
func (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error {
log.Printf("Received: %v", req.GetName())
err := streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
if err != nil {
log.Printf("error Send: %+v", err)
return err
}
err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName() + "_dup"})
if err != nil {
log.Printf("error Send: %+v", err)
return err
}
return nil
}// 双向流式调用的服务端代码
func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error {
req, err := streamServer.Recv()
if err != nil {
log.Printf("error receiving: %+v", err)
// 及时将错误返回给客户端,下同
return err
}
log.Printf("Received: %v", req.GetName())
err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
if err != nil {
log.Printf("error Send: %+v", err)
return err
}
// 离开这个函数后,streamServer会关闭,所以不推荐在单独的goroute发送消息
return nil
}// 新建一个服务端实现
func NewServerImpl() *server {
return &server{}
}
然后我们创建一个基于pipe连接的客户端来调用服务端。
包含如下几个步骤:
- 创建服务端实现
- 基于pipe创建listener,然后基于它创建gRPC server
- 基于pipe创建客户端连接,然后创建gRPC client,调用服务
package mainimport (
"context"
"fmt"
"log"
"net"pb "github.com/robberphex/grpc-in-memory/helloworld"
"google.golang.org/grpc"
)// 将一个服务实现转化为一个客户端
func serverToClient(svc *server) pb.GreeterClient {
// 创建一个基于pipe的Listener
pipe := ListenPipe()s := grpc.NewServer()
// 注册Greeter服务到gRPC
pb.RegisterGreeterServer(s, svc)
if err := s.Serve(pipe);
err != nil {
log.Fatalf("failed to serve: %v", err)
}
// 客户端指定使用pipe作为网络连接
clientConn, err := grpc.Dial(`pipe`,
grpc.WithInsecure(),
grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {
return pipe.DialContext(c, `pipe`, s)
}),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
// 基于pipe连接,创建gRPC客户端
c := pb.NewGreeterClient(clientConn)
return c
}func main() {
svc := NewServerImpl()
c := serverToClient(svc)ctx := context.Background()// unary调用
for i := 0;
i < 5;
i++ {
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}// 客户端流式调用
for i := 0;
i < 5;
i++ {
streamClient, err := c.SayHelloRequestStream(ctx)
if err != nil {
log.Fatalf("could not SayHelloRequestStream: %v", err)
}
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)})
if err != nil {
log.Fatalf("could not Send: %v", err)
}
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)})
if err != nil {
log.Fatalf("could not Send: %v", err)
}
reply, err := streamClient.CloseAndRecv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
}// 服务端流式调用
for i := 0;
i < 5;
i++ {
streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)})
if err != nil {
log.Fatalf("could not SayHelloReplyStream: %v", err)
}
reply, err := streamClient.Recv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
reply, err = streamClient.Recv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
}// 双向流式调用
for i := 0;
i < 5;
i++ {
streamClient, err := c.SayHelloBiStream(ctx)
if err != nil {
log.Fatalf("could not SayHelloStream: %v", err)
}
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)})
if err != nil {
log.Fatalf("could not Send: %v", err)
}
reply, err := streamClient.Recv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
}
}
总结 当然,作为基于内存的RPC调用,还可以有更好的方式,比如直接将对象传递到服务端,直接通过本地调用方式来通信。
但这种方式破坏了很多约定,比如对象地址、比如gRPC连接参数不生效等等。
本文介绍的,基于Pipe的通信方式,除了网络层走了内存传递之外,其他都和正常RPC通信行为一致,比如同样经历了序列化、经历了HTTP/2的流控制等。当然,性能上比原生调用也会差一点,但是好在对于测试、验证场景,行为上的一致比较重要些。
本文代码已经托管到了GitHub https://github.com/robberphex...。
本文首发于 https://robberphex.com/grpc-i...。