Rust开发postgres扩展

前言
Rust 语言是一门通用系统级编程语言,无GC且能保证内存安全、并发安全和高性能而著称。自2008年开始由 Graydon Hoare 私人研发,2009年得到 Mozilla 赞助,2010年首次发布 0.1.0 版本,用于Servo 引擎的研发,于 2015年5月15号发布 1.0 版本。 自发布以来,截止到2021 年的今天,经历六年的发展,Rust 得到稳步上升,已逐渐趋于成熟稳定。 至 2016 年开始,截止到 2021年,Rust 连续五年成为 StackOverflow 语言榜上最受欢迎的语言[1]。 2021年 2 月 9 号,Rust 基金会宣布成立。华为、AWS、Google、微软、Mozilla、Facebook 等科技行业领军巨头加入 Rust 基金会,成为白金成员,以致力于在全球范围内推广和发展 Rust 语言。 ?
官方网如此介绍 Rust : 一门赋予每个人 构建可靠且高效软件能力的语言。 Rust 语言有三大优势值得大家关注:

  1. 高性能。Rust 速度惊人且内存利用率极高。由于没有运行时和垃圾回收,它能够胜任对性能要求特别高的服务,可以在嵌入式设备上运行,还能轻松和其他语言集成。
  2. 可靠性。Rust 丰富的类型系统和所有权模型保证了内存安全和线程安全,让您在编译期就能够消除各种各样的错误。
  3. 生产力。Rust 拥有出色的文档、友好的编译器和清晰的错误提示信息, 还集成了一流的工具——包管理器和构建工具, 智能地自动补全和类型检验的多编辑器支持, 以及自动格式化代码等等。
Rust 和 C 都是硬件直接抽象
Rust 和 C 都是直接对硬件的抽象,都可看作一种「可移植汇编程序」。 Rust 和 C 都能控制数据结构的内存布局、整数大小、栈与堆内存分配、指针间接寻址等,并且一般都能翻译成可理解的机器代码,编译器很少插入 "魔法"。 即便 Rust 比 C 有更高层次的结构,如迭代器、特质(trait)和智能指针,它们也被设计为可预测地优化为简单的机器代码(又称 "零成本抽象")。 Rust的类型的内存布局很简单,例如,可增长的字符串String 和 Vec 正好是{byte*, capacity, length}。Rust没有任何像 Cpp里的 移动 或 复制构造函数 这样的概念,所以对象的传递保证不会比传递指针或 memcpy 更复杂。
总的来说,Rust有媲美C的高性能,同时又具有高效的开发生产力,同时通过FFI可以高效的和其他语言如C进行混合编程或互相调用。本篇文章主要介绍如何利用Rust开发postgres extension。 ?
部署开发环境
配置RUST开发环境
参考 https://www.rust-lang.org/too... IDE 推荐 https://code.visualstudio.com/ + https://rls.booyaa.wtf/
配置postgres开发环境
ubuntu
sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc

Red hat
sudo yum install -y bison-devel readline-devel zlib-devel openssl-devel wget sudo yum groupinstall -y 'Development Tools'

