如何避免在分布式表上合并高基数子 select 聚合
How to avoid merging high cardinality sub-select aggregations on distributed tables
在 Clickhouse 中,我有一个很大的 table A,其中包含以下列:
date, user_id, operator, active
在 table A 中,事件已经根据日期、user_id 和运算符进行了预聚合,而列 'active' 表示存在某种 activity 用户在指定日期。
TableA分布在2个shards/servers:首先我在每个服务器上创建了tableA_local(PK是日期,user_id)。然后我创建了分布式 table A 以使用 hash(userid, operator) 作为分片键来合并本地 tables A_local。
User_id 是高基数字段(数千万到数亿),而列 'operator' 是低基数(大约 1000 个不同的值)。每个 user_id 都属于一个单独的运算符,也就是说 tuple(user_id, operator) 与 user_id 本身具有相同的基数。
我需要计算每个运营商在给定时间段内活跃超过 N 天的用户数。为此,我首先需要找到每个 user_id 用户在给定时间段内活跃的天数,我在 subselect 中执行此操作。然后,在 main select 中,我计算按运营商分组的用户数。
SELECT
operator,
count() AS cnt_user
FROM
(
SELECT
user_id,
operator,
count() AS cnt
FROM A
WHERE date >= '2019-06-01' AND date <= '2019-08-31'
AND active = 1
GROUP BY
user_id,
operator
HAVING cnt >= 30
)
GROUP BY operator
使用 user_id 和运算符进行分片的想法是将用户路由到不同的分片。这样,我希望完整的查询(select 和 subselect)可以在每个 shard/server 上独立地 运行,然后最终聚合将在小基数集上执行:运算符 -> 计数。
但是,当我 运行 在很长一段时间(几个月)内进行此查询时,Clickhouse 会抛出异常,告知已超出最大查询内存分配。如果我 运行 在本地 table 上进行相同的查询,则没有此类异常并返回结果。 Clickhouse 首先将来自 subselect 的所有记录合并到两个分片上,然后计算外部聚合。问题是如何重写查询 or/and 更改架构,以强制 Clickhouse 在本地执行两个聚合,然后在最后一步合并低基数聚合(通过运算符)?我希望在 user_id 和运算符上设置分片键会让 Clickhouse 自然地做到这一点,但事实似乎并非如此。
在每个分片
create view xxx as
SELECT
user_id,
operator,
count() AS cnt
FROM A_local
GROUP BY
user_id,
operator
HAVING cnt >= 30
create xxx_d Distributed(,xxx);
select ....
from xxx_d
WHERE date >= '2019-06-01' AND date <= '2019-08-31'
AND active = 1
GROUP BY operator
settings distributed_group_by_no_merge=1
在 Clickhouse 中,我有一个很大的 table A,其中包含以下列:
date, user_id, operator, active
在 table A 中,事件已经根据日期、user_id 和运算符进行了预聚合,而列 'active' 表示存在某种 activity 用户在指定日期。
TableA分布在2个shards/servers:首先我在每个服务器上创建了tableA_local(PK是日期,user_id)。然后我创建了分布式 table A 以使用 hash(userid, operator) 作为分片键来合并本地 tables A_local。 User_id 是高基数字段(数千万到数亿),而列 'operator' 是低基数(大约 1000 个不同的值)。每个 user_id 都属于一个单独的运算符,也就是说 tuple(user_id, operator) 与 user_id 本身具有相同的基数。
我需要计算每个运营商在给定时间段内活跃超过 N 天的用户数。为此,我首先需要找到每个 user_id 用户在给定时间段内活跃的天数,我在 subselect 中执行此操作。然后,在 main select 中,我计算按运营商分组的用户数。
SELECT
operator,
count() AS cnt_user
FROM
(
SELECT
user_id,
operator,
count() AS cnt
FROM A
WHERE date >= '2019-06-01' AND date <= '2019-08-31'
AND active = 1
GROUP BY
user_id,
operator
HAVING cnt >= 30
)
GROUP BY operator
使用 user_id 和运算符进行分片的想法是将用户路由到不同的分片。这样,我希望完整的查询(select 和 subselect)可以在每个 shard/server 上独立地 运行,然后最终聚合将在小基数集上执行:运算符 -> 计数。
但是,当我 运行 在很长一段时间(几个月)内进行此查询时,Clickhouse 会抛出异常,告知已超出最大查询内存分配。如果我 运行 在本地 table 上进行相同的查询,则没有此类异常并返回结果。 Clickhouse 首先将来自 subselect 的所有记录合并到两个分片上,然后计算外部聚合。问题是如何重写查询 or/and 更改架构,以强制 Clickhouse 在本地执行两个聚合,然后在最后一步合并低基数聚合(通过运算符)?我希望在 user_id 和运算符上设置分片键会让 Clickhouse 自然地做到这一点,但事实似乎并非如此。
在每个分片
create view xxx as
SELECT
user_id,
operator,
count() AS cnt
FROM A_local
GROUP BY
user_id,
operator
HAVING cnt >= 30
create xxx_d Distributed(,xxx);
select ....
from xxx_d
WHERE date >= '2019-06-01' AND date <= '2019-08-31'
AND active = 1
GROUP BY operator
settings distributed_group_by_no_merge=1