使用Apache Spark实现python功能

Use Apache Spark to implement the python function

我有一个 python 代码要在 Spark 中实现,但是我无法获得在 Spark 1.1 版本中实现的 RDD 工作逻辑。此代码在 Python 中完美运行,但我想使用此代码在 Spark 中实现。

import lxml.etree
import csv

sc = SparkContext
data = sc.textFile("pain001.xml")
rdd = sc.parallelize(data)
# compile xpath selectors for ele ment text
selectors = ('GrpHdr/MsgId', 'GrpHdr/CreDtTm') # etc...
xpath = [lxml.etree.XPath('{}/text()'.format(s)) for s in selectors]

# open result csv file
with open('pain.csv', 'w') as paincsv:
    writer = csv.writer(paincsv)
    # read file with 1 'CstmrCdtTrfInitn' record per line
    with open(rdd) as painxml:
        # process each record
        for index, line in enumerate(painxml):
            if not line.strip(): # allow empty lines
                continue
            try:
                # each line is an xml doc
                pain001 = lxml.etree.fromstring(line)
                # move to the customer elem
                elem = pain001.find('CstmrCdtTrfInitn')
                # select each value and write to csv
                writer.writerow([xp(elem)[0].strip() for xp in xpath])
            except Exception, e:
                # give a hint where things go bad
                sys.stderr.write("Error line {}, {}".format(index, str(e)))
                raise  

I am getting error as RDD not iteratable
  1. 我想将这段代码实现为一个函数,并在 Spark 中实现为一个独立的程序
  2. 我希望使用 python 模块在 HDFS 和 Spark 本地模式中处理输入文件。

感谢问题的回复。

您得到的错误信息非常丰富,当您执行 with open(rdd) as painxml: 之后,您尝试 iterate 覆盖 RDD 就好像它是正常的 List 或 python 中的 Tuple,而 RDD 不是 iterable,此外,如果您阅读 textFile 文档,您会注意到它 returns一个RDD

我认为你遇到的问题是你试图以经典的方式实现这一点,你必须在 MapReduce 范式内处理它,如果你真的是 Apache Spark 的新手,您可以旁听这门课程Scalable Machine Learning with Apache Spark,此外我建议您将您的 spark 版本更新到 1.5 或 1.6(即将发布)。

举个小例子(但不使用xmls):

  1. 导入需要的文件

    import re
    import csv
    
  2. 读取输入文件

    content = sc.textFile("../test")
    content.collect()
    # Out[8]: [u'1st record-1', u'2nd record-2', u'3rd record-3', u'4th record-4']
    
  3. Map RDD 操作每一行

    # Map it and convert it to tuples
    rdd = content.map(lambda s: tuple(re.split("-+",s)))
    rdd.collect()
    # Out[9]: [(u'1st record', u'1'),
    #          (u'2nd record', u'2'),
    #          (u'3rd record', u'3'),
    #          (u'4th record', u'4')]
    
  4. 写下你的数据

    with open("../test.csv", "w") as fw:
        writer = csv.writer(fw)
    
        for r1 in rdd.toLocalIterator():
            writer.writerow(r1)
    
  5. 看看...

    $ cat test.csv
    1st record,1
    2nd record,2
    3rd record,3
    4th record,4
    

注意:如果您想用 Apache Spark 阅读 xml,GitHub 中有一些库,例如 spark-xml; you can also find this question interesting xml processing in spark