如何在 PL/SQL 中合并两个相似的数据库架构?

How do I merge two similar database-schema in PL/SQL?

数据库模式(源和目标)非常大(每个都超过 350 table)。我的任务是以某种方式将这两个 table 合并为一个。必须迁移数据本身(table 中的内容)。我必须小心,在合并模式之前或期间没有主键的双重条目。有没有人已经这样做过并且能够为我提供他的解决方案或者任何人都可以帮助我找到完成任务的方法吗?我的方法都失败了,我的顾问只是告诉我在线获取帮助:/

我的做法: 我尝试使用“all_constraints”table 从我的数据库中获取所有 pks。

SELECT cols.table_name, cols.column_name, cols.position, cons.status, cons.owner
FROM all_constraints cons, all_cons_columns cols
WHERE cols.owner = 'DB'
AND cons.constraint_type = 'P'  
AND cons.constraint_name = cols.constraint_name
AND cons.owner = cols.owner
ORDER BY cols.table_name, cols.position;

我也“知道”主键必须有一个序列才能为其添加值:

CREATE SEQUENCE seq_pk_addition
MINVALUE 1
MAXVALUE 99999999999999999999
START WITH 1
INCREMENT BY 1
CACHE 20;

因为如果涉及到 pl/sql(或通常的 sql),我是菜鸟 那么how/what接下来我应该做什么? :/

Here is a link for an ERD of the database: https://ufile.io/9tdoj

virus scan: https://www.virustotal.com/#/file/dbe5f418115e50313a2268fb33a924cc8cb57a43bc85b3bbf5f6a571b184627e/detection

天啊!正常情况下,这样的问题会很快被关闭"too broad",但我们需要支持邪恶顾问的受害者!

至于工作量,我需要一个有经验的专家一周的全职时间,以及一个有经验的 QA 工程师的两天质量检查。

首先,如此复杂的数据合并不可能在第一次尝试时就成功。这意味着您将需要可以轻松重建的两种模式的测试副本。你需要一个地方来尝试一下。通常这是通过导出两个模式和一个空的开发数据库来完成的。

接下来,您需要两个架构足够接近以便能够比较数据。我会通过导入上面提到的导出文件来做到这一点。如果模式名称相同而不是在导入期间重命名一个。

接下来,我会仔细检查结构是否真的相同,查询如

 SELECT a.owner, a.table_name, b.owner, b.table_name
   FROM all_tables a 
   FULL JOIN all_tables b 
     ON a.table_name = b.table_name
    AND a.owner = 'SCHEMAA' 
    AND b.owner = 'SCHEMAB'
  WHERE a.owner IS NULL or b.owner IS NULL;

接下来,我将检查主键和唯一键是否重叠:

 SELECT id FROM schemaa.table1
 INTERSECT
 SELECT id FROM schemab.table1;

因为有 300 多个表,我会生成这些查询:

 DECLARE 
   stmt VARCHAR2(30000);
   n NUMBER;
   schema_a CONSTANT VARCHAR2(128 BYTE) := 'SCHEMAA';
   schema_b CONSTANT VARCHAR2(128 BYTE) := 'SCHEMAB';
 BEGIN
   FOR c IN (SELECT owner, constraint_name, table_name,
                    (SELECT LISTAGG(column_name,',') WITHIN GROUP (ORDER BY position)
                       FROM all_cons_columns c
                      WHERE s.owner = c.owner
                        AND s.constraint_name = c.constraint_name) AS cols
               FROM all_constraints s
              WHERE s.constraint_type IN ('P') 
                AND s.owner = schema_a) 
   LOOP
     dbms_output.put_line('Checking pk '||c.constraint_name||' on table '||c.table_name);
     stmt := 'SELECT count(*) FROM '||schema_a||'.'||c.table_name
          ||' JOIN '||schema_b||'.'||c.table_name
          || ' USING ('||c.cols||')';
     --dbms_output.put_line('Query '||stmt);
     EXECUTE IMMEDIATE stmt INTO n;
     dbms_output.put_line('Found '||n||' overlapping primary keys in table '||c.table_name);
   END LOOP;
 END;
 /

首先,对于 350 table 秒,很可能需要一个 dynamic SQL

  1. 声明 CURSORCOLLECTION - table of VARCHAR2 所有 table 个名称。
  2. 声明一个字符串变量来构建 dynamic SQL.
  3. loop遍历 table 名字的整个列表,并为每个 table 生成一个字符串,该字符串将作为 SQL 和 EXECUTE IMMEDIATE命令。
  4. 将要构建的动态 SQL 应该将源 table 中的值插入到目标 table 中。如果 PK 已经存在,在目标 table 中,应检查表示最后更新日期的字段,如果在源 table 中它大于目标 table,则执行更新目标 table,否则什么也不做。

正如承诺在我的评论中提供帮助,我准备了一个动态代码,您可以尝试使用源和目标 table 获取数据 merged。逻辑如下:

第 1 步:从 SOURCE 架构中获取所有 table 名称。在下面的查询中,您可能需要分别替换架构(所有者)名称。出于测试目的,我只使用了 1 table 所以当你 运行 它时,删除 table 名称过滤子句。

