如何在 ADF 中将一行转换为多列?
How to transform one row into multiple columns in ADF?
我有 TABLE 作为只有 1 行的源,就像在 Azure 数据工厂中有 336 列:
1
2
3
4
5
6
7
8
9
value1
value2
value3
value4
value5
value6
value7
value8
value9
并且想要将每 3 列合并为前 3 列:
1
2
3
value1
value2
value3
value4
value5
value6
value7
value8
value9
除了对每 3 列使用 Select 然后 Join 的替代方法是什么,因为它是处理这么多列的漫长过程?
如果您的数据源是 Azure SQL DB,您可以使用常规 SQL 结合 UNVPIVOT、PIVOT 和一些排名函数来转换行,以帮助对数据进行分组。一个简单的例子:
DROP TABLE IF EXISTS #tmp;
CREATE TABLE #tmp (
col1 VARCHAR(10),
col2 VARCHAR(10),
col3 VARCHAR(10),
col4 VARCHAR(10),
col5 VARCHAR(10),
col6 VARCHAR(10),
col7 VARCHAR(10),
col8 VARCHAR(10),
col9 VARCHAR(10)
);
INSERT INTO #tmp
VALUES ( 'value1', 'value2', 'value3', 'value4', 'value5', 'value6', 'value7', 'value8', 'value9' )
SELECT [1], [2], [0] AS [3]
FROM
(
SELECT
NTILE(3) OVER( ORDER BY ( SELECT NULL ) ) nt,
ROW_NUMBER() OVER( ORDER BY ( SELECT NULL ) ) % 3 groupNumber,
newCol
FROM #tmp
UNPIVOT ( newCol for sourceCol In ( col1, col2, col3, col4, col5, col6, col7, col8, col9 ) ) uvpt
) x
PIVOT ( MAX(newCol) For groupNumber In ( [1], [2], [0] ) ) pvt;
根据您拥有的列数调整 NTILE
值 - 它应该是您除以 3 的总列数。例如,如果您有 300 列,NTILE
值应为 100,如果您有 336 列,则应为 112。可以使用包含 336 列的更大示例 here。
将数据作为视图呈现给 Azure 数据工厂 (ADF),或者使用复制 activity 中的查询选项。
我的结果:
如果您使用的是 Azure Synapse Analytics,那么另一种有趣的方法是使用 Synapse Notebooks。只需三行代码,您就可以从专用的 SQL 池中获取 table,使用 stack
函数逆透视所有 336 列并将其写回数据库。这个简单的例子在 Scala 中:
val df = spark.read.synapsesql("someDb.dbo.pivotWorking")
val df2 = df.select( expr("stack(112, *)"))
// Write it back
df2.write.synapsesql("someDb.dbo.pivotWorking_after", Constants.INTERNAL)
不得不佩服它的简洁
我有 TABLE 作为只有 1 行的源,就像在 Azure 数据工厂中有 336 列:
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
---|---|---|---|---|---|---|---|---|
value1 | value2 | value3 | value4 | value5 | value6 | value7 | value8 | value9 |
并且想要将每 3 列合并为前 3 列:
1 | 2 | 3 |
---|---|---|
value1 | value2 | value3 |
value4 | value5 | value6 |
value7 | value8 | value9 |
除了对每 3 列使用 Select 然后 Join 的替代方法是什么,因为它是处理这么多列的漫长过程?
如果您的数据源是 Azure SQL DB,您可以使用常规 SQL 结合 UNVPIVOT、PIVOT 和一些排名函数来转换行,以帮助对数据进行分组。一个简单的例子:
DROP TABLE IF EXISTS #tmp;
CREATE TABLE #tmp (
col1 VARCHAR(10),
col2 VARCHAR(10),
col3 VARCHAR(10),
col4 VARCHAR(10),
col5 VARCHAR(10),
col6 VARCHAR(10),
col7 VARCHAR(10),
col8 VARCHAR(10),
col9 VARCHAR(10)
);
INSERT INTO #tmp
VALUES ( 'value1', 'value2', 'value3', 'value4', 'value5', 'value6', 'value7', 'value8', 'value9' )
SELECT [1], [2], [0] AS [3]
FROM
(
SELECT
NTILE(3) OVER( ORDER BY ( SELECT NULL ) ) nt,
ROW_NUMBER() OVER( ORDER BY ( SELECT NULL ) ) % 3 groupNumber,
newCol
FROM #tmp
UNPIVOT ( newCol for sourceCol In ( col1, col2, col3, col4, col5, col6, col7, col8, col9 ) ) uvpt
) x
PIVOT ( MAX(newCol) For groupNumber In ( [1], [2], [0] ) ) pvt;
根据您拥有的列数调整 NTILE
值 - 它应该是您除以 3 的总列数。例如,如果您有 300 列,NTILE
值应为 100,如果您有 336 列,则应为 112。可以使用包含 336 列的更大示例 here。
将数据作为视图呈现给 Azure 数据工厂 (ADF),或者使用复制 activity 中的查询选项。
我的结果:
如果您使用的是 Azure Synapse Analytics,那么另一种有趣的方法是使用 Synapse Notebooks。只需三行代码,您就可以从专用的 SQL 池中获取 table,使用 stack
函数逆透视所有 336 列并将其写回数据库。这个简单的例子在 Scala 中:
val df = spark.read.synapsesql("someDb.dbo.pivotWorking")
val df2 = df.select( expr("stack(112, *)"))
// Write it back
df2.write.synapsesql("someDb.dbo.pivotWorking_after", Constants.INTERNAL)
不得不佩服它的简洁