Applying a map function to all elements of a column in a Spark dataframe
I am trying to apply a function to all elements of a column in a Spark dataframe in Scala. The input is a string that looks like "{count:10}", and I only want to return the Int part, which is 10 in this example. I can do this in a toy example:
val x = List("{\"count\": 107}", "{\"count\": 9}", "{\"count\": 456}")
val _list = x.map(x => x.substring(10,x.length-1).toInt)
But when I try to apply a udf to my dataframe, I get an error:
val getCounts: String => Int = _.substring(10,x.length-1).toInt
import org.apache.spark.sql.functions.udf
val myUDF = udf(getCounts)
df.withColumn("post_shares_int", myUDF('post_shares)).show
Error output:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2060)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
....
Any help on how to do this would be greatly appreciated.
Forget about a custom UDF, there is already a function available for this task, namely regexp_extract, documented here:
import org.apache.spark.sql.functions.regexp_extract

df.withColumn(
  "post_shares_int",
  regexp_extract(df("post_shares"), """^\{\w+:(\d+)\}$""", 1)
).show
As per the comment below, it is better to use get_json_object to parse the JSON string:
import org.apache.spark.sql.functions.get_json_object

df.withColumn(
  "post_shares_int",
  get_json_object(df("post_shares"), "$.count")
).show
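For a self-contained check, here is a minimal sketch assuming Spark 2.x with a SparkSession named spark; get_json_object also returns a string column, so a cast is added to get the Int the question asks for:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

val spark = SparkSession.builder().appName("count-extract").master("local[*]").getOrCreate()
import spark.implicits._

// Toy data mirroring the sample strings from the question.
val df = Seq("""{"count": 107}""", """{"count": 9}""", """{"count": 456}""").toDF("post_shares")

// Pull the "count" field out of the JSON string and cast it to an integer.
df.withColumn(
  "post_shares_int",
  get_json_object($"post_shares", "$.count").cast("int")
).show()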