如何使用 Spark 读取不断更新的 HDFS 目录并根据字符串(行)将输出拆分为多个 HDFS 文件?

How to read with Spark constantly updating HDFS directory and split output to multiple HDFS files based on String (row)?

详细场景 -> HDFS 目录,其中 "fed" 包含多种银行账户 activity 的新日志数据。 每行代表一个随机 activity 类型,每行 (String) 包含文本 "ActivityType=<TheTypeHere>".

在 Spark-Scala 中,读取 HDFS 目录中的输入 file/s 并输出多个 HDFS 文件的最佳方法是什么,其中每个 ActivityType 都写入其自己的新文件?

你可以使用MultipleOutputFormat for this.Convert rdd 成键值对这样ActivityType 是 key.Spark 会为不同的文件创建不同的文件 keys.You 可以根据键决定放在哪里文件及其名称。

你可以使用 RDD 做这样的事情,我假设你有可变长度的文件,然后转换为 DF:

val rdd = sc.textFile("/FileStore/tables/activity.txt")
val rdd2 = rdd.map(_.split(","))
          .keyBy(_(0))
val rdd3 = rdd2.map(x => (x._1, x._2.mkString(",")))
val df = rdd3.toDF("K", "V")  
//df.show(false)

df.write.partitionBy("K").text("SO_QUESTION")

输入为:

ActivityType=<ACT_001>,34,56,67,89,90
ActivityType=<ACT_002>,A,1,2
ActivityType=<ACT_003>,ABC

然后我得到 3 个文件作为输出,在本例中,每条记录 1 个。有点难以像在 Databricks 中那样显示。

你可以调整你的输出格式和位置等,partitionBy是这里的关键。

修改了声明的第一个答案:

The location of the "key" string is random within the parent String, the only thing that is guaranteed is that it contains that sub-string, in this case "ActivityType" followed by some val.

问题真的是关于这个的。这里是:

// SO Question
val rdd = sc.textFile("/FileStore/tables/activitySO.txt")  
val rdd2 = rdd.map(x => (x.slice (x.indexOfSlice("ActivityType=<")+14, x.indexOfSlice(">", (x.indexOfSlice("ActivityType=<")+14))), x))
val df = rdd2.toDF("K", "V")
df.write.partitionBy("K").text("SO_QUESTION2")

输入是:

ActivityType=<ACT_001>,34,56,67,89,90
3,4,4,ActivityType=<ACT_002>,A,1,2
ABC,ActivityType=<ACT_0033>
DEF,ActivityType=<ACT_0033>

输出是 3 个文件,其中密钥是例如不是 ActivityType=,而是 ACT_001,等等。关键数据没有被剥离,它仍然存在于 String 中。如果需要,您可以修改它以及输出位置和格式。