如何在 RDD [(String, Int)] 上保存 AsTextFile 时删除记录周围的括号?

How to remove the parentheses around records when saveAsTextFile on RDD[(String, Int)]?

如何从以下 spark 作业的输出中删除括号“(”和“)”?

当我尝试使用 PigScript 读取 spark 输出时出现问题。

我的代码:

scala> val words = Array("HI","HOW","ARE")
words: Array[String] = Array(HI, HOW, ARE)

scala> val wordsRDD = sc.parallelize(words)
wordsRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at    parallelize at <console>:23

scala> val keyvalueRDD = wordsRDD.map(elem => (elem,1))
keyvalueRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:25

scala> val wordcountRDD = keyvalueRDD.reduceByKey((x,y) => x+y)
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:27

scala> wordcountRDD.saveAsTextFile("/user/cloudera/outputfiles")

按照上面的代码输出:

 hadoop dfs -cat /user/cloudera/outputfiles/part*

(HOW,1)
(ARE,1)
(HI,1)

但我希望 spark 的输出不带括号地存储如下

HOW,1
ARE,1
HI,1

现在我想使用 PigScript 读取上面的输出。

Pigscript 中的 LOAD 语句将“(HOW”视为第一个原子,将“1)”视为第二个原子

我们是否可以摆脱 spark 代码本身的括号,因为我不想在 pigscript 中应用此修复程序..

猪脚本:

records = LOAD '/user/cloudera/outputfiles' USING PigStorage(',') AS (word:chararray);
dump records;

猪产量:

 ((HOW)
 ((ARE)
 ((HI)

这种格式是元组的一种格式。您可以手动定义格式:

val wordcountRDD = keyvalueRDD.reduceByKey((x,y) => x+y)
                              // here we set custom format
                              .map(x => x._1 + "," + x._2)
wordcountRDD.saveAsTextFile("/user/cloudera/outputfiles")

在将记录保存到 outputfiles 目录之前使用 map 转换,例如

wordcountRDD.map { case (k, v) => s"$k, $v" }.saveAsTextFile("/user/cloudera/outputfiles")

参见 Spark's documentation about map


我强烈建议改用数据集。

scala> words.toSeq.toDS.groupBy("value").count().show
+-----+-----+
|value|count|
+-----+-----+
|  HOW|    1|
|  ARE|    1|
|   HI|    1|
+-----+-----+

scala> words.toSeq.toDS.groupBy("value").count.write.csv("outputfiles")

$ cat outputfiles/part-00199-aa752576-2f65-481b-b4dd-813262abb6c2-c000.csv
HI,1

参见 Spark SQL, DataFrames and Datasets Guide