postgresql|postgresql startup处理
内容简介
本文主要介绍postgresql 的startup处理流程。startup发生在一个client连接server端时,server与client建立连接,并创建对应的backend process。后续就可以进行正常的消息交互了。
PostgreSQL 通信协议包括两个阶段: startup 阶段和常规 normal 阶段。
- startup 阶段,客户端尝试创建连接并发送授权信息,如果一切正常,服务端会反馈状态信息,连接成功创建,随后进入 normal 阶段。 normal 阶段,客户端发送请求至服务端,服务端执行命令并将结果返回给客户端。客户端请求结束后,可以主动发送消息断开连接。
- 【postgresql|postgresql startup处理】normal 阶段,客户端可以通过两种 "子协议" 来发送请求,分别是 simpel query 和 extened query。
- simple query:客户端发送字符串文本请求,后端收到后立即处理并返回结果。
- extened query:发送请求的过程被分为若干步骤,通常包括 Parse,Bind 和 Execute。Extended Query 协议将以上 Simple Query 的处理流程分为若干步骤,每一步都由单独的服务端消息进行确认。该协议可以使用服务端的 perpared-statement 功能,即先发送一条参数化 SQL,服务端收到 SQL(Statement)之后对其进行解析、重写并保存,这里保存的 Statement 也就是所谓 Prepared-statement,可以被复用;执行 SQL 时,直接获取事先保存的 Prepared-statement 生成计划并执行,避免对同类型 SQL 重复解析和重写。
pg版本 12.4
startup流程
文章图片
startup消息类型 startup阶段server端发送给client端的消息类型如下。详细可参照postgresql通信协议
case 'R':/* Authentication Request */
case 'S':/* parameter status */
case 'K':/* secret key data from the backend */
case 'Z':/* backend is ready for new query */
client发给server端的消息类型
case 'p':/* password */
函数调用关系图
文章图片
startup简要分析 参照上面的"startup流程图",client端首先向server端发送startup package。在server端postgres server process处理并创建对应的backend process。详细参考"postgresql启动分析"
startup package结构
文章图片
本文线索 本文会结合一个线索来梳理流程。在startup package中client端可以包含一个application_name,我们会分析这个参数如何在server端生效。
BackendStartup分析 BackendStartup是postmaster accept client端后的入口函数,负责fork backend process。为啥是入口呢,参考"postgresql启动分析"
static int BackendStartup(Port *port)
// 创建backend structure
Backend*bn = (Backend *) malloc(sizeof(Backend));
pid = fork_process();
if (pid == 0)/* child */
{
free(bn);
/* Detangle from postmaster */
InitPostmasterChild();
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
/* Perform additional initialization and collect startup packet */
BackendInitialize(port);
/* And run the backend */
BackendRun(port);
}
BackendInitialize分析 BackendInitialize负责backend中进一步的初始化,并处理startup package。
注意全局变量MyProcPort赋值为port,后面会使用。
static void BackendInitialize(Port *port)
// MyProcPort全局变量
MyProcPort = port;
// initialize libpq to talk to client
pq_init();
// 设置处理结果的destination
whereToSendOutput = DestRemote;
// timeout初始化
InitializeTimeouts// 从socket中获取client的host,port,后面会存入
// port->remote_host
// port->remote_port
pg_getnameinfo_all(&port->raddr.addr,remote_host,remote_port)// 设置一个timer,用于startup的超时处理
// 参照"startup流程",startup可能会有authentication过程,故设置超时处理
RegisterTimeout(STARTUP_PACKET_TIMEOUT, StartupPacketTimeoutHandler);
enable_timeout_after(STARTUP_PACKET_TIMEOUT, AuthenticationTimeout * 1000);
// 处理startup package
// !!! 下面会单独分析ProcessStartupPacket
ProcessStartupPacket(port, false, false)// 取消timer
disable_timeout(STARTUP_PACKET_TIMEOUT, false);
// share memory处理
check_on_shmem_exit_lists_are_empty// process name设置
initStringInfo(&ps_data)
appendStringInfo(&ps_data, "%s", port->remote_host);
appendStringInfo(&ps_data, "(%s)", port->remote_port);
// 例如ps中显示的backend进程信息: postgres mydb [local] idle
init_ps_display(ps_data.data);
ProcessStartupPacket分析 下面单独分析BackendInitialize中的ProcessStartupPacket。
最终application_name被保存到port->guc_options和port->application_name中。
static int ProcessStartupPacket(Port *port, bool ssl_done, bool gss_done)
{
// 读取startup package中的length(前四个byte)
pq_getbytes((char *) &len, 1)
pq_getbytes(((char *) &len) + 1, 3) len = pg_ntoh32(len);
len -= 4;
// 读取数据
pq_getbytes(buf, len)// 解析protocol version
port->proto = proto = pg_ntoh32(*((ProtocolVersion *) buf));
// protocol version可能是一个cancel请求
if (proto == CANCEL_REQUEST_CODE)
{
processCancelRequest(port, buf);
/* Not really an error, but we don't want to proceed further */
return STATUS_ERROR;
}// SSL or GSSAPI negotiation request
// 用于client申请ssl加密安全channel
// 这不是必须的,取决于client
// 编译时可以config server是否支持ssl和gssapi
//USE_OPENSSL, ENABLE_GSS
// backend会先通知client是否支持ssl/gssapi
// 再递归调用ProcessStartupPacket完成negotiation
if (proto == NEGOTIATE_SSL_CODE && !ssl_done)
// SSLok = 'N': server不支持ssl
// SSLok = 'S': server支持ssl
if (send(port->sock, &SSLok, 1, 0) != 1)
return ProcessStartupPacket(port, true, SSLok == 'S');
......else if (proto == NEGOTIATE_GSS_CODE && !gss_done)
...// ssl negotiation完成后
// 切换到TopMemoryContext
// !!!! "TODO": further parse
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
port->guc_options = NIL;
while (offset < len)
{
if (strcmp(nameptr, "database") == 0)
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
......
else
{
port->guc_options = lappend(port->guc_options,pstrdup(nameptr));
port->guc_options = lappend(port->guc_options,pstrdup(valptr));
// 这里单独把application_name保存到port中,是保证其可以在authention中打log用
// 因为那时候GUC还没有setup,不能用GUC
if (strcmp(nameptr, "application_name") == 0)
{
char*tmp_app_name = pstrdup(valptr);
port->application_name = tmp_app_name;
}......
// 切换到TopMemoryContext
MemoryContextSwitchTo(oldcontext);
BackendRun分析
static void BackendRun(Port *port)
// 为PostgresMain准备参数,是从postmaster中的-o选项获取option(保存于ExtraOptions)
// 具体可以参考https://segmentfault.com/a/1190000039193129
char**av;
intac = 0;
av = (char **) MemoryContextAlloc(TopMemoryContext,
maxac * sizeof(char *));
av[ac++] = "postgres";
pg_split_opts(av, &ac, ExtraOptions);
MemoryContextSwitchTo(TopMemoryContext);
PostgresMain(ac, av, port->database_name, port->user_name);
PostgresMain分析 PostgresMain是所有backend process的主入口函数。
void PostgresMain(int argc, char *argv[],
const char *dbname,
const char *username)
// 有一种standalone的模式,以及作为wal sender。我们只分析作为client的backend process的情况。// 是否向client发送ready for query消息, 默认true,表示startup阶段结束会发送。其它阶段比如每次simple query结束也会发送。
// 详见"startup消息类型"
// case 'Z':/* backend is ready for new query */
volatile bool send_ready_for_query = true;
// 参数解析,会通过SetConfigOption设置GUC,这里是第一次调用,模式为PGC_POSTMASTER,是从postmaster中的-o选项获取option(保存于ExtraOptions)。后面还会有第二次调用,对应从client端获取的option,模式为PGC_BACKEND,PGC_SU_BACKEND。
// "TODO": SetConfigOption 设置GUC解析(特别是ctx区别)
process_postgres_switches(argc, argv, PGC_POSTMASTER, &dbname);
// signal重新注册
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, StatementCancelHandler);
/* cancel current query */
pqsignal(SIGTERM, die);
pqsignal(SIGQUIT, quickdie);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGCHLD, SIG_DFL);
// per process的一些init,Semaphore...
// !!! TODO: detailed parse
InitProcess();
// 基本的初始化,bufferpool,timer, portal manager, GUC, process setting...
// 特别的,application_name设置到GUC也在这里
// 下面会单独介绍
InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL, false);
// 删除PostmasterContext,backend不再需要访问postmaster的context了
if (PostmasterContext)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
}// 向client report parameter status
// 只发送GUC中GUC_REPORT类型的参数(auto-report changes to client)
// 参照"startup消息类型"
// case 'S':/* parameter status */
// !!!TODO:参数举例
BeginReportingGUCOptions();
process_session_preload_libraries();
// 向client发送 secret key数据
// 参照"startup消息类型"
// case 'K':/* secret key data from the backend */
if (whereToSendOutput == DestRemote)
{
StringInfoData buf;
pq_beginmessage(&buf, 'K');
pq_sendint32(&buf, (int32) MyProcPid);
pq_sendint32(&buf, (int32) MyCancelKey);
pq_endmessage(&buf);
}// 创建MessageContext,这是给消息处理用,每一轮循环会重置
MessageContext = AllocSetContextCreate(TopMemoryContext, "MessageContext");
// 创建RowDescriptionContext,用于RowDescription messages
row_description_context = AllocSetContextCreate(TopMemoryContext, "RowDescriptionContext",MemoryContextSwitchTo(row_description_context);
initStringInfo(&row_description_buf);
MemoryContextSwitchTo(TopMemoryContext);
// 异常恢复的入口
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
......
}// 主循环
for (;
;
)
{
// 初始化
doing_extended_query_message = false;
// 上面提到的MessageContext重置
MemoryContextSwitchTo(MessageContext);
MemoryContextResetAndDeleteChildren(MessageContext);
initStringInfo(&input_message);
InvalidateCatalogSnapshotConditionally();
if (send_ready_for_query)
{
if (IsAbortedTransactionBlockState())
else if (IsTransactionOrTransactionBlock())
else {
ProcessCompletedNotifies();
pgstat_report_stat(false);
set_ps_display("idle");
pgstat_report_activity(STATE_IDLE, NULL);
}// 详见"startup消息类型"
// case 'Z':/* backend is ready for new query */
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
}// 到这里startup阶段就结束了,接下来是normal阶段的消息处理
// 下面以simple query为例// 读取command
firstchar = ReadCommand(&input_message);
// 分情况处理command
switch (firstchar)
{
// 'Q':/* simple query */
case 'Q':/* simple query */
{
const char *query_string;
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
if (am_walsender)
{
if (!exec_replication_command(query_string))
exec_simple_query(query_string);
}
else
// 执行simple query
exec_simple_query(query_string);
send_ready_for_query = true;
}
break;
// 其它类型消息处理, bind, parse......
InitPostgres介绍 InitPostgres在PostgresMain中调用,进行基本的初始化,bufferpool,timer, portal manager, GUC, process setting...。
特别的,application_name设置到GUC也在这里。authentication也发生在这里。
void
InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname, bool override_allow_connections)
{
boolbootstrap = IsBootstrapProcessingMode();
// InitProcess中初始化的process信息在这里用到了
InitProcessPhase2();
// shared-invalidation manager相关
// !!! TODO: detailed parse
MyBackendId = InvalidBackendId;
SharedInvalBackendInit(false);
ProcSignalInit(MyBackendId);
// timer 注册
if (!bootstrap)
{
RegisterTimeout(DEADLOCK_TIMEOUT, CheckDeadLockAlert);
RegisterTimeout(STATEMENT_TIMEOUT, StatementTimeoutHandler);
RegisterTimeout(LOCK_TIMEOUT, LockTimeoutHandler);
RegisterTimeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
IdleInTransactionSessionTimeoutHandler);
}// init buffer pool
InitBufferPoolBackend();
// xlog recovery检查
// !!!TODO: detailed parse
(void) RecoveryInProgress();
//初始化relation cache和system catalog caches
RelationCacheInitialize();
InitCatalogCache();
InitPlanCache();
// 初始化portal manager
// !!!TODO: detailed parse
EnablePortalManager();
// stats collection初始化
pgstat_initialize();
RelationCacheInitializePhase2();
// 注册process-exit callback,用来进行pre-shutdown cleanup
before_shmem_exit(ShutdownPostgres, 0);
// start 一个transaction,获取snapshot
// 只用于后面的各种表的访问,会在本函数结尾end
// !!! TODO: detailed parse
if (!bootstrap){
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
XactIsoLevel = XACT_READ_COMMITTED;
(void) GetTransactionSnapshot();
}// authentication
// !!!! TODO: detailed parse
else
{
/* normal multiuser case */
Assert(MyProcPort != NULL);
PerformAuthentication(MyProcPort);
InitializeSessionUserId(username, useroid);
am_superuser = superuser();
}// 从pg_database表中获取client指定连接的database的oid,table space oid,存到MyDatabaseId, MyDatabaseTableSpace。
else if (in_dbname != NULL)
{
HeapTupletuple;
Form_pg_database dbform;
tuple = GetDatabaseTuple(in_dbname);
if (!HeapTupleIsValid(tuple))
ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", in_dbname)));
dbform = (Form_pg_database) GETSTRUCT(tuple);
MyDatabaseId = dbform->oid;
MyDatabaseTableSpace = dbform->dattablespace;
/* take database name from the caller, just for paranoia */
strlcpy(dbname, in_dbname, sizeof(dbname));
}// 获得client连接database的读写锁
LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, RowExclusiveLock);
//
MyProc->databaseId = MyDatabaseId;
// 设置当前catalog snapshot为invalid
// !!!TODO:detailed parse
InvalidateCatalogSnapshot();
// 获取database path
// 每个数据库都有对应的存储目录,例如下面base为table space,123000为这个db的oid,可以通过pg_database查询 - select oid, datname from pg_database;
// /usr/local/pgsql/data4/base/123000/
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
// 检查数据库目录
if (access(fullpath, F_OK) == -1)
ValidatePgVersion(fullpath);
// 记录数据库目录到全局变量DatabasePath
SetDatabasePath(fullpath);
RelationCacheInitializePhase3();
initialize_acl();
// 读取pg_database,并设置GUC: server_encoding, client_encoding, lc_collate, lc_ctype
CheckMyDatabase(dbname, am_superuser, override_allow_connections);
// 会把port->guc_options中的option设置到GUC中
// client发送的application_name也被设置到GUC中了
// 我们的线索也到此为止了,回忆一下:
// MyProcPort是在BackendInitialize中被设置为port
// 而client发送的application_name是在BackendInitialize调用的ProcessStartupPacket中被记录到port->guc_options。
if (MyProcPort != NULL)
process_startup_options(MyProcPort, am_superuser);
// pg_db_role_setting中load setting并且设置到数据库
// !!!!TODO: detailed parse
process_settings(MyDatabaseId, GetSessionUserId());
// search path设置
InitializeSearchPath();
// client encoding设置
InitializeClientEncoding();
// session 设置
InitializeSession();
// pgstat start
pgstat_bestart();
// 关闭上面开始的transaction
CommitTransactionCommand();
}
PerformAuthentication分析 InitPostgres中调用了PerformAuthentication进行authentication。此操作对应"startup流程"部分的authentication request/authentication ok。
static void
PerformAuthentication(Port *port)
{
ClientAuthInProgress = true;
// 设置一个60s的超时,防止hang住,会按照STATEMENT_TIMEOUT处理
enable_timeout_after(STATEMENT_TIMEOUT, AuthenticationTimeout * 1000);
// 设置process信息
set_ps_display("authentication");
// authentication主函数,如果遇到错误可能不return
// 下面单独分析它
ClientAuthentication(port);
disable_timeout(STATEMENT_TIMEOUT, false);
// 设置process信息
set_ps_display("startup");
ClientAuthInProgress = false;
}
ClientAuthentication分析 ClientAuthentication是authentication主函数。
postgresql支持下面的authentication方法。
typedef enum UserAuth
{
uaReject,
uaImplicitReject,/* Not a user-visible option */
uaTrust,
uaIdent,
uaPassword,
uaMD5,
uaSCRAM,
uaGSS,
uaSSPI,
uaPAM,
uaBSD,
uaLDAP,
uaCert,
uaRADIUS,
uaPeer
} UserAuth;
postgresql向client发送的authentication code。
/* These are the authentication request codes sent by the backend. */#define AUTH_REQ_OK0/* User is authenticated*/
#define AUTH_REQ_KRB41/* Kerberos V4. Not supported any more. */
#define AUTH_REQ_KRB52/* Kerberos V5. Not supported any more. */
#define AUTH_REQ_PASSWORD3/* Password */
#define AUTH_REQ_CRYPT4/* crypt password. Not supported any more. */
#define AUTH_REQ_MD55/* md5 password */
#define AUTH_REQ_SCM_CREDS6/* transfer SCM credentials */
#define AUTH_REQ_GSS7/* GSSAPI without wrap() */
#define AUTH_REQ_GSS_CONT8/* Continue GSS exchanges */
#define AUTH_REQ_SSPI9/* SSPI negotiate without wrap() */
#define AUTH_REQ_SASL10/* Begin SASL authentication */
#define AUTH_REQ_SASL_CONT 11/* Continue SASL authentication */
#define AUTH_REQ_SASL_FIN12/* Final SASL message */typedef uint32 AuthRequest;
void
ClientAuthentication(Port *port)
{
// 读取pg_hba.conf,查找与本client符合的rule
// 记录到port->hba
hba_getauthmethod(port);
// -- parse hba_getauthmethod
check_hba(port);
roleid = get_role_oid(port->user_name, true);
// parsed_hba_lines是在load_hba中解析过的,详见
// https://segmentfault.com/a/1190000039193129
foreach(line, parsed_hba_lines){
check_hostname
check_ip
check_same_host_or_net
check_db
check_role
... CHECK_FOR_INTERRUPTS();
// 针对不同搞的authentication method进行处理,下面以常见的password方式进行解析
switch (port->hba->auth_method)
{
case uaPassword:
status = CheckPasswordAuth(port, &logdetail);
// -- parse CheckPasswordAuth
// 向client发送authentication request,code 为AUTH_REQ_PASSWORD
sendAuthRequest(port, AUTH_REQ_PASSWORD, NULL, 0);
// -- parse sendAuthRequest
// 参照"startup消息类型"
// case 'R':/* Authentication Request */
pq_beginmessage(&buf, 'R');
pq_sendint32(&buf, (int32) areq);
pq_endmessage(&buf);
passwd = recv_password_packet(port);
// -- parse recv_password_packet
// 参照"startup消息类型"
// case 'p':/* password */
pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')initStringInfo(&buf);
if (pq_getmessage(&buf, 0))// 获取role对应的password
// 本地存储的是shadow_pass(password的hash)
shadow_pass = get_role_password(port->user_name, logdetail);
// client发送的password进行比较
result = plain_crypt_verify(port->user_name, shadow_pass, passwd, logdetail);
break;
// 根据authentication的结果
// 向client发送结果,消息类型为'R'。 如成功,则为authenticon ok(0)
// case 'R':/* Authentication Request */
if (status == STATUS_OK)
sendAuthRequest(port, AUTH_REQ_OK, NULL, 0);
else
auth_failed(port, status, logdetail);
}
结语 本文分析了startup的主要流程,并且以application_name为线索,帮助更连贯的理解整体流程。在这个过程中,对backend process有了更深入理解,其process管理,signal,semaphore,GUC,与client间的通信模型等等。还有很多地方没有进行详细分析(特别是mark TODO的),后续再做进一步分析。
Q&A 暂无
遗留问题
- authentication部分,理论上client应该可以直接提供password,流程上应该可以省去server到client的authentication request这部分处理暂时没有发现。
- 后续可以再抓包展示一下startup过程中的消息流
推荐阅读
- RGB图片处理(1)——RGB概念理解
- 使用CompletableFuture进行多任务并行处理
- h5中使用javascript在客户端对图片进行压缩和尺寸处理,附ts代码
- c++|基于C++的OpenCV(八)图像处理
- go处理文件上传
- 机器学习算法|图像处理中的经典机器学习方法
- 1|C语言编程>第二十周 ③ 请补充fun函数,该函数的功能是(把字符串s中的字符按字符的ASCII码升序排列,处理后的字符串仍然保存在原串中,字符串及其
- 数据库|《SQL必知必会》(7,8)(创建计算字段、使用数据处理函数)
- 数据处理|S-G滤波之包络滤波
- 中智车联(用|中智车联:用 TDengine 高效处理车辆运营可视化管理)