cargo pgx 子命令安装
cargo install cargo-pgx
接下来执行命令cargo pgx init。
Discovered Postgres v13.3, v12.7, v11.12, v10.17 Downloading Postgres v12.7 from https://ftp.postgresql.org/pub/source/v12.7/postgresql-12.7.tar.bz2 Downloading Postgres v13.3 from https://ftp.postgresql.org/pub/source/v13.3/postgresql-13.3.tar.bz2 Downloading Postgres v10.17 from https://ftp.postgresql.org/pub/source/v10.17/postgresql-10.17.tar.bz2 Downloading Postgres v11.12 from https://ftp.postgresql.org/pub/source/v11.12/postgresql-11.12.tar.bz2 Removing /home/wdy/.pgx/10.17 Untarring Postgres v10.17 to /home/wdy/.pgx/10.17 Configuring Postgres v10.17 Removing /home/wdy/.pgx/11.12 Untarring Postgres v11.12 to /home/wdy/.pgx/11.12 Untarring Postgres v12.7 to /home/wdy/.pgx/12.7 Configuring Postgres v11.12 Removing /home/wdy/.pgx/13.3 Untarring Postgres v13.3 to /home/wdy/.pgx/13.3 Configuring Postgres v12.7 Compiling Postgres v10.17 Configuring Postgres v13.3 Compiling Postgres v11.12 Compiling Postgres v12.7 Compiling Postgres v13.3 Installing Postgres v10.17 to /home/wdy/.pgx/10.17/pgx-install Installing Postgres v11.12 to /home/wdy/.pgx/11.12/pgx-install Installing Postgres v12.7 to /home/wdy/.pgx/12.7/pgx-install Installing Postgres v13.3 to /home/wdy/.pgx/13.3/pgx-install Validating /home/wdy/.pgx/10.17/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-10 Validating /home/wdy/.pgx/11.12/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-11 Validating /home/wdy/.pgx/12.7/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-12 Validating /home/wdy/.pgx/13.3/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-13

这个命令会下载版本v10,v11,v12,v13的postgres然后编译到目录~/.pgx/中。这个下载步骤是必须的,因为后续pgx会为每中版本的postgres的header文件生成对应的Rust bindings,以及后续pgx 的测试框架中也会用到。 ?
开发一个简单的extension
创建一个extension项目
使用下面命令创建一个名为my_extension的项目
cargo pgx new my_extension
命令执行后,会生成如下一个目录结构
├── Cargo.toml ├── my_extension.control ├── sql │├── lib.generated.sql │└── load-order.txt └── src └── lib.rs

sql/lib.generated.sql 内容如下
CREATE OR REPLACE FUNCTION "hello_my_extension"() RETURNS text STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'hello_my_extension_wrapper';

src/lib.rs内容如下
use pgx::*; pg_module_magic!(); #[pg_extern] fn hello_my_extension() -> &'static str { "Hello, my_extension" }#[cfg(any(test, feature = "pg_test"))] mod tests { use pgx::*; #[pg_test] fn test_hello_my_extension() { assert_eq!("Hello, my_extension", crate::hello_my_extension()); }}#[cfg(test)] pub mod pg_test { pub fn setup(_options: Vec<&str>) { // perform one-off initialization when the pg_test framework starts }pub fn postgresql_conf_options() -> Vec<&'static str> { // return any postgresql.conf settings that are required for your tests vec![] } }

可以看到pgx已经默认给出了最简单的扩展实现。 #[pg_extern] 宏所修饰的函数就是我们要实现的extension函数,mod tests , pub mod pg_test 是pgx已经为我们写好了的测试模块,用于编写相关测试代码。 ?
pgx默认已经给我们写好了名为 hello_my_extension 的extension,功能很简单,就是返回 "Hello, my_extension" 字符串 ?
运行extension
cd my_extension cargo pgx run pg13# or pg10 or pg11 or pg12

