用 spark SQL 中使用相同名称的计算列替换列
Replace a column with a calcutated column using the same name in spark SQL
我从我的数据湖中读取文件并将它们加载到数据框中
由于kafka中的转换问题,加载的数据有一些字段与源数据库中的类型不同(here)
因此,我使用错误的数据类型(二进制)从 S3 加载数据,并使用 UDF 函数将每一列转换为另一列
然后,我重命名新列以替换旧列,以在我的源数据库和目标数据库中保持相同的结构
步骤:
之前:
myTable
|
+-- myField1 (binary)
+-- myField2 (binary)
+-- myField3 (binary)
中间状态1(用UDF函数投射):
myTable
|
+-- myField1 (binary)
+-- myField1_new (numeric)
+-- myField2 (binary)
+-- myField2_new (numeric)
+-- myField3 (binary)
+-- myField3_new (numeric)
中间状态 2(删除旧列):
myTable
|
+-- myField1_new (numeric)
+-- myField2_new (numeric)
+-- myField3_new (numeric)
最终状态(重命名计算列):
myTable
|
+-- myField1 (numeric)
+-- myField1 (numeric)
+-- myField1 (numeric)
这是我使用的语法:
spark.sql('select *,
MyUDF(myfield1) myfield1_new,
MyUDF(myfield2) myfield2_new,
MyUDF(myfield3) myfield3_new
from my_table')
.drop('myfield1').withColumnRenamed('myfield1_new', 'myfield1')
.drop('myfield2').withColumnRenamed('myfield2_new', 'myfield2')
.drop('myfield3').withColumnRenamed('myfield3_new', 'myfield3')
.show(1, False)
我的问题是这个过程真的很慢,因为在实际生产中需要计算 439 个字段 table (439 !!!)
有没有更快的方法?即时重命名或其他什么?
感谢您的帮助
我看到了一个 这个问题的讨论帖。
将那个扩展为,假设你有 df
as
+--------------------+
| myfield|
+--------------------+
|[00, 8F, 2B, 9C, 80]|
| [52, F4, 92, 80]|
+--------------------+
EDIT: 由于myfield
列的格式为bytearray(b'\x00')
,转换方式如下(@Ftagn指出) .否则,如果它是一个字符串列表,则使用 commented return
.
def func(val):
return int.from_bytes(val, byteorder='big', signed=False) / 1000000
# return int("".join(val), 16)/1000000
func_udf = udf(lambda x: func(x), FloatType())
要创建输出,请使用
df = df.withColumn("myfield1", func_udf("myfield"))
这会产生,
+--------------------+--------+
| myfield|myfield1|
+--------------------+--------+
|[00, 8F, 2B, 9C, 80]| 2402.0|
| [52, F4, 92, 80]| 1391.76|
+--------------------+--------+
相反,如果您使用,
df = df.withColumn("myfield", func_udf("myfield"))
你明白了,
+-------+
|myfield|
+-------+
| 2402.0|
|1391.76|
+-------+
我从我的数据湖中读取文件并将它们加载到数据框中 由于kafka中的转换问题,加载的数据有一些字段与源数据库中的类型不同(here)
因此,我使用错误的数据类型(二进制)从 S3 加载数据,并使用 UDF 函数将每一列转换为另一列
然后,我重命名新列以替换旧列,以在我的源数据库和目标数据库中保持相同的结构
步骤:
之前:
myTable
|
+-- myField1 (binary)
+-- myField2 (binary)
+-- myField3 (binary)
中间状态1(用UDF函数投射):
myTable
|
+-- myField1 (binary)
+-- myField1_new (numeric)
+-- myField2 (binary)
+-- myField2_new (numeric)
+-- myField3 (binary)
+-- myField3_new (numeric)
中间状态 2(删除旧列):
myTable
|
+-- myField1_new (numeric)
+-- myField2_new (numeric)
+-- myField3_new (numeric)
最终状态(重命名计算列):
myTable
|
+-- myField1 (numeric)
+-- myField1 (numeric)
+-- myField1 (numeric)
这是我使用的语法:
spark.sql('select *,
MyUDF(myfield1) myfield1_new,
MyUDF(myfield2) myfield2_new,
MyUDF(myfield3) myfield3_new
from my_table')
.drop('myfield1').withColumnRenamed('myfield1_new', 'myfield1')
.drop('myfield2').withColumnRenamed('myfield2_new', 'myfield2')
.drop('myfield3').withColumnRenamed('myfield3_new', 'myfield3')
.show(1, False)
我的问题是这个过程真的很慢,因为在实际生产中需要计算 439 个字段 table (439 !!!)
有没有更快的方法?即时重命名或其他什么?
感谢您的帮助
我看到了一个
将那个扩展为,假设你有 df
as
+--------------------+
| myfield|
+--------------------+
|[00, 8F, 2B, 9C, 80]|
| [52, F4, 92, 80]|
+--------------------+
EDIT: 由于myfield
列的格式为bytearray(b'\x00')
,转换方式如下(@Ftagn指出) .否则,如果它是一个字符串列表,则使用 commented return
.
def func(val):
return int.from_bytes(val, byteorder='big', signed=False) / 1000000
# return int("".join(val), 16)/1000000
func_udf = udf(lambda x: func(x), FloatType())
要创建输出,请使用
df = df.withColumn("myfield1", func_udf("myfield"))
这会产生,
+--------------------+--------+
| myfield|myfield1|
+--------------------+--------+
|[00, 8F, 2B, 9C, 80]| 2402.0|
| [52, F4, 92, 80]| 1391.76|
+--------------------+--------+
相反,如果您使用,
df = df.withColumn("myfield", func_udf("myfield"))
你明白了,
+-------+
|myfield|
+-------+
| 2402.0|
|1391.76|
+-------+