Citus 分布式 PostgreSQL 集群 – SQL Reference(查询分布式表 SQL)

image

如前几节所述,Citus 是一个扩展,它扩展了最新的 PostgreSQL 以进行分布式执行。这意味着您可以在 Citus 协调器上使用标准 PostgreSQL SELECT 查询进行查询。 Citus 将并行化涉及复杂选择、分组和排序以及 JOINSELECT 查询,以加快查询性能。在高层次上,CitusSELECT 查询划分为更小的查询片段,将这些查询片段分配给 worker,监督他们的执行,合并他们的结果(如果需要,对它们进行排序),并将最终结果返回给用户。

在以下部分中,我们将讨论您可以使用 Citus 运行的不同类型的查询。

聚合函数

Citus 支持和并行化 PostgreSQL 支持的大多数聚合函数,包括自定义用户定义的聚合。 聚合使用以下三种方法之一执行,优先顺序如下:

  1. 当聚合按表的分布列分组时,Citus 可以将整个查询的执行下推到每个 worker。 在这种情况下支持所有聚合,并在 worker 上并行执行。(任何正在使用的自定义聚合都必须安装在 worker 身上。)

  2. 当聚合没有按表的分布列分组时,Citus 仍然可以根据具体情况进行优化。Citussum()avg()count(distinct) 等某些聚合有内部规则,允许它重写查询以对 worker 进行部分聚合。例如,为了计算平均值,Citus 从每个 worker 那里获得一个总和和一个计数,然后 coordinator 节点计算最终的平均值。特殊情况聚合的完整列表:

    avg, min, max, sum, count, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, bit_and, bit_or, bool_and, bool_or, every, hll_add_agg, hll_union_agg, topn_add_agg, topn_union_agg, any_value, var_pop(float4), var_pop(float8), var_samp(float4), var_samp(float8), variance(float4), variance(float8) stddev_pop(float4), stddev_pop(float8), stddev_samp(float4), stddev_samp(float8) stddev(float4), stddev(float8) tdigest(double precision, int), tdigest_percentile(double precision, int, double precision), tdigest_percentile(double precision, int, double precision[]), tdigest_percentile(tdigest, double precision), tdigest_percentile(tdigest, double precision[]), tdigest_percentile_of(double precision, int, double precision), tdigest_percentile_of(double precision, int, double precision[]), tdigest_percentile_of(tdigest, double precision), tdigest_percentile_of(tdigest, double precision[])

  3. 最后的手段:从 worker 中提取所有行并在 coordinator 节点上执行聚合。 如果聚合未在分布列上分组,并且不是预定义的特殊情况之一,则 Citus 会退回到这种方法。 它会导致网络开销,并且如果要聚合的数据集太大,可能会耗尽 coordinator 的资源。(可以禁用此回退,见下文。)

请注意,查询中的微小更改可能会改变执行模式,从而导致潜在的令人惊讶的低效率。例如,按非分布列分组的 sum(x) 可以使用分布式执行,而 sum(distinct x) 必须将整个输入记录集拉到 coordinator

SELECT sum(value1), sum(distinct value2) FROM distributed_table;

为避免意外将数据拉到 coordinator,可以设置一个 GUC

SET citus.coordinator_aggregation_strategy TO 'disabled';

请注意,禁用 coordinator 聚合策略将完全阻止 “类型三”(最后的手段) 聚合查询工作。

Count (Distinct) 聚合

Citus 以多种方式支持 count(distinct) 聚合。
如果 count(distinct) 聚合在分布列上,Citus 可以直接将查询下推给 worker
如果不是,Citus 对每个 worker 运行 select distinct 语句,
并将列表返回给 coordinator,从中获取最终计数。

请注意,当 worker 拥有更多 distinct 项时,传输此数据会变得更慢。
对于包含多个 count(distinct) 聚合的查询尤其如此,例如:

-- multiple distinct counts in one query tend to be slow
SELECT count(distinct a), count(distinct b), count(distinct c)
FROM table_abc;

对于这类查询,worker 上产生的 select distinct 语句本质上会产生要传输到 coordinator 的行的 cross-product(叉积)

为了提高性能,您可以选择进行近似计数。请按照以下步骤操作:

  1. 在所有 PostgreSQL 实例(coordinator 和所有 worker)上下载并安装 hll 扩展。有关获取扩展的详细信息,请访问 PostgreSQL hll github 存储库。
  2. 只需从 coordinator 运行以下命令,即可在所有 PostgreSQL 实例上创建 hll 扩展
    CREATE EXTENSION hll;
    
  3. 通过设置 Citus.count_distinct_error_rate 配置值启用计数不同的近似值。 此配置设置的较低值预计会提供更准确的结果,但需要更多时间进行计算。我们建议将其设置为 0.005
    SET citus.count_distinct_error_rate to 0.005;
    

    在这一步之后,count(distinct) 聚合会自动切换到使用 HLL,而无需对您的查询进行任何更改。 您应该能够在表的任何列上运行近似 count distinct 查询。

