浪潮云溪分布式数据库协议代码解析(1)

云溪数据库支持 PostgreSQL protocol 3.0,用于客户端与服务端之间的信息通信,应用于连接认证及数据请求阶段。
PostgreSQL 协议的消息通用格式如下图所示,包含 1 字节的消息类型,4 字节的长度(不包括类型的长度),以及消息的内容。由于历史原因,startup 消息不包含类型。
连接认证阶段

  1. 用户使用客户端,通过云溪数据库 sql 命令,尝试连接服务端时,客户端会获取连接命令的参数,生成 URL,具体格式如下
postgres://:@:/?
其中,包含了当前用户的用户名和密码,节点的 IP 地址和端口,连接的数据库库名,以及额外的连接参数。
  1. 客户端会根据 URL,建立与服务端之间的连接,发送一个 startup 消息。
func(c *Connector) open(ctx context.Context)(cn *conn, err error){ ... cn.startup(o) ... }

上述代码,将构建一个 startup 消息,该消息没有消息类型,包含了协议版本号,和连接参数等内容。
  1. 服务端接收解析 startup 消息,获得连接参数。
func(s *Server) ServeConn(ctx context.Context, conn net.Conn)error{ ... var buf pgwirebase.ReadBuffer n, err := buf.ReadUntypedMsg(conn) if err !=nil{ return err } version, err := buf.GetUint32() if err !=nil{ return err } ... // get connection parameters if sArgs, err = parseOptions(ctx, buf.Msg); err !=nil{ return sendErr(err) } ... }

  1. 服务端发送 AuthenticationRequest 消息,要求客户端进一步提供认证信息,以进行用户身份认证。
func authPassword( c AuthConn, tlsState tls.ConnectionState, insecure bool, hashedPassword []byte, validUntil *tree.DTimestamp, encryption string, execCfg *sql.ExecutorConfig, entry *hba.Entry, )(security.UserAuthHook,error){ if err := c.SendAuthRequest(authCleartextPassword,nil); err !=nil{ returnnil,err } // recevice password from client password, err := c.ReadPasswordString() ... }

认证请求消息中,除了消息类型’R’外,还包含认证方式,目前云溪数据库支持证书、口令和 GSSAPI 三种认证方式。证书认证不需要额外的认证信息,认证通过后直接发送 AuthenticationOk 消息,跳过 5、6。
  1. 客户端收到 AuthenticationRequest 消息后,则会发送对应的认证信息,回应此消息,该回应的消息类型为’p’。
func(cn *conn) startup(o values){ ... for{ // recevice responses after sending startup t, r := cn.recv() switch t { case'K': cn.processBackendKeyData(r) case'S': cn.processParameterStatus(r) case'R': cn.auth(r, o) case'Z': cn.processReadyForQuery(r) return default: errorf("unknown response forstartup: %q",t) } } }func(cn *conn) auth(r *readBuf, o values){ switch code := r.int32(); code { case0: // OK case3: w := cn.writeBuf('p') w.string(o["password"]) cn.send(w) ... case7:// GSSAPI, startup ... w := cn.writeBuf('p') w.bytes(token) cn.send(w) ... } }

  1. 服务端收到认证回应后,进行用户的身份认证。
  2. 服务端认证完成后,给客户端发送认证结果。成功,发送 AuthenticationOk(‘R’),authType 为 0;失败,则发送 ErrorResponse(‘E’),连接过程结束。
func(c *conn) handleAuthentication( ctx context.Context, insecure bool, ie *sql.InternalExecutor, auth *hba.Conf, execCfg *sql.ExecutorConfig, )(authErr error){ ... c.msgBuilder.initMsg(pgwirebase.ServerMsgAuth) c.msgBuilder.putInt32(authOK) return c.msgBuilder.finishMsg(c.conn) }

  1. 服务端认证完成后,将发送多条参数信息 ParameterStatus(‘S’),包括 server_version, client_encoding 和 DateStyle 等参数。每个参数,都会发送一条 ParameterStatus 消息。???????
func(c *conn) serveImpl( ctx context.Context, draining func()bool, sqlServer *sql.Server, reserved mon.BoundAccount, stopper *stop.Stopper, )error{ ... sendStatusParam:=func(param, value string)error{ c.msgBuilder.initMsg(pgwirebase.ServerMsgParameterStatus) c.msgBuilder.writeTerminatedString(param) c.msgBuilder.writeTerminatedString(value) return c.msgBuilder.finishMsg(c.conn) } ... for _, param :=range statusReportParams { value := connHandler.GetStatusParam(ctx, param) if err := sendStatusParam(param, value); err !=nil{ return err } } } ... }

  1. 服务端发送 ReadyForQuery(‘Z’),表示一切准备就绪,通知客户端可以发送 SQL 请求了。???????
func(c *conn) serveImpl( ctx context.Context, draining func()bool, sqlServer *sql.Server, reserved mon.BoundAccount, stopper *stop.Stopper, )error{ ... // An initial readyForQuery message is part of the handshake. c.msgBuilder.initMsg(pgwirebase.ServerMsgReady) c.msgBuilder.writeByte(byte(sql.IdleTxnBlock)) if err := c.msgBuilder.finishMsg(c.conn); err !=nil{ return err } ... }

【浪潮云溪分布式数据库协议代码解析(1)】???????
至此,客户端与服务端之间,已经成功建立起连接,用户可以执行后续操作了。

    推荐阅读