使用cargo pgx run 后跟参数pg13或pg10或pg11或pg12,对应不同的postgres版本,cargo pgx run会把extension编译为一个 .so 共享库文件,复制到对应版本的 ~/.pgx/ 目录中,然后启动Postgres实例,通过psql连接到和extension同名的数据库上。编译完成后,开发者就会处于psql的shell界面中,可以调用extension进行测试了。 ?
这里我们执行 cargo pgx run pg12 得到输出如下:
$ cargo pgx run pg12 building extension with features `pg12` "cargo" "build" "--features" "pg12" "--no-default-features" Updating crates.io index Downloaded generic-array v0.14.4 Downloaded humantime v2.1.0 Downloaded lazycell v1.3.0 Downloaded shlex v1.0.0 Downloaded stable_deref_trait v1.2.0 Downloaded termcolor v1.1.2 Downloaded typenum v1.13.0 Downloaded time-macros v0.1.1 Downloaded which v3.1.1 Downloaded atomic-traits v0.2.0 Downloaded seahash v4.1.0 Downloaded uuid v0.8.2 Downloaded as-slice v0.1.5 Downloaded pgx v0.1.21 Downloaded peeking_take_while v0.1.2 Downloaded proc-macro-hack v0.5.19 Downloaded rustc-hash v1.1.0 Downloaded serde_cbor v0.11.1 Downloaded time-macros-impl v0.1.2 Downloaded bindgen v0.58.1 Downloaded cexpr v0.4.0 Downloaded env_logger v0.8.4 Downloaded standback v0.2.17 Downloaded generic-array v0.12.4 Downloaded getrandom v0.2.3 Downloaded clang-sys v1.2.0 Downloaded glob v0.3.0 Downloaded const_fn v0.4.8 Downloaded generic-array v0.13.3 Downloaded half v1.7.1 Downloaded hash32 v0.1.1 Downloaded enum-primitive-derive v0.2.1 Downloaded heapless v0.6.1 Downloaded build-deps v0.1.4 Downloaded libloading v0.7.0 Downloaded nom v5.1.2 Downloaded time v0.2.27 Downloaded pgx-macros v0.1.21 Downloaded pgx-pg-sys v0.1.21 Downloaded 39 crates (2.0 MB) in 2.74s Compiling version_check v0.9.3 Compiling libc v0.2.98 Compiling cfg-if v1.0.0 Compiling autocfg v1.0.1 Compiling proc-macro2 v1.0.27 Compiling unicode-xid v0.2.2 Compiling syn v1.0.73 Compiling memchr v2.4.0 Compiling lazy_static v1.4.0 Compiling serde_derive v1.0.126 Compiling tinyvec_macros v0.1.0 Compiling getrandom v0.1.16 Compiling matches v0.1.8 Compiling serde v1.0.126 Compiling log v0.4.14 Compiling crossbeam-utils v0.8.5 Compiling byteorder v1.4.3 Compiling glob v0.3.0 Compiling percent-encoding v2.1.0 Compiling crc32fast v1.2.1 Compiling ryu v1.0.5 Compiling adler v1.0.2 Compiling ppv-lite86 v0.2.10 Compiling crossbeam-epoch v0.9.5 Compiling regex-syntax v0.6.25 Compiling bitflags v1.2.1 Compiling mime v0.3.16 Compiling typenum v1.13.0 Compiling rayon-core v1.9.1 Compiling scopeguard v1.1.0 Compiling serde_json v1.0.64 Compiling unicode-width v0.1.8 Compiling itoa v0.4.7 Compiling base64 v0.11.0 Compiling httpdate v0.3.2 Compiling xml-rs v0.8.3 Compiling proc-macro-hack v0.5.19 Compiling humantime v2.1.0 Compiling vec_map v0.8.2 Compiling semver-parser v0.7.0 Compiling ansi_term v0.11.0 Compiling unescape v0.1.0 Compiling bindgen v0.58.1 Compiling termcolor v1.1.2 Compiling strsim v0.8.0 Compiling either v1.6.1 Compiling shlex v1.0.0 Compiling lazycell v1.3.0 Compiling peeking_take_while v0.1.2 Compiling rustc-hash v1.1.0 Compiling const_fn v0.4.8 Compiling stable_deref_trait v1.2.0 Compiling heapless v0.6.1 Compiling half v1.7.1 Compiling cfg-if v0.1.10 Compiling once_cell v1.8.0 Compiling seahash v4.1.0 Compiling libloading v0.7.0 Compiling tinyvec v1.2.0 Compiling unicode-bidi v0.3.5 Compiling unicase v2.6.0 Compiling nom v5.1.2 Compiling standback v0.2.17 Compiling generic-array v0.14.4 Compiling time v0.2.27 Compiling memoffset v0.6.4 Compiling miniz_oxide v0.4.4 Compiling rayon v1.5.1 Compiling num-traits v0.2.14 Compiling form_urlencoded v1.0.1 Compiling hash32 v0.1.1 Compiling build-deps v0.1.4 Compiling clang-sys v1.2.0 Compiling textwrap v0.11.0 Compiling semver v0.9.0 Compiling unicode-normalization v0.1.19 Compiling rustc_version v0.2.3 Compiling aho-corasick v0.7.18 Compiling quote v1.0.9 Compiling atty v0.2.14 Compiling dirs-sys v0.3.6 Compiling socks v0.3.3 Compiling num_cpus v1.13.0 Compiling which v3.1.1 Compiling getrandom v0.2.3 Compiling crossbeam-channel v0.5.1 Compiling idna v0.2.3 Compiling mime_guess v2.0.3 Compiling atomic-traits v0.2.0 Compiling generic-array v0.13.3 Compiling generic-array v0.12.4 Compiling colored v2.0.0 Compiling clap v2.33.3 Compiling rand_core v0.5.1 Compiling dirs v3.0.2 Compiling regex v1.5.4 Compiling uuid v0.8.2 Compiling flate2 v1.0.20 Compiling url v2.2.2 Compiling cexpr v0.4.0 Compiling as-slice v0.1.5 Compiling rand_chacha v0.2.2 Compiling crossbeam-deque v0.8.0 Compiling env_proxy v0.4.1 Compiling env_logger v0.8.4 Compiling rand v0.7.3 Compiling rttp_client v0.1.0 Compiling thiserror-impl v1.0.26 Compiling time-macros-impl v0.1.2 Compiling enum-primitive-derive v0.2.1 Compiling time-macros v0.1.1 Compiling thiserror v1.0.26 Compiling serde-xml-rs v0.4.1 Compiling toml v0.5.8 Compiling serde_cbor v0.11.1 Compiling pgx-utils v0.1.21 Compiling pgx-pg-sys v0.1.21 Compiling pgx-macros v0.1.21 Compiling pgx v0.1.21 Compiling my_extension v0.0.0 (/home/wdy/gitlab/valid-my-idea/rust-dig/pgextionsion/my_extension) Finished dev [unoptimized + debuginfo] target(s) in 1m 28sinstalling extension Copying control file to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension.control` Copying shared library to `/home/wdy/.pgx/12.7/pgx-install/lib/postgresql/my_extension.so` Writing extension schema to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension--1.0.sql` Finished installing my_extension Starting Postgres v12 on port 28812 Creating database my_extension psql (12.7) Type "help" for help.my_extension=#

