以编程方式将列名添加到从 RDD 构建的 Spark DataFrame

Programmatically add column names to Spark DataFrame built from an RDD

我有一个没有 header 的 pipe-delimited 文本文件,并且行的列数不同(一些行的类型为 A,有 400 列,其他行的类型为 B 和 200,所以我需要先把它们分开):

val textFileRaw = sc.textFile("./data.txt")
val textFile = textFileRaw.map(line => line.split("\|", -1))
val dataA = textFile.filter(line => line(0) == "A")
val dataB = textFile.filter(line => line(0) == "B")

现在我想将这些 RDD 转换为 Spark DataFrame,但拆分返回的是单个数组,而不是 400 或 200 个不同的值。这会导致以下错误:

# ANames are my column names, length=400
val ANames = Array("Row ID", "City", "State", ...)
val dataADF = dataA.toDF(ANames: _*)

Name: java.lang.IllegalArgumentException
Message: requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (400): Row ID, City, State ...

面临同样的问题,但所有答案都建议手动指定从数组到元组的映射,这在有数百列的情况下不是很好。

我认为如果我使用 Spark's csv loader,我可以让它工作,但这对我的数据不起作用,因为行具有不同数量的字段(它不是真正的 csv 文件)。 work-around 将首先拆分文件,写入 well-formed csv 的新文件,然后使用 csv 加载程序,但我想尽可能避免这种情况。如何将这些 RDD 转换为具有命名列的数据帧?

您应该创建一个架构并使用 SQLContext.createDataFrame api as

val dataA = textFile.filter(line => line(0) == "A")
val ANames = Array("Row ID", "City", "State", "kjl")
val dataADF = sqlContext.createDataFrame(dataA.map(Row.fromSeq(_)), StructType(ANames.map(StructField(_, StringType, true))))

应该可以。但请注意,我已将所有 数据类型 用作 StringType()。你可以根据自己的需要改变。