PySpark.RDD.first -> UnpicklingError: NEWOBJ class argument has NULL tp_new
I'm using Python 2.7 with Spark 1.5.1, and I get this:
df = sqlContext.read.parquet(".....").cache()
df = df.filter(df.foo == 1).select("a", "b", "c")

def myfun(row):
    return pyspark.sql.Row(....)

rdd = df.map(myfun).cache()
rdd.first()
==> UnpicklingError: NEWOBJ class argument has NULL tp_new
What's going wrong?
I'm not sure what you're trying to do, but perhaps:
rdd = df.rdd.cache()
rdd.first()
.rdd converts the DataFrame into an RDD of Rows.
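If the goal was still to apply myfun, the same mapping can then go through the explicit RDD (a minimal sketch, reusing the df and myfun from the question):

rdd = df.rdd.map(myfun).cache()
rdd.first()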
As usual, the pickling error boils down to myfun closing over an object that isn't picklable. And as usual, the solution is mapPartitions: create the object inside the function, once per partition on the executor, so it never has to be serialized:
import pygeoip

def get_geo(rows):
    # Opened here, on the executor, once per partition:
    # the GeoIP handle is never pickled.
    db = pygeoip.GeoIP("/usr/share/GeoIP/GeoIPCity.dat")
    for row in rows:
        d = row.asDict()
        d["new"] = db.record_by_addr(row.client_ip) if row.client_ip else "noIP"
        yield d

rdd.mapPartitions(get_geo)
instead of map:
import pygeoip

# Created on the driver; get_geo closes over it, so Spark tries
# (and fails) to pickle the GeoIP handle when shipping the function.
db = pygeoip.GeoIP("/usr/share/GeoIP/GeoIPCity.dat")

def get_geo(row):
    d = row.asDict()
    d["new"] = db.record_by_addr(row.client_ip) if row.client_ip else "noIP"
    return d

rdd.map(get_geo)
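To check the fix end to end, the partition-based version can be materialized like this (a minimal sketch; geo_rdd is a hypothetical name, and rdd is the Row RDD from above):

geo_rdd = rdd.mapPartitions(get_geo)
print(geo_rdd.first())  # a dict with the original columns plus the "new" geo lookup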