在 Spark 应用程序中保存 RDD 的元素

Saving elements of RDD in Spark application

我是 运行 集群上的 spark 应用程序。我想对 RDD 中的每个元素执行一些操作并将每个元素保存到文本文件中。

我在 myRDD

上呼叫 foreach
  myRDD.foreach(process)

   def process(elements):
        // some operation that extracts the strings
        // and converts to myList
        myList = ... 

        with open("somefile.txt", "a+") as myfile:
            print "----SAVED----"
            myfile.writelines(myList)

但是,我找不到 somefile.txt,即使我确实发现打印语句没有任何 errors/warning。 somefile.txt 保存在哪里?如果我的方法不正确,如何保存RDD的单个元素?

好的,所以这是有问题的原因是 RDD 不一定在单个节点上。当您调用 foreach 时,您的 RDD 分布在您的节点上。您需要使用 collectuse the built in file writer, but this won't modify it.

将 RDD 收集到驱动程序节点

collect 可能更好,但它也是瓶颈,因为现在所有数据都被收集到单个节点(驱动程序节点)。

编辑:我将用一些代码回答您的其他问题...

def process(element):
  #process element to a list
  return myList

def writeList(myList):
  with open('somefile.txt', 'a+') as f:
    f.writelines(myList)

#in main
myListRDD = myRDD.map(process)
myListRDD.collect().foreach(writeList)

那应该在高效的同时做你想做的事。因为我们为列表处理到一个新的 RDD,所以我们能够并行进行所有处理,所以唯一的线性操作是文件写入,它需要在单个节点上发生以实现数据一致性。

myRDD.map(convertToList).saveAsTextFile(<hdfs output path>)

使用这种方法,您将能够扩展您的应用程序,如果您必须将所有数据传输到驱动程序中,那么您将保证输出数据足够小以适合驱动程序内存,否则您将开始拥有烦恼。

如果您要求所有数据仅以一个文件结尾,那么(这种方法与将所有输出传输到驱动程序有类似的问题,不可扩展):

myRDD.map(generateList).coalesce(1).saveAsTextFile(<hdfs output path>)

如果您需要在将列表存储到文件之前将其转换为字符串,则:

myRDD.map(generateList).map(listToString).saveAsTextFile(<hdfs output path>)

显然,您可以将列表转换为第一个映射中的字符串并节省额外的步骤。