可以看到最后进入了名为my_extension的psql界面中。 接下来创建EXTENSION并调用,可以看到正常输出字符串
my_extension=# create extension my_extension; CREATE EXTENSION my_extension=# select hello_my_extension(); hello_my_extension --------------------- Hello, my_extension (1 row)

Array
Rust的Vec类型对应 postgrs中的ARRAY[]:: ,如果某个扩展函数要返回多个相同类型的value,可以使用Vec进行返回,如下例中的 static_names 和 static_names_2d,最终在psql终端调用后返回的是单行单列。 如果某个扩展返回的是一个Iterator,那么返回的是一个多行单列数据,参见下面例子中的 static_names_iter 和 static_names_2d_iter。 ?
代码
#[pg_extern] fn static_names() -> Vec> { vec![Some("King"), Some("Eastern"), None, Some("Sun")] }#[pg_extern] fn static_names_iter() -> impl std::iter::Iterator> { vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")].into_iter() }#[pg_extern] fn static_names_2d() -> Vec>> { vec![ vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")], vec![Some("Eric"), Some("David")], vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")], ] }#[pg_extern] fn static_names_2d_iter() -> impl std::iter::Iterator>> { vec![ vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")], vec![Some("Eric"), Some("David")], vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")], ] .into_iter() }

