Error when existing function is used as UDF to modify a Spark Dataframe Column
I have a dataframe with a column of string type that contains plain text, and I would like to modify this column using pyspark.sql.functions.udf (or pyspark.sql.functions.UserDefinedFunction?).
I am using Python 2.7, PySpark 1.6.1 and Flask 0.10.1 on OSX 10.11.4.
It seems to work fine when I use a lambda expression:
@spark.route('/')
def run():
    df = ...  # my dataframe
    myUDF = udf(lambda r: len(r), IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))
But as soon as I move the lambda expression into a named function:
def my_function(x):
    return len(x)

@spark.route('/')
def run():
    df = ...  # my dataframe
    myUDF = udf(my_function, IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))
I get the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "app/__init__.py", line 19, in <module>
from app.controllers.main import main
File "app/controllers/main/__init__.py", line 5, in <module>
import default, source
File "app/controllers/main/default.py", line 3, in <module>
from app.controllers.main.source import file
File "app/controllers/main/source/__init__.py", line 2, in <module>
import file, online, database
File "app/controllers/main/source/database.py", line 1, in <module>
from app.controllers.spark import sqlContext
File "app/controllers/spark/__init__.py", line 18, in <module>
import default, grid #, pivot
File "app/controllers/spark/default.py", line 2, in <module>
from app.controllers.spark import spark, sc, sqlContext, grid as gridController
File "app/controllers/spark/grid.py", line 14, in <module>
from pyspark.ml import Pipeline
File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/__init__.py", line 18, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 23, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/__init__.py", line 25, in <module>
ImportError: No module named numpy
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute.apply(python.scala:398)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute.apply(python.scala:363)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Numpy is installed. Removing the mllib import did not solve the problem.
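To narrow this down, a small job that imports numpy inside a task shows which Python the executors actually run (a diagnostic sketch, not from the original post; it assumes `sc` is the SparkContext imported from app/controllers/spark, as the traceback suggests):

# Diagnostic sketch: import numpy inside the task so any ImportError
# comes from the worker's Python rather than the driver's.
worker_numpy = sc.parallelize([0]).map(lambda _: __import__('numpy').__version__).collect()
print(worker_numpy)  # the numpy version as seen by the worker Python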
It generally works if the whole body of 'my_function' is declared inside the body of the 'run' function. Otherwise, I have not found a way to call an external function in a case like yours.
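For reference, a minimal sketch of that workaround, keeping the names from the question (`spark` is the Flask blueprint and `column` is assumed to be defined elsewhere, as in the original snippets):

from flask import render_template
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@spark.route('/')
def run():
    # Defining the function inside the route means it is serialized as a
    # closure (by value) rather than by reference to the app module, so the
    # worker should not need to re-import app/__init__.py and the
    # pyspark.ml / numpy import chain shown in the traceback.
    def my_function(x):
        return len(x)

    df = ...  # my dataframe
    myUDF = udf(my_function, IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))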