在|在 Kubernetes 上快速测试 Citus 分布式 PostgreSQL 集群(分布式表,共置,引用表,列存储)

在|在 Kubernetes 上快速测试 Citus 分布式 PostgreSQL 集群(分布式表,共置,引用表,列存储)

准备工作 这里假设,你已经在 k8s 上部署好了基于 Citus 扩展的分布式 PostgreSQL 集群。
查看 Citus 集群(kubectl get po -n citus),1 个 Coordinator(协调器) 节点 + 3 个 Worker(工作器) 节点。

NAMEREADYSTATUSRESTARTSAGE citus-coordinator-02/2Running03h55m citus-worker-02/2Running022m citus-worker-12/2Running021m citus-worker-22/2Running021m

进入 coordinator 节点(kubectl -n citus exec -it citus-coordinator-0 -- bash),查看活动的 worker 节点(psql 'host=citus-coordinator user=postgres' -c "SELECT * FROM citus_get_active_worker_nodes(); ")。
node_name| node_port -----------------------------------------------------+----------- citus-worker-1.citus-worker.citus.svc.cluster.local |6432 citus-worker-2.citus-worker.citus.svc.cluster.local |6432 citus-worker-0.citus-worker.citus.svc.cluster.local |6432 (3 rows)

一旦拥有 Citus 集群,就可以开始创建分布式表引用表和使用列存储
创建分布式表 create_distributed_table 将在本地或工作节点之间透明地切分您的表。
进入命令行工具:psql 'host=citus-coordinator user=postgres'
CREATE TABLE events ( device_id bigint, event_id bigserial, event_time timestamptz default now(), data jsonb not null, PRIMARY KEY (device_id, event_id) ); -- 将事件表分布在本地或工作节点上的分片上 SELECT create_distributed_table('events', 'device_id');

执行此操作后,对特定设备 ID 的查询将有效地路由到单个工作节点,而跨设备 ID 的查询将在集群中并行化。
INSERT INTO events (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s; -- INSERT 0 1000000

获取设备 1 的最后 3 个事件,路由到单个节点
命令行开启计时:postgres=# \timing
SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;

device_id | event_id |event_time|data -----------+----------+-------------------------------+------------------------------------- 1 |999901 | 2022-03-24 02:30:50.205478+00 | {"measurement": 0.8822990134507691} 1 |999801 | 2022-03-24 02:30:50.205478+00 | {"measurement": 0.5239176115816448} 1 |999701 | 2022-03-24 02:30:50.205478+00 | {"measurement": 0.9900647926398349} (3 rows)Time: 4.779 ms

执行 sql 语句:

QUERY PLAN --------------------------------------------------------------------------------------------------------- Aggregate(cost=250.00..250.02 rows=1 width=8) Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) ->Custom Scan (Citus Adaptive)(cost=0.00..0.00 rows=100000 width=8) Output: remote_scan.count Task Count: 32 Tasks Shown: One of 32 ->Task Query: SELECT count(*) AS count FROM public.events_102008 events WHERE true Node: host=citus-worker-0.citus-worker.citus.svc.cluster.local port=6432 dbname=postgres ->Aggregate(cost=725.00..725.01 rows=1 width=8) Output: count(*) ->Seq Scan on public.events_102008 events(cost=0.00..650.00 rows=30000 width=0) Output: device_id, event_id, event_time, data (13 rows)Time: 5.427 ms

使用共置(Co-location)创建分布式表 具有相同分布列的分布式表可以位于同一位置,以实现分布式表之间的高性能分布式连接(join)和外键。 默认情况下,分布式表将根据分布列的类型位于同一位置,但您可以使用 create_distributed_table 中的 colocate_with 参数显式定义同一位置。
CREATE TABLE devices ( device_id bigint primary key, device_name text, device_type_id int ); CREATE INDEX ON devices (device_type_id); -- 将设备表与事件表放在一起 SELECT create_distributed_table('devices', 'device_id', colocate_with := 'events');

【在|在 Kubernetes 上快速测试 Citus 分布式 PostgreSQL 集群(分布式表,共置,引用表,列存储)】插入设备元数据
INSERT INTO devices (device_id, device_name, device_type_id) SELECT s, 'device-'||s, 55 FROM generate_series(0, 99) s;

ALTER TABLE events ADD CONSTRAINT device_id_fk FOREIGN KEY (device_id) REFERENCES devices (device_id);

获得跨分片并行的所有类型 55 设备的平均测量值
SELECT avg((data->>'measurement')::double precision) FROM events JOIN devices USING (device_id) WHERE device_type_id = 55;

avg -------------------- 0.4997412230952178 (1 row)Time: 122.548 ms

Co-location 还可以帮助您扩展 INSERT..SELECT、存储过程和分布式事务。
    • https://docs.citusdata.com/en/stable/articles/aggregation.html
  • 存储过程
    • https://www.citusdata.com/blog/2020/11/21/making-postgres-stored-procedures-9x-faster-in-citus/
  • 分布式事务
    • https://www.citusdata.com/blog/2017/06/02/scaling-complex-sql-transactions/
创建引用表 当您需要不包含分布列的快速 join 或外键时,您可以使用 create_reference_table 在集群中的所有节点之间复制表。
CREATE TABLE device_types ( device_type_id int primary key, device_type_name text not null unique );

跨所有节点复制表以在任何列上启用外键和 join
SELECT create_reference_table('device_types');

INSERT INTO device_types (device_type_id, device_type_name) VALUES (55, 'laptop');

ALTER TABLE devices ADD CONSTRAINT device_type_fk FOREIGN KEY (device_type_id) REFERENCES device_types (device_type_id);

获取类型名称以笔记本电脑开头的设备的最后 3 个事件,跨分片并行
SELECT device_id, event_time, data->>'measurement' AS value, device_name, device_type_name FROM events JOIN devices USING (device_id) JOIN device_types USING (device_type_id) WHERE device_type_name LIKE 'laptop%' ORDER BY event_time DESC LIMIT 3;

device_id |event_time|value| device_name | device_type_name -----------+-------------------------------+---------------------+-------------+------------------ 31 | 2022-03-24 02:30:50.205478+00 | 0.9994211581289107| device-31| laptop 31 | 2022-03-24 02:30:50.205478+00 | 0.13771543211483106 | device-31| laptop 88 | 2022-03-24 02:30:50.205478+00 | 0.5585740912470349| device-88| laptop (3 rows)Time: 96.537 ms

使用列式存储创建表 要在 PostgreSQL 数据库中使用列式存储,您只需将 USING columnar 添加到 CREATE TABLE 语句中,您的数据将使用列式访问方法自动压缩。
CREATE TABLE events_columnar ( device_id bigint, event_id bigserial, event_time timestamptz default now(), data jsonb not null ) USING columnar;

INSERT INTO events_columnar (device_id, data) SELECT d, '{"hello":"columnar"}' FROM generate_series(1,10000000) d;

CREATE TABLE events_row AS SELECT * FROM events_columnar;

postgres=# \d+ List of relations Schema |Name|Type|Owner| Persistence | Access method |Size| Description --------+------------------------------+----------+----------+-------------+---------------+------------+------------- public | citus_tables| view| postgres | permanent|| 0 bytes| public | device_types| table| postgres | permanent| heap| 8192 bytes | public | devices| table| postgres | permanent| heap| 8192 bytes | public | events| table| postgres | permanent| heap| 8192 bytes | public | events_columnar| table| postgres | permanent| columnar| 25 MB| public | events_columnar_event_id_seq | sequence | postgres | permanent|| 8192 bytes | public | events_event_id_seq| sequence | postgres | permanent|| 8192 bytes | public | events_row| table| postgres | permanent| heap| 806 MB| (8 rows)

注意 events_row(806 MB)events_columnar(25 MB) 的对比。压缩了几十倍,效果非常的惊人,大大节省了存储空间。
使用列式存储时,您应该只使用 COPYINSERT..SELECT 批量加载数据以实现良好的压缩。柱状表目前不支持更新、删除和外键。 但是,您可以使用分区表,其中较新的分区使用基于行的存储,而较旧的分区使用列存储进行压缩。
  • Citus 集群 - 官方快速入门教程
  • Citus 集群 - 官方安装指南
  • Citus 简介 - 将 Postgres 转换为分布式数据库
  • Citus 架构及概念
  • Citus 集群官方示例 - 多租户应用程序实战
  • Citus 集群官方教程 - 迁移现有应用程序
  • Citus 集群)官方示例 - 实时仪表盘
  • Citus 集群)官方示例 - 时间序列数据
  • 多租户数据库项目实战(Python/Django+Postgres+Citus)
  • Citus 集群 - 分布式表中的分布列选择最佳实践
  • 扩展分析处理服务(Smartly.io) - 使用 Citus 对 PostgreSQL 数据库进行分片