HyperLogLog 列

某些用户已经将他们的数据存储为 HLL 列。在这种情况下,他们可以通过调用 hll_union_agg(hll_column) 动态汇总这些数据。

估计 Top N 个项

通过应用 countsortlimit 来计算集合中的前 n 个元素很简单。 然而,随着数据大小的增加,这种方法变得缓慢且资源密集。使用近似值更有效。

Postgres 的开源 TopN 扩展可以快速获得 “top-n” 查询的近似结果。该扩展将 top 值具体化为 JSON 数据类型。TopN 可以增量更新这些 top 值,或者在不同的时间间隔内按需合并它们。

基本操作

在查看 TopN 的实际示例之前,让我们看看它的一些原始操作是如何工作的。首先 topn_add 更新一个 JSON 对象,其中包含一个 key 被看到的次数:

select topn_add('{}', 'a');
-- => {"a": 1}

-- record the sighting of another "a"
select topn_add(topn_add('{}', 'a'), 'a');
-- => {"a": 2}

该扩展还提供聚合以扫描多个值:

-- for normal_rand
create extension tablefunc;

-- count values from a normal distribution
SELECT topn_add_agg(floor(abs(i))::text)
  FROM normal_rand(1000, 5, 0.7) i;
-- => {"2": 1, "3": 74, "4": 420, "5": 425, "6": 77, "7": 3}

如果 distinct 值的数量超过阈值,则聚合会丢弃那些最不常见的信息。
这可以控制空间使用。阈值可以由 topn.number_of_counters GUC 控制。它的默认值为 1000

现实例子

现在来看一个更现实的例子,说明 TopN 在实践中是如何工作的。让我们提取 2000 年的亚马逊产品评论,并使用 TopN 快速查询。首先下载数据集:

curl -L https://examples.citusdata.com/customer_reviews_2000.csv.gz | \
  gunzip > reviews.csv

接下来,将其摄取到分布式表中:

CREATE TABLE customer_reviews
(
    customer_id TEXT,
    review_date DATE,
    review_rating INTEGER,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
);

SELECT create_distributed_table('customer_reviews', 'product_id');

\COPY customer_reviews FROM 'reviews.csv' WITH CSV

接下来我们将添加扩展,创建一个目标表来存储 TopN 生成的 json 数据,并应用我们之前看到的 topn_add_agg 函数。

-- run below command from coordinator, it will be propagated to the worker nodes as well
CREATE EXTENSION topn;

-- a table to materialize the daily aggregate
CREATE TABLE reviews_by_day
(
  review_date date unique,
  agg_data jsonb
);

SELECT create_reference_table('reviews_by_day');

-- materialize how many reviews each product got per day per customer
INSERT INTO reviews_by_day
  SELECT review_date, topn_add_agg(product_id)
  FROM customer_reviews
  GROUP BY review_date;

现在,我们无需在 customer_reviews 上编写复杂的窗口函数,只需将 TopN 应用于 reviews_by_day。 例如,以下查询查找前五天中每一天最常被评论的产品:

SELECT review_date, (topn(agg_data, 1)).*
FROM reviews_by_day
ORDER BY review_date
LIMIT 5;
┌─────────────┬────────────┬───────────┐
│ review_date │    item    │ frequency │
├─────────────┼────────────┼───────────┤
│ 2000-01-01  │ 0939173344 │        12 │
│ 2000-01-02  │ B000050XY8 │        11 │
│ 2000-01-03  │ 0375404368 │        12 │
│ 2000-01-04  │ 0375408738 │        14 │
│ 2000-01-05  │ B00000J7J4 │        17 │
└─────────────┴────────────┴───────────┘

TopN 创建的 json 字段可以与 topn_uniontopn_union_agg 合并。 我们可以使用后者来合并整个第一个月的数据,并列出该期间最受好评的五个产品。

SELECT (topn(topn_union_agg(agg_data), 5)).*
FROM reviews_by_day
WHERE review_date >= '2000-01-01' AND review_date < '2000-02-01'
ORDER BY 2 DESC;
┌────────────┬───────────┐
│    item    │ frequency │
├────────────┼───────────┤
│ 0375404368 │       217 │
│ 0345417623 │       217 │
│ 0375404376 │       217 │
│ 0375408738 │       217 │
│ 043936213X │       204 │
└────────────┴───────────┘

有关更多详细信息和示例,请参阅 TopN readme