验证
执行 cargo pgx run pg13
installing extension Copying control file to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension.control` Copying shared library to `/home/wdy/.pgx/13.3/pgx-install/lib/postgresql/my_extension.so` Writing extension schema to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension--1.0.sql` Finished installing my_extension Starting Postgres v13 on port 28813 Re-using existing database my_extension psql (13.3) Type "help" for help.my_extension=# drop extension my_extension; create extension my_extension; DROP EXTENSION CREATE EXTENSION my_extension=# select static_names(); static_names ----------------------------- {King,Eastern,NULL,Sun} (1 row)my_extension=# select static_names_iter(); static_names_iter ------------------- Brandy Sally Anchovy (4 rows)my_extension=# select static_names_2d(); static_names_2d ------------------------------------------------------------------------------------- {"{Brandy,Sally,NULL,Anchovy}","{Eric,David}","{ZomboDB,PostgreSQL,Elasticsearch}"} (1 row)my_extension=# select static_names_2d_iter(); static_names_2d_iter ------------------------------------ {Brandy,Sally,NULL,Anchovy} {Eric,David} {ZomboDB,PostgreSQL,Elasticsearch} (3 rows)

完整代码
完整代码
AGGREGATE扩展
Postgres可以通过 CREATE AGGREGATE 定义一个新的聚集函数。一个简单的聚集函数包含一个或两个普通的函数:
  1. 状态转移函数 sfunc
  2. 可选的最终计算函数 ffunc
sfunc( internal-state, next-data-values ) ---> next-internal-state ffunc( internal-state ) ---> aggregate-value

  1. 可能还需要定义聚集函数内部状态的数据类型 stype
  2. 内部函数状态初始值 initcond ,是一个类型text的值。
那么我们只需参考上面的样例 开发对应的 sfunc, ffunc ,stype,然后再使用 CREATE AGGREGATE 创建聚集函数。 ?
创建扩展
执行下面命令
cargo pgx new my_agg
定义内部状态类型 stype
类型 IntegerAvgState 用于聚集函数计算过程中存储中间状态数据
#[derive(Serialize, Deserialize, PostgresType)] pub struct IntegerAvgState { sum: i32, n: i32, } impl Default for IntegerAvgState { fn default() -> Self { Self { sum: 0, n: 0 } } } impl IntegerAvgState { fn acc(&self, v: i32) -> Self { Self { sum: self.sum + v, n: self.n + 1, } } fn finalize(&self) -> i32 { self.sum / self.n } }

状态转移函数 sfunc
#[pg_extern] fn integer_avg_state_func( internal_state: IntegerAvgState, next_data_value: i32, ) -> IntegerAvgState { internal_state.acc(next_data_value) }

最终计算函数 ffunc
#[pg_extern] fn integer_avg_final_func(internal_state: IntegerAvgState) -> i32 { internal_state.finalize() }