第 2 步:获取 table 的约束列名称。这用于准备稍后用于 MERGE 语句的 ON 子句。

第 3 步:获取 table 的非约束列名称。这将在 UPDATE 子句中使用 MERGE.

Step4:当数据不符合MERGE语句的ON条件时,准备insert列表。

阅读我的内联评论以了解每个步骤。

CREATE OR REPLACE PROCEDURE COPY_TABLE
AS
Type OBJ_NME is table of varchar2(100) index by pls_integer;

--To hold Table name
v_obj_nm OBJ_NME ;

--To hold Columns of table
v_col_nm OBJ_NME;

v_othr_col_nm OBJ_NME;
on_clause VARCHAR2(2000);
upd_clause VARCHAR2(4000);
cntr number:=0;
v_sql VARCHAR2(4000);

col_list1  VARCHAR2(4000);
col_list2  VARCHAR2(4000);
col_list3  VARCHAR2(4000);
col_list4  varchar2(4000);
col_list5  VARCHAR2(4000);
col_list6  VARCHAR2(4000);
col_list7  VARCHAR2(4000);
col_list8  varchar2(4000);

BEGIN

--Get Source table names
SELECT OBJECT_NAME
BULK COLLECT INTO v_obj_nm
FROM all_objects 
WHERE owner LIKE  'RU%' -- Replace `RU%` with your Source schema name here
AND object_type = 'TABLE'
and object_name ='TEST'; --remove this condition if you want this to run for all tables

FOR I IN 1..v_obj_nm.count
loop
--Columns with Constraints 
  SELECT column_name
  bulk collect into v_col_nm 
  FROM user_cons_columns
  WHERE table_name = v_obj_nm(i);  

--Columns without Constraints remain columns of table
SELECT *
BULK COLLECT INTO v_othr_col_nm
from (
      SELECT column_name 
      FROM user_tab_cols
      WHERE table_name = v_obj_nm(i)
      MINUS
      SELECT column_name  
      FROM user_cons_columns
      WHERE table_name = v_obj_nm(i));

--Prepare Update Clause  
 FOR l IN 1..v_othr_col_nm.count
  loop
   cntr:=cntr+1;
   upd_clause := 't1.'||v_othr_col_nm(l)||' = t2.' ||v_othr_col_nm(l);    
   upd_clause:=upd_clause ||' and ' ;

   col_list1:= 't1.'||v_othr_col_nm(l) ||',';
   col_list2:= col_list2||col_list1;   

   col_list5:= 't2.'||v_othr_col_nm(l) ||',';
   col_list6:= col_list6||col_list5;

   IF (cntr = v_othr_col_nm.count)
   THEN 
    --dbms_output.put_line('YES');
     upd_clause:=rtrim(upd_clause,' and');
     col_list2:=rtrim( col_list2,',');
     col_list6:=rtrim( col_list6,',');
   END IF;
     dbms_output.put_line(col_list2||col_list6); 
   --dbms_output.put_line(upd_clause);
   End loop;
  --Update caluse ends     

   cntr:=0; --Counter reset  

 --Prepare ON clause  
  FOR k IN 1..v_col_nm.count
  loop
   cntr:=cntr+1;
   --dbms_output.put_line(v_col_nm.count || cntr);
   on_clause := 't1.'||v_col_nm(k)||' = t2.' ||v_col_nm(k);    
   on_clause:=on_clause ||' and ' ;

   col_list3:= 't1.'||v_col_nm(k) ||',';
   col_list4:= col_list4||col_list3;    

   col_list7:= 't2.'||v_col_nm(k) ||',';
   col_list8:= col_list8||col_list7;    

   IF (cntr = v_col_nm.count)
   THEN 
    --dbms_output.put_line('YES');
    on_clause:=rtrim(on_clause,' and');
    col_list4:=rtrim( col_list4,',');
    col_list8:=rtrim( col_list8,',');
   end if;

   dbms_output.put_line(col_list4||col_list8);
 -- ON clause ends

 --Prepare merge Statement

    v_sql:= 'MERGE INTO '|| v_obj_nm(i)||' t1--put target schema name before v_obj_nm
              USING (SELECT * FROM '|| v_obj_nm(i)||') t2-- put source schema name befire v_obj_nm here 
              ON ('||on_clause||')
              WHEN MATCHED THEN
              UPDATE
              SET '||upd_clause ||              
              ' WHEN NOT MATCHED THEN
              INSERT  
              ('||col_list2||','
                ||col_list4||
              ')
              VALUES
              ('||col_list6||','
                ||col_list8||          
               ')';

      dbms_output.put_line(v_sql);   
      execute immediate v_sql;
  end loop;    
End loop;
END;
/

执行:

exec COPY_TABLE

输出:

anonymous block completed

PS:我已经用一个 table 测试了这个,其中有 2 列我有唯一的键约束。table 的 DDL 如下:

最后,我希望你能理解我的代码(你是一个菜鸟),如果上面的内容不能满足你的要求,你可以实现类似的东西。

 CREATE TABLE TEST
       (    COL2 NUMBER, 
            COLUMN1 VARCHAR2(20 BYTE), 
            CONSTRAINT TEST_UK1 UNIQUE (COLUMN1)  
       ) ;