使用Apache Spark实现python功能
Use Apache Spark to implement the python function
我有一个 python 代码要在 Spark 中实现,但是我无法获得在 Spark 1.1 版本中实现的 RDD 工作逻辑。此代码在 Python 中完美运行,但我想使用此代码在 Spark 中实现。
import lxml.etree
import csv
sc = SparkContext
data = sc.textFile("pain001.xml")
rdd = sc.parallelize(data)
# compile xpath selectors for ele ment text
selectors = ('GrpHdr/MsgId', 'GrpHdr/CreDtTm') # etc...
xpath = [lxml.etree.XPath('{}/text()'.format(s)) for s in selectors]
# open result csv file
with open('pain.csv', 'w') as paincsv:
writer = csv.writer(paincsv)
# read file with 1 'CstmrCdtTrfInitn' record per line
with open(rdd) as painxml:
# process each record
for index, line in enumerate(painxml):
if not line.strip(): # allow empty lines
continue
try:
# each line is an xml doc
pain001 = lxml.etree.fromstring(line)
# move to the customer elem
elem = pain001.find('CstmrCdtTrfInitn')
# select each value and write to csv
writer.writerow([xp(elem)[0].strip() for xp in xpath])
except Exception, e:
# give a hint where things go bad
sys.stderr.write("Error line {}, {}".format(index, str(e)))
raise
I am getting error as RDD not iteratable
- 我想将这段代码实现为一个函数,并在 Spark 中实现为一个独立的程序
- 我希望使用 python 模块在 HDFS 和 Spark 本地模式中处理输入文件。
感谢问题的回复。
您得到的错误信息非常丰富,当您执行 with open(rdd) as painxml:
之后,您尝试 iterate
覆盖 RDD
就好像它是正常的 List
或 python 中的 Tuple
,而 RDD
不是 iterable
,此外,如果您阅读 textFile 文档,您会注意到它 returns一个RDD
。
我认为你遇到的问题是你试图以经典的方式实现这一点,你必须在 MapReduce
范式内处理它,如果你真的是 Apache Spark
的新手,您可以旁听这门课程Scalable Machine Learning with Apache Spark,此外我建议您将您的 spark 版本更新到 1.5 或 1.6(即将发布)。
举个小例子(但不使用xmls):
导入需要的文件
import re
import csv
读取输入文件
content = sc.textFile("../test")
content.collect()
# Out[8]: [u'1st record-1', u'2nd record-2', u'3rd record-3', u'4th record-4']
Map
RDD
操作每一行
# Map it and convert it to tuples
rdd = content.map(lambda s: tuple(re.split("-+",s)))
rdd.collect()
# Out[9]: [(u'1st record', u'1'),
# (u'2nd record', u'2'),
# (u'3rd record', u'3'),
# (u'4th record', u'4')]
写下你的数据
with open("../test.csv", "w") as fw:
writer = csv.writer(fw)
for r1 in rdd.toLocalIterator():
writer.writerow(r1)
看看...
$ cat test.csv
1st record,1
2nd record,2
3rd record,3
4th record,4
注意:如果您想用 Apache Spark
阅读 xml
,GitHub 中有一些库,例如 spark-xml; you can also find this question interesting xml processing in spark。
我有一个 python 代码要在 Spark 中实现,但是我无法获得在 Spark 1.1 版本中实现的 RDD 工作逻辑。此代码在 Python 中完美运行,但我想使用此代码在 Spark 中实现。
import lxml.etree
import csv
sc = SparkContext
data = sc.textFile("pain001.xml")
rdd = sc.parallelize(data)
# compile xpath selectors for ele ment text
selectors = ('GrpHdr/MsgId', 'GrpHdr/CreDtTm') # etc...
xpath = [lxml.etree.XPath('{}/text()'.format(s)) for s in selectors]
# open result csv file
with open('pain.csv', 'w') as paincsv:
writer = csv.writer(paincsv)
# read file with 1 'CstmrCdtTrfInitn' record per line
with open(rdd) as painxml:
# process each record
for index, line in enumerate(painxml):
if not line.strip(): # allow empty lines
continue
try:
# each line is an xml doc
pain001 = lxml.etree.fromstring(line)
# move to the customer elem
elem = pain001.find('CstmrCdtTrfInitn')
# select each value and write to csv
writer.writerow([xp(elem)[0].strip() for xp in xpath])
except Exception, e:
# give a hint where things go bad
sys.stderr.write("Error line {}, {}".format(index, str(e)))
raise
I am getting error as RDD not iteratable
- 我想将这段代码实现为一个函数,并在 Spark 中实现为一个独立的程序
- 我希望使用 python 模块在 HDFS 和 Spark 本地模式中处理输入文件。
感谢问题的回复。
您得到的错误信息非常丰富,当您执行 with open(rdd) as painxml:
之后,您尝试 iterate
覆盖 RDD
就好像它是正常的 List
或 python 中的 Tuple
,而 RDD
不是 iterable
,此外,如果您阅读 textFile 文档,您会注意到它 returns一个RDD
。
我认为你遇到的问题是你试图以经典的方式实现这一点,你必须在 MapReduce
范式内处理它,如果你真的是 Apache Spark
的新手,您可以旁听这门课程Scalable Machine Learning with Apache Spark,此外我建议您将您的 spark 版本更新到 1.5 或 1.6(即将发布)。
举个小例子(但不使用xmls):
导入需要的文件
import re import csv
读取输入文件
content = sc.textFile("../test") content.collect() # Out[8]: [u'1st record-1', u'2nd record-2', u'3rd record-3', u'4th record-4']
Map
RDD
操作每一行# Map it and convert it to tuples rdd = content.map(lambda s: tuple(re.split("-+",s))) rdd.collect() # Out[9]: [(u'1st record', u'1'), # (u'2nd record', u'2'), # (u'3rd record', u'3'), # (u'4th record', u'4')]
写下你的数据
with open("../test.csv", "w") as fw: writer = csv.writer(fw) for r1 in rdd.toLocalIterator(): writer.writerow(r1)
看看...
$ cat test.csv 1st record,1 2nd record,2 3rd record,3 4th record,4
注意:如果您想用 Apache Spark
阅读 xml
,GitHub 中有一些库,例如 spark-xml; you can also find this question interesting xml processing in spark。