创建聚集函数
extension_sql!( r#" CREATE AGGREGATE MYAVG (integer) ( sfunc = integer_avg_state_func, stype = IntegerAvgState, finalfunc = integer_avg_final_func, initcond = '{"sum": 0, "n": 0}' ); "# );

完整代码 ?
验证聚集函数
执行 cargo pgx run pg12 进入到postgres的psql命令行界面,然后执行下面操作
my_agg=# DROP EXTENSION my_agg; CREATE EXTENSION my_agg; DROP EXTENSION CREATE EXTENSION my_agg=#create table t (c integer); CREATE TABLE my_agg=# insert into t (c) values (1), (2), (3); INSERT 0 3 my_agg=# select MYAVG(c) from t; myavg ------- 2 (1 row)my_agg=# drop table t; DROP TABLE my_agg=#create table t (c integer,b text); CREATE TABLE my_agg=# insert into t (c,b) values (1,'king'), (2,'eastern'), (3,'sun'); INSERT 0 3 my_agg=# select MYAVG(c) from t; myavg ------- 2 (1 row)

完整代码
完整代码
TOPN AGGREGATE扩展
现在我们实现一个稍微复杂一些的聚集函数,求表中某列value最大的10个数值,经典的方法就是采用最小堆实现。 Rust标准库中 BinaryHeap 默认是最大堆,通过使用 Reverse 进行封装,就可以得到一个最小堆。 ?
创建扩展
cargo pgx new my_topn
定义内部状态类型 stype
#[derive(Serialize, Deserialize, PostgresType)] pub struct TopState { min_heap:BinaryHeap>, n:usize, } impl Default for TopState { fn default() -> Self { Self { min_heap:BinaryHeap::new(),n:10 } } }

为 TopState 实现 acc方法,用于状态转移,更新最小堆
impl TopState { fn acc(& mut self, v: i32){ if self.min_heap.len()

状态转移函数 sfunc
#[pg_extern] fn integer_topn_state_func( mut internal_state: TopState, next_data_value: i32, ) -> TopState { internal_state.acc(next_data_value); internal_state }

最终计算函数 ffunc
#[pg_extern] fn integer_topn_final_func(internal_state: TopState) -> Vec { internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect() }

创建聚集函数 MYMAXN
extension_sql!( r#" CREATE AGGREGATE MYMAXN (integer) ( sfunc = integer_topn_state_func, stype = TopState, finalfunc = integer_topn_final_func, initcond = '{"min_heap":[],"n":10}' ); "# );

验证聚集函数 MYMAXN
执行 cargo pgx run pg13 进入到postgres的psql命令行界面 执行下面sql命令创建测试数据
create table people ( idinteger, namevarchar(32), ageinteger, gradenumeric(4, 2), birthdaydate, logintime timestamp ); insert into people select generate_series(1,20) as id, md5(random()::text) as name, (random()*100)::integer as age, (random()*99)::numeric(4,2) as grade, now() - ((random()*1000)::integer||' day')::interval as birthday, clock_timestamp() as logintime;

得到测试数据如下
my_topn=# select age from people ; age ----- 28 62 3 20 46 23 74 19 46 26 70 90 22 45 30 46 43 70 78 96 (20 rows)

执行聚集函数MYMAXN
my_topn=# select MYMAXN(age) from people; mymaxn --------------------------------- {96,90,78,74,70,70,62,46,46,46} (1 row)

完整代码
注意
如果没有指定 initcond,则在创建extension的时候会报如下的错误提示
ERROR:must not omit initial value when transition function is strict and transition type is not compatible with input type

升级版TOPN
同时返回表中中指定列最大的n个值和最小的n个值。在原有的数据结构中新增一个最大堆即可。 ?
定义内部状态类型 stype
#[derive(Serialize, Deserialize, PostgresType)] pub struct TopState { min_heap:BinaryHeap>, max_heap:BinaryHeap, n:usize, } impl Default for TopState { fn default() -> Self { Self { min_heap:BinaryHeap::new(),max_heap:BinaryHeap::new(),n:10 } } }

为 TopState 实现 acc方法,用于状态转移,更新最小堆
impl TopState { fn acc(& mut self, v: i32){ if self.min_heap.len()=*top{ return }// 插入到最大堆中,然后移除堆中的最大值 self.max_heap.push(v); self.max_heap.pop().unwrap(); return }}

状态转移函数 sfunc
#[pg_extern] fn integer_topn_state_func( mut internal_state: TopState, next_data_value: i32, ) -> TopState { internal_state.acc(next_data_value); internal_state.acc_max(next_data_value); internal_state }

最终计算函数 ffunc
#[pg_extern] fn integer_topn_final_func(internal_state: TopState) -> Vec> { vec![ internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect(), internal_state.max_heap.into_sorted_vec(), ] }

创建聚集函数 MYMAXN
extension_sql!( r#" CREATE AGGREGATE MYMAXN (integer) ( sfunc = integer_topn_state_func, stype = TopState, finalfunc = integer_topn_final_func, initcond = '{"min_heap":[],"max_heap":[],"n":10}' ); "# );

验证聚集函数 MYMAXN
my_topn=# select MYMAXN(age) from people; mymaxn ---------------------------------------------------------------------- {"{96,90,78,74,70,70,62,46,46,46}","{3,19,20,22,23,26,28,30,43,45}"} (1 row)

完整代码
参考文献
  1. https://mp.weixin.qq.com/s/9r...
  2. Postgres Compile and Install from source code https://wiki.postgresql.org/w...
  3. cargo pgx 命令介绍 https://github.com/zombodb/pg...
  4. https://blog.timescale.com/bl...
  5. https://www.postgresql.org/do...
【Rust开发postgres扩展】Rust开发postgres扩展
文章图片

    推荐阅读