百分位计算

在大量行上找到精确的百分位数可能会非常昂贵,
因为所有行都必须转移到 coordinator 以进行最终排序和处理。
另一方面,找到近似值可以使用所谓的 sketch 算法在 worker 节点上并行完成。 coordinator 节点然后将压缩摘要组合到最终结果中,而不是读取完整的行。

一种流行的百分位数 sketch 算法使用称为 t-digest 的压缩数据结构,可在 tdigest 扩展中用于 PostgreSQLCitus 集成了对此扩展的支持。

以下是在 Citus 中使用 t-digest 的方法:

  1. 在所有 PostgreSQL 节点( coordinator 和所有 worker)上下载并安装 tdigest 扩展。tdigest 扩展 github 存储库有安装说明。
  2. 在数据库中创建 tdigest 扩展。在 coordinator 上运行以下命令:
    CREATE EXTENSION tdigest;
    

    coordinator 也会将命令传播给 worker

当在查询中使用扩展中定义的任何聚合时,Citus 将重写查询以将部分 tdigest 计算下推到适用的 worker

T-digest 精度可以通过传递给聚合的 compression 参数来控制。
权衡是准确性与 workercoordinator 之间共享的数据量。
有关如何在 tdigest 扩展中使用聚合的完整说明,请查看官方 tdigest github 存储库中的文档。

限制下推

Citus 还尽可能将限制条款下推到 worker 的分片,以最大限度地减少跨网络传输的数据量。

但是,在某些情况下,带有 LIMIT 子句的 SELECT 查询可能需要从每个分片中获取所有行以生成准确的结果。 例如,如果查询需要按聚合列排序,则需要所有分片中该列的结果来确定最终聚合值。 由于大量的网络数据传输,这会降低 LIMIT 子句的性能。 在这种情况下,如果近似值会产生有意义的结果,Citus 提供了一种用于网络高效近似 LIMIT 子句的选项。

LIMIT 近似值默认禁用,可以通过设置配置参数 citus.limit_clause_row_fetch_count 来启用。
在这个配置值的基础上,Citus 会限制每个任务返回的行数,用于在 coordinator 上进行聚合。 由于这个 limit,最终结果可能是近似的。增加此 limit 将提高最终结果的准确性,同时仍提供从 worker 中提取的行数的上限。

SET citus.limit_clause_row_fetch_count to 10000;

分布式表的视图

Citus 支持分布式表的所有视图。有关视图的语法和功能的概述,请参阅 CREATE VIEWPostgreSQL 文档。

请注意,某些视图导致查询计划的效率低于其他视图。
有关检测和改进不良视图性能的更多信息,请参阅子查询/CTE 网络开销
(视图在内部被视为子查询。)

Citus 也支持物化视图,并将它们作为本地表存储在 coordinator 节点上。

连接(Join)

Citus 支持任意数量的表之间的 equi-JOIN,无论它们的大小和分布方法如何。
查询计划器根据表的分布方式选择最佳连接方法和 join 顺序。
它评估几个可能的 join 顺序并创建一个 join 计划,该计划需要通过网络传输最少的数据。

共置连接

当两个表共置时,它们可以在它们的公共分布列上有效地 joinco-located join(共置连接)join 两个大型分布式表的最有效方式。

注意

确保表分布到相同数量的分片中,并且每个表的分布列具有完全匹配的类型。尝试加入类型略有不同的列(例如 `int` 和 `bigint`)可能会导致问题。

引用表连接

引用表可以用作“维度”表,
以有效地与大型“事实”表连接。因为引用表在所有 worker 上完全复制,
所以 reference join 可以分解为每个 worker 上的本地连接并并行执行。
reference join 就像一个更灵活的 co-located join 版本,
因为引用表没有分布在任何特定的列上,并且可以自由地 join 到它们的任何列上。

引用表也可以与 coordinator 节点本地的表连接。

重新分区连接

在某些情况下,您可能需要在除分布列之外的列上连接两个表。
对于这种情况,Citus 还允许通过动态重新分区查询的表来连接非分布 key 列。

在这种情况下,要分区的表由查询优化器根据分布列、连接键和表的大小来确定。
使用重新分区的表,可以确保只有相关的分片对相互连接,从而大大减少了通过网络传输的数据量。

通常,co-located joinrepartition join 更有效,因为 repartition join 需要对数据进行混洗。
因此,您应该尽可能通过 common join 键来分布表。

更多

  1. Citus 分布式 PostgreSQL 集群 – SQL Reference(创建和修改分布式表 DDL)
  2. Citus 分布式 PostgreSQL 集群 – SQL Reference(摄取、修改数据 DML)

 2 total views,  1 views today

页面下部广告