如何在更新链结束时触发触发器?
How can I fire a trigger at the end of a chain of updates?
我有几个 table 使用触发器相互交互,而我目前处理触发器执行的方式使用 pg_trigger_depth() < 2
,这很丑陋。
我真的希望最终触发器 运行 只触发一次,并且在每行的所有内容都发生之后结束。不幸的是,CONSTRAINT TRIGGER
只是 FOR EACH ROW
,并且 FOR STATEMENT
触发器实际上在触发器中的每个语句触发一次,而不是每个启动它的初始语句触发一次。
我已经查看了围绕该主题的其他几个 SO 问题,但没有找到与我正在做的事情足够相似的东西。
设置如下:
CREATE TABLE report(
report_tk SERIAL PRIMARY KEY,
report_id UUID NOT NULL,
report_name TEXT NOT NULL,
report_data INT NOT NULL,
report_subscribers TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
valid_range TSTZRANGE NOT NULL DEFAULT '(,)',
EXCLUDE USING GIST ((report_id :: TEXT) WITH =, report_name WITH =, valid_range WITH &&)
);
CREATE TABLE report_subscriber(
report_id INT NOT NULL REFERENCES report ON DELETE CASCADE;
subscriber_name TEXT NOT NULL,
needs_sync BOOLEAN NOT NULL DEFAULT TRUE,
EXCLUDE USING GIST (subscriber_name WITH =, valid_range WITH &&)
);
CREATE OR REPLACE FUNCTION sync_subscribers_to_report()
RETURNS TRIGGER LANGUAGE plpgsql SET SEARCH_PATH TO dwh, public AS $$
BEGIN
RAISE INFO 'Running sync to report trigger';
BEGIN
CREATE TEMPORARY TABLE lock_sync_subscribers_to_report(
) ON COMMIT DROP;
RAISE INFO 'syncing to report, stack depth is: %', pg_trigger_depth();
UPDATE report r
SET report_subscribers = x.subscribers
FROM (
SELECT
report_tk
, array_agg(DISTINCT u.subscriber_name ORDER BY u.subscriber_name) AS subscribers
FROM report_subscriber s
WHERE s.report_tk IN (
SELECT DISTINCT report_tk
FROM report_subscriber s2
WHERE s.needs_sync
)
GROUP BY s.report_tk
) x
WHERE r.report_tk = x.report_tk;
RAISE INFO 'turning off sync flag, stack depth is: %', pg_trigger_depth();
UPDATE report_subscriber
SET needs_sync = FALSE
WHERE needs_sync = TRUE;
RETURN NULL;
EXCEPTION WHEN DUPLICATE_TABLE THEN
RAISE INFO 'skipping recursive call, stack depth is: %', pg_trigger_depth();
RETURN NULL;
END;
END;
$$;
CREATE TRIGGER sync_subscribers_to_report
AFTER INSERT OR UPDATE OR DELETE
ON report_subscriber
FOR STATEMENT
EXECUTE PROCEDURE sync_subscribers_to_report();
因此,通过此设置,我希望能够:
- 插入一条报告记录
- 保证报告名称在任何一个时间点只能存在一次(valid_range 上的 EXCLUDE)
- 在订阅者中插入一个报表订阅者table
- 保证一个订阅者不能一次订阅多个报告。
- 允许多人订阅一份报告。
- 每当将记录添加到订阅者 table 时,将名称添加到报告中的订阅者列表中 table。
- 每当从订阅者 table 中删除一条记录时,从报告中的订阅者列表中删除该名称 table。
- 每当从报告 table 中删除一条记录时,删除相应的订户记录(由
ON DELETE CASCADE
如果在单个语句中对订阅者 table 进行了大量编辑(常见情况),最好只 运行 一个简单的查询来更新报告 table 使用来自订户 table.
的新记录和剩余记录的聚合
我最初的解决方案涉及向订阅者 table 添加一个 needs_update
标志并触发该标志以进行更新,然后关闭该标志。当然,这会导致我用 pg_trigger_depth() < 2
停止的触发器再次触发(2 是因为插入可能是由系统中的其他触发器引起的)。
除了丑陋之外,触发器函数中的语句导致更多 FOR EACH STATEMENT
次触发也很烦人。
我尝试了一个不同版本的标志,使用了我在其他 SO 答案()中看到的一个技巧,即创建一个临时 table 并捕获一个骗子 table异常以防止进一步执行。不过,我认为它并没有真正改善这个问题。
有没有办法以干净的方式做我想做的事情?虽然这是一个明显的玩具示例,但我的实际应用程序确实需要构建数据的 "packed array" 表示,并且以高效的方式这样做会很棒。
与其在 report_subscriber
本身中使用标志,我认为您最好使用单独的待处理更改队列。这有一些好处:
- 无触发递归
- 在幕后,
UPDATE
只是 DELETE
+ re-INSERT
,所以插入队列实际上比翻转标志更便宜
- 可能便宜很多,因为您只需要对不同的
report_id
进行排队,而不是克隆整个 report_subscriber
记录,并且您可以在临时 table 中完成, 所以存储是连续的,不需要同步到磁盘
- 翻转标志时无需担心竞争条件,因为队列是当前事务的本地队列(在您的实现中,受
UPDATE report_subscriber
影响的记录不一定与您在SELECT
...)
所以,初始化队列table:
CREATE FUNCTION create_queue_table() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
CREATE TEMP TABLE pending_subscriber_changes(report_id INT UNIQUE) ON COMMIT DROP;
RETURN NULL;
END
$$;
CREATE TRIGGER create_queue_table_if_not_exists
BEFORE INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
ON report_subscriber
FOR EACH STATEMENT
WHEN (to_regclass('pending_subscriber_changes') IS NULL)
EXECUTE PROCEDURE create_queue_table();
...在更改到达时排队,忽略任何已经排队的内容:
CREATE FUNCTION queue_subscriber_change() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
IF TG_OP IN ('DELETE', 'UPDATE') THEN
INSERT INTO pending_subscriber_changes (report_id) VALUES (old.report_id)
ON CONFLICT DO NOTHING;
END IF;
IF TG_OP IN ('INSERT', 'UPDATE') THEN
INSERT INTO pending_subscriber_changes (report_id) VALUES (new.report_id)
ON CONFLICT DO NOTHING;
END IF;
RETURN NULL;
END
$$;
CREATE TRIGGER queue_subscriber_change
AFTER INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
ON report_subscriber
FOR EACH ROW
EXECUTE PROCEDURE queue_subscriber_change();
...并在语句末尾处理队列:
CREATE FUNCTION process_pending_changes() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
UPDATE report
SET report_subscribers = ARRAY(
SELECT DISTINCT subscriber_name
FROM report_subscriber s
WHERE s.report_id = report.report_id
ORDER BY subscriber_name
)
FROM pending_subscriber_changes c
WHERE report.report_id = c.report_id;
DROP TABLE pending_subscriber_changes;
RETURN NULL;
END
$$;
CREATE TRIGGER process_pending_changes
AFTER INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
ON report_subscriber
FOR EACH STATEMENT
EXECUTE PROCEDURE process_pending_changes();
这有一个小问题:UPDATE
不提供有关更新顺序的任何保证。这意味着,如果这两个语句同时 运行:
INSERT INTO report_subscriber (report_id, subscriber_name) VALUES (1, 'a'), (2, 'b');
INSERT INTO report_subscriber (report_id, subscriber_name) VALUES (2, 'x'), (1, 'y');
...如果他们试图以相反的顺序更新 report
记录,则可能会出现死锁。您可以通过对所有更新强制执行一致的顺序来避免这种情况,但不幸的是,无法将 ORDER BY
附加到 UPDATE
语句;我认为你需要求助于游标:
CREATE FUNCTION process_pending_changes() RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
target_report CURSOR FOR
SELECT report_id
FROM report
WHERE report_id IN (TABLE pending_subscriber_changes)
ORDER BY report_id
FOR NO KEY UPDATE;
BEGIN
FOR target_record IN target_report LOOP
UPDATE report
SET report_subscribers = ARRAY(
SELECT DISTINCT subscriber_name
FROM report_subscriber
WHERE report_id = target_record.report_id
ORDER BY subscriber_name
)
WHERE CURRENT OF target_report;
END LOOP;
DROP TABLE pending_subscriber_changes;
RETURN NULL;
END
$$;
如果客户端试图在同一个事务中 运行 多个语句,这仍然有可能会死锁(因为更新排序仅在每个语句中应用,但更新锁一直保持到提交)。您可以通过在事务结束时仅触发一次 process_pending_changes()
来解决这个问题(缺点是,在该事务中,您不会看到自己的更改反映在 report_subscribers
数组)。
这里有一个 "on commit" 触发器的通用大纲,如果您认为值得麻烦填写的话:
CREATE FUNCTION run_on_commit() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
<your code goes here>
RETURN NULL;
END
$$;
CREATE FUNCTION trigger_already_fired() RETURNS BOOLEAN LANGUAGE plpgsql VOLATILE AS $$
DECLARE
already_fired BOOLEAN;
BEGIN
already_fired := NULLIF(current_setting('my_vars.trigger_already_fired', TRUE), '');
IF already_fired IS TRUE THEN
RETURN TRUE;
ELSE
SET LOCAL my_vars.trigger_already_fired = TRUE;
RETURN FALSE;
END IF;
END
$$;
CREATE CONSTRAINT TRIGGER my_trigger
AFTER INSERT OR UPDATE OR DELETE ON my_table
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (NOT trigger_already_fired())
EXECUTE PROCEDURE run_on_commit();
我有几个 table 使用触发器相互交互,而我目前处理触发器执行的方式使用 pg_trigger_depth() < 2
,这很丑陋。
我真的希望最终触发器 运行 只触发一次,并且在每行的所有内容都发生之后结束。不幸的是,CONSTRAINT TRIGGER
只是 FOR EACH ROW
,并且 FOR STATEMENT
触发器实际上在触发器中的每个语句触发一次,而不是每个启动它的初始语句触发一次。
我已经查看了围绕该主题的其他几个 SO 问题,但没有找到与我正在做的事情足够相似的东西。
设置如下:
CREATE TABLE report(
report_tk SERIAL PRIMARY KEY,
report_id UUID NOT NULL,
report_name TEXT NOT NULL,
report_data INT NOT NULL,
report_subscribers TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
valid_range TSTZRANGE NOT NULL DEFAULT '(,)',
EXCLUDE USING GIST ((report_id :: TEXT) WITH =, report_name WITH =, valid_range WITH &&)
);
CREATE TABLE report_subscriber(
report_id INT NOT NULL REFERENCES report ON DELETE CASCADE;
subscriber_name TEXT NOT NULL,
needs_sync BOOLEAN NOT NULL DEFAULT TRUE,
EXCLUDE USING GIST (subscriber_name WITH =, valid_range WITH &&)
);
CREATE OR REPLACE FUNCTION sync_subscribers_to_report()
RETURNS TRIGGER LANGUAGE plpgsql SET SEARCH_PATH TO dwh, public AS $$
BEGIN
RAISE INFO 'Running sync to report trigger';
BEGIN
CREATE TEMPORARY TABLE lock_sync_subscribers_to_report(
) ON COMMIT DROP;
RAISE INFO 'syncing to report, stack depth is: %', pg_trigger_depth();
UPDATE report r
SET report_subscribers = x.subscribers
FROM (
SELECT
report_tk
, array_agg(DISTINCT u.subscriber_name ORDER BY u.subscriber_name) AS subscribers
FROM report_subscriber s
WHERE s.report_tk IN (
SELECT DISTINCT report_tk
FROM report_subscriber s2
WHERE s.needs_sync
)
GROUP BY s.report_tk
) x
WHERE r.report_tk = x.report_tk;
RAISE INFO 'turning off sync flag, stack depth is: %', pg_trigger_depth();
UPDATE report_subscriber
SET needs_sync = FALSE
WHERE needs_sync = TRUE;
RETURN NULL;
EXCEPTION WHEN DUPLICATE_TABLE THEN
RAISE INFO 'skipping recursive call, stack depth is: %', pg_trigger_depth();
RETURN NULL;
END;
END;
$$;
CREATE TRIGGER sync_subscribers_to_report
AFTER INSERT OR UPDATE OR DELETE
ON report_subscriber
FOR STATEMENT
EXECUTE PROCEDURE sync_subscribers_to_report();
因此,通过此设置,我希望能够:
- 插入一条报告记录
- 保证报告名称在任何一个时间点只能存在一次(valid_range 上的 EXCLUDE)
- 在订阅者中插入一个报表订阅者table
- 保证一个订阅者不能一次订阅多个报告。
- 允许多人订阅一份报告。
- 每当将记录添加到订阅者 table 时,将名称添加到报告中的订阅者列表中 table。
- 每当从订阅者 table 中删除一条记录时,从报告中的订阅者列表中删除该名称 table。
- 每当从报告 table 中删除一条记录时,删除相应的订户记录(由
ON DELETE CASCADE
如果在单个语句中对订阅者 table 进行了大量编辑(常见情况),最好只 运行 一个简单的查询来更新报告 table 使用来自订户 table.
的新记录和剩余记录的聚合我最初的解决方案涉及向订阅者 table 添加一个 needs_update
标志并触发该标志以进行更新,然后关闭该标志。当然,这会导致我用 pg_trigger_depth() < 2
停止的触发器再次触发(2 是因为插入可能是由系统中的其他触发器引起的)。
除了丑陋之外,触发器函数中的语句导致更多 FOR EACH STATEMENT
次触发也很烦人。
我尝试了一个不同版本的标志,使用了我在其他 SO 答案()中看到的一个技巧,即创建一个临时 table 并捕获一个骗子 table异常以防止进一步执行。不过,我认为它并没有真正改善这个问题。
有没有办法以干净的方式做我想做的事情?虽然这是一个明显的玩具示例,但我的实际应用程序确实需要构建数据的 "packed array" 表示,并且以高效的方式这样做会很棒。
与其在 report_subscriber
本身中使用标志,我认为您最好使用单独的待处理更改队列。这有一些好处:
- 无触发递归
- 在幕后,
UPDATE
只是DELETE
+ re-INSERT
,所以插入队列实际上比翻转标志更便宜 - 可能便宜很多,因为您只需要对不同的
report_id
进行排队,而不是克隆整个report_subscriber
记录,并且您可以在临时 table 中完成, 所以存储是连续的,不需要同步到磁盘 - 翻转标志时无需担心竞争条件,因为队列是当前事务的本地队列(在您的实现中,受
UPDATE report_subscriber
影响的记录不一定与您在SELECT
...)
所以,初始化队列table:
CREATE FUNCTION create_queue_table() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
CREATE TEMP TABLE pending_subscriber_changes(report_id INT UNIQUE) ON COMMIT DROP;
RETURN NULL;
END
$$;
CREATE TRIGGER create_queue_table_if_not_exists
BEFORE INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
ON report_subscriber
FOR EACH STATEMENT
WHEN (to_regclass('pending_subscriber_changes') IS NULL)
EXECUTE PROCEDURE create_queue_table();
...在更改到达时排队,忽略任何已经排队的内容:
CREATE FUNCTION queue_subscriber_change() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
IF TG_OP IN ('DELETE', 'UPDATE') THEN
INSERT INTO pending_subscriber_changes (report_id) VALUES (old.report_id)
ON CONFLICT DO NOTHING;
END IF;
IF TG_OP IN ('INSERT', 'UPDATE') THEN
INSERT INTO pending_subscriber_changes (report_id) VALUES (new.report_id)
ON CONFLICT DO NOTHING;
END IF;
RETURN NULL;
END
$$;
CREATE TRIGGER queue_subscriber_change
AFTER INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
ON report_subscriber
FOR EACH ROW
EXECUTE PROCEDURE queue_subscriber_change();
...并在语句末尾处理队列:
CREATE FUNCTION process_pending_changes() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
UPDATE report
SET report_subscribers = ARRAY(
SELECT DISTINCT subscriber_name
FROM report_subscriber s
WHERE s.report_id = report.report_id
ORDER BY subscriber_name
)
FROM pending_subscriber_changes c
WHERE report.report_id = c.report_id;
DROP TABLE pending_subscriber_changes;
RETURN NULL;
END
$$;
CREATE TRIGGER process_pending_changes
AFTER INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
ON report_subscriber
FOR EACH STATEMENT
EXECUTE PROCEDURE process_pending_changes();
这有一个小问题:UPDATE
不提供有关更新顺序的任何保证。这意味着,如果这两个语句同时 运行:
INSERT INTO report_subscriber (report_id, subscriber_name) VALUES (1, 'a'), (2, 'b');
INSERT INTO report_subscriber (report_id, subscriber_name) VALUES (2, 'x'), (1, 'y');
...如果他们试图以相反的顺序更新 report
记录,则可能会出现死锁。您可以通过对所有更新强制执行一致的顺序来避免这种情况,但不幸的是,无法将 ORDER BY
附加到 UPDATE
语句;我认为你需要求助于游标:
CREATE FUNCTION process_pending_changes() RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
target_report CURSOR FOR
SELECT report_id
FROM report
WHERE report_id IN (TABLE pending_subscriber_changes)
ORDER BY report_id
FOR NO KEY UPDATE;
BEGIN
FOR target_record IN target_report LOOP
UPDATE report
SET report_subscribers = ARRAY(
SELECT DISTINCT subscriber_name
FROM report_subscriber
WHERE report_id = target_record.report_id
ORDER BY subscriber_name
)
WHERE CURRENT OF target_report;
END LOOP;
DROP TABLE pending_subscriber_changes;
RETURN NULL;
END
$$;
如果客户端试图在同一个事务中 运行 多个语句,这仍然有可能会死锁(因为更新排序仅在每个语句中应用,但更新锁一直保持到提交)。您可以通过在事务结束时仅触发一次 process_pending_changes()
来解决这个问题(缺点是,在该事务中,您不会看到自己的更改反映在 report_subscribers
数组)。
这里有一个 "on commit" 触发器的通用大纲,如果您认为值得麻烦填写的话:
CREATE FUNCTION run_on_commit() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
<your code goes here>
RETURN NULL;
END
$$;
CREATE FUNCTION trigger_already_fired() RETURNS BOOLEAN LANGUAGE plpgsql VOLATILE AS $$
DECLARE
already_fired BOOLEAN;
BEGIN
already_fired := NULLIF(current_setting('my_vars.trigger_already_fired', TRUE), '');
IF already_fired IS TRUE THEN
RETURN TRUE;
ELSE
SET LOCAL my_vars.trigger_already_fired = TRUE;
RETURN FALSE;
END IF;
END
$$;
CREATE CONSTRAINT TRIGGER my_trigger
AFTER INSERT OR UPDATE OR DELETE ON my_table
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (NOT trigger_already_fired())
EXECUTE PROCEDURE run_on_commit();