Pickle error in Pyspark

I am trying to parse XML in PySpark. I have a directory containing many small XML files, and I want to parse all of them and put the extracted values into HDFS, for which I wrote the code below.

Code:

import xml.etree.ElementTree as ET
from subprocess import Popen, PIPE
import pickle
filenme = sc.wholeTextFiles("/user/root/CD")
dumpoff1 = Popen(["hadoop", "fs", "-put", "-", "/user/cloudera/Demo/Demo.txt"],stdin=PIPE)

def getname(filenm):
   return filenm[1]

def add_hk(filenm):
   source=[]
   global dumpoff1 
   doc = ET.fromstring(filenm)
   for elem1 in doc.findall('.//documentInfo/source'):
       source.append(elem1.text)
       print source[0]
       dumpoff1.stdin.write("%s\n" % source[0]) 

filenme.map(getname).foreach(add_hk)

But when I run it I get this error.

Error:

File "/opt/cloudera/parcels/CDH-5.11.0-1.cdh5.11.0.p0.34/lib/spark/python/pyspark/cloudpickle.py", line 582, in save_file raise pickle.PicklingError("Cannot pickle files that are not opened for reading") pickle.PicklingError: Cannot pickle files that are not opened for reading

I tried creating the Popen inside add_hk, and then I don't get the pickle error, but Demo.txt gets overwritten and ends up holding only the values from the latest file. Please help.

You should load your XML files with Spark SQL and then write them out to HDFS:

Assuming /user/root/CD/ is a local path (otherwise remove the file:// prefix):

df = spark.read.format('com.databricks.spark.xml').options(rowTag='page').load('file:///user/root/CD/*')

You can then write it out as Parquet:

df.write.parquet([HDFS path])
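
For completeness, here is a minimal end-to-end sketch of this approach, assuming Spark 2.x with the spark-xml package available (e.g. added via --packages) and that documentInfo is a suitable rowTag for your files; the tag, the output path, and the column name source are assumptions based on the XML in the question, not confirmed details:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("xml-to-hdfs").getOrCreate()

# Read every small XML file in the directory; one row per <documentInfo> element (assumed tag)
df = (spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "documentInfo")
      .load("file:///user/root/CD/*"))

# Keep only the <source> value (assumed to be a direct child of documentInfo)
sources = df.select("source")

# Write once through the distributed DataFrame API instead of piping each record
# through a Popen handle, so nothing non-picklable is shipped to the executors
sources.write.mode("overwrite").parquet("hdfs:///user/cloudera/Demo/demo_parquet")

If a plain-text file like Demo.txt is what you really need instead of Parquet, the same DataFrame could be written with sources.write.text(...) as long as it contains a single string column.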