Spark / Python : cannot pickle '_thread.RLock' object when passing multiple columns to UDF
Using a Jupyter notebook, I have two DataFrames:
Apple prices:
+--------------------+-------+---------+--------+
| Model_name| Price| Country|Currency|
+--------------------+-------+---------+--------+
| 24-inch iMac|1919.01|Australia| AUD|
| AirPods Max| 908.47|Australia| AUD|
| AirPods Pro| 403.2|Australia| AUD|
|AirPods(2nd gener...| 221.31|Australia| AUD|
|AirPods(3rd gener...| 281.94|Australia| AUD|
|Apple Pencil (2nd...| 201.1|Australia| AUD|
| Apple TV 4K| 251.62|Australia| AUD|
| Apple Watch SE| 433.52|Australia| AUD|
|Apple Watch Series 3| 302.15|Australia| AUD|
| MacBook Air| 1514.8|Australia| AUD|
| Magic Mouse| 110.15|Australia| AUD|
| Sport Band| 69.73|Australia| AUD|
| iPad| 504.26|Australia| AUD|
| iPad Pro|1211.64|Australia| AUD|
| iPhone 12|1009.53|Australia| AUD|
| iPhone 13|1211.64|Australia| AUD|
| iPhone SE| 686.16|Australia| AUD|
| 24-inch iMac|1597.97| Canada| CAD|
| AirPods Max| 778.5| Canada| CAD|
| AirPods Pro| 328.79| Canada| CAD|
+--------------------+-------+---------+--------+
Currency conversion (ratio: 1 USD = xx units of currency):
+--------+--------------------+
|ISO_4217|Dollar_To_Curr_Ratio|
+--------+--------------------+
| EUR| 0.89|
| CAD| 1.27|
| CZK| 21.7|
| DKK| 6.59|
| HUF| 319.05|
| INR| 74.42|
| MXN| 20.5|
| NOK| 8.9|
| PHP| 51.22|
| PLN| 4.03|
| RUB| 76.28|
| SEK| 9.15|
| THB| 33.57|
| USD| 1.0|
| AUD| 1.4|
+--------+--------------------+
My goal is to convert all the prices contained in ApplePrices to US dollars, so I used a UDF:
def convertPriceToDollar(price, currency):
    ratio = currencyConversion.select("Dollar_To_Curr_Ratio").where(col("ISO_4217") == currency)
    return price * ratio

toDollarConverter = udf(convertPriceToDollar)
and called the function in a select, passing two columns:
applePrices.select(toDollarConverter('Price', 'Currency').alias("Price", "truc")).show()
But I get this error: PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
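For context, this error occurs because the two-argument UDF closes over the currencyConversion DataFrame: Spark must pickle the function (and everything it captures) to ship it to the executors, and a DataFrame carries a reference to the SparkSession, which holds a thread lock that cannot be pickled. The failure can be reproduced in plain Python, without Spark:

```python
import pickle
import threading

# Objects that hold a lock -- as a SparkSession/DataFrame does internally --
# cannot be serialized with pickle.
lock = threading.RLock()
try:
    pickle.dumps(lock)
except TypeError as e:
    print(e)  # cannot pickle '_thread.RLock' object
```

The single-column version below works only because its closure captures nothing but a plain float.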
It works fine if I pass a single column to the function, like this:
def convertPriceToDollar(price):
    return price * 1.27

toDollarConverter = udf(convertPriceToDollar)
applePrices.select(toDollarConverter('Price').alias("Price")).show()
Result:
+------------------+
| Price|
+------------------+
| 2437.1427|
| 1153.7569|
| 512.064|
| 281.0637|
| 358.0638|
| 255.397|
|319.55740000000003|
| 550.5704|
|383.73049999999995|
| 1923.796|
| 139.8905|
| 88.5571|
| 640.4102|
|1538.7828000000002|
| 1282.1031|
|1538.7828000000002|
| 871.4232|
| 2029.4219|
| 988.695|
| 417.5633|
+------------------+
Do I need to use UDFs at all?
Sample datasets:
ApplePrices = spark.createDataFrame(
    [
        ('24-inch iMac', '1919.01', 'Australia', 'AUD'),
        ('AirPods Max', '908.47', 'Australia', 'AUD'),
        ('24-inch iMac', '1597.97', 'Canada', 'CAD'),
    ], ['Model_name', 'Price', 'Country', 'Currency']
)

currencyConversion = spark.createDataFrame(
    [
        ('AUD', '1.4'),
        ('CAD', '1.27'),
    ], ['ISO_4217', 'Dollar_To_Curr_Ratio']
)
Using the Spark DataFrame API (no UDF needed):
from pyspark.sql.functions import col, round

ApplePrices_mod = ApplePrices \
    .join(currencyConversion.withColumnRenamed('ISO_4217', 'Currency'), 'Currency', 'left') \
    .withColumn('Price_USD', round(col('Price') / col('Dollar_To_Curr_Ratio'), 2)) \
    .select('Currency', 'Model_name', 'Price_USD', 'Country')
ApplePrices_mod.show()
Output:
+--------+------------+---------+---------+
|Currency| Model_name|Price_USD| Country|
+--------+------------+---------+---------+
| AUD|24-inch iMac| 1370.72|Australia|
| AUD| AirPods Max| 648.91|Australia|
| CAD|24-inch iMac| 1258.24| Canada|
+--------+------------+---------+---------+
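If a UDF is genuinely required, the pickling error can be avoided by collecting the small lookup table into a plain Python dict on the driver first, so the UDF closure captures only the dict rather than the DataFrame. A sketch (the Spark wiring is shown in comments; the ratio values below are taken from the sample data):

```python
# Build a UDF-friendly converter around a plain dict of conversion ratios.
# The closure then contains only picklable objects.

def make_converter(ratios):
    """Return a plain function suitable for wrapping with udf()."""
    def convert(price, currency):
        ratio = ratios.get(currency)
        if ratio is None:
            return None  # unknown currency: no conversion
        return round(float(price) / ratio, 2)
    return convert

# In Spark, the dict would be collected from the lookup DataFrame, e.g.:
#   ratios = {r['ISO_4217']: float(r['Dollar_To_Curr_Ratio'])
#             for r in currencyConversion.collect()}
#   toDollarConverter = udf(make_converter(ratios), DoubleType())
#   applePrices.withColumn('Price_USD',
#       toDollarConverter('Price', 'Currency')).show()

# Illustrative values from the sample data:
ratios = {'AUD': 1.4, 'CAD': 1.27}
convert = make_converter(ratios)
print(convert('1919.01', 'AUD'))  # 1370.72, matching the join-based output
```

For a lookup table this small, though, the join above is the idiomatic choice: it stays entirely in the JVM, while a Python UDF pays serialization overhead per row.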