在 Scala / Spark 中有条件地映射 CSV 文件中的行以生成另一个 CSV 文件
Conditionally map through rows in CSV file in Scala / Spark to produce another CSV file
我是 Scala / Spark 的新手,我已经陷入了深渊。几个星期以来,我一直在努力寻找 Scala 2.11.8 上看似简单的问题的解决方案,但一直无法找到好的解决方案。
我有一个接近 150 GB 的 csv 格式的大型数据库,其中包含大量空值,需要根据各个列的值进行减少和清理。
原始CSV文件的架构如下:
- 第 1 列:双精度
- 第 2 列:整数
- 第 3 列:双倍
- 第 4 列:双倍
- 第 5 列:整数
- 第 6 列:双倍
- 第 7 列:整数
所以,我想有条件地映射 CSV 文件的所有行,并将结果导出到另一个 CSV 文件,每行具有以下条件:
如果第 4 列的值不为空,则该行第 4、5、6 和 7 列的值应存储为名为 lastValuesOf4to7 的数组。 (数据集中如果第4列元素不为null,则第1、2、3列为null,可以忽略)
如果第 3 列的值不为空,则第 1、2 和 3 列的值以及 lastValuesOf4to7 数组中的四个元素(如上所述)应作为新行导出到另一个名为 condensed.csv 的 CSV 文件中。 (数据集中如果第3列的元素不为空,则第4、5、6、7列为空,可以忽略)
所以最后我应该得到一个名为 condensed.csv 的 csv 文件,它有 7 列。
我曾尝试在 Scala 中使用以下代码,但未能取得进一步进展:
import scala.io.Source
object structuringData {
def main(args: Array[String]) {
val data = Source.fromFile("/path/to/file.csv")
var lastValuesOf4to7 = Array("0","0","0","0")
val lines = data.getLines // Get the lines of the file
val splitLine = lines.map(s => s.split(',')).toArray // This gives an out of memory error since the original file is huge.
data.close
}
}
正如您从上面的代码中看到的那样,我尝试将其移动到一个数组中,但由于我无法单独处理每一行,因此无法进一步推进。
我很确定在 Scala / Spark 上处理 csv 文件一定有直接的解决方案。
使用 Spark-csv 包,然后使用 Sql 查询查询数据并根据您的用例制作过滤器,最后导出。
如果您使用的是 spark 2.0.0,那么 spark-csv 将出现在 spark-sql 中,否则如果您使用的是旧版本,则相应地添加依赖项。
您可以在此处找到 link 到 spark-csv。
您也可以在这里查看示例:http://blog.madhukaraphatak.com/analysing-csv-data-in-spark/
感谢您的回复。我设法使用 Bash 脚本自己创建了一个解决方案。我必须先从一个空白的 condensed.csv 文件开始。我的代码显示了实现这一点是多么容易:
#!/bin/bash
OLDIFS=$IFS
IFS=","
last1=0
last2=0
last3=0
last4=0
while read f1 f2 f3 f4 f5 f6 f7
do
if [[ $f4 != "" ]];
then
last1=$f4
last2=$f5
last3=$f6
last4=$f7
elif [[ $f3 != "" ]];
then
echo "$f1,$f2,$f3,$last1,$last2,$last3,$last4" >> path/to/condensed.csv
fi
done <
IFS=$OLDIFS
如果脚本以 extractcsv.sh 的名称保存,那么它应该是 运行 使用以下格式:
$ ./extractcsv.sh path/to/original/file.csv
这只会证实我的观察,即 ETL 在 Bash 上比在 Scala 上更容易。不过还是谢谢你的帮助。
我是 Scala / Spark 的新手,我已经陷入了深渊。几个星期以来,我一直在努力寻找 Scala 2.11.8 上看似简单的问题的解决方案,但一直无法找到好的解决方案。 我有一个接近 150 GB 的 csv 格式的大型数据库,其中包含大量空值,需要根据各个列的值进行减少和清理。
原始CSV文件的架构如下:
- 第 1 列:双精度
- 第 2 列:整数
- 第 3 列:双倍
- 第 4 列:双倍
- 第 5 列:整数
- 第 6 列:双倍
- 第 7 列:整数
所以,我想有条件地映射 CSV 文件的所有行,并将结果导出到另一个 CSV 文件,每行具有以下条件:
如果第 4 列的值不为空,则该行第 4、5、6 和 7 列的值应存储为名为 lastValuesOf4to7 的数组。 (数据集中如果第4列元素不为null,则第1、2、3列为null,可以忽略)
如果第 3 列的值不为空,则第 1、2 和 3 列的值以及 lastValuesOf4to7 数组中的四个元素(如上所述)应作为新行导出到另一个名为 condensed.csv 的 CSV 文件中。 (数据集中如果第3列的元素不为空,则第4、5、6、7列为空,可以忽略)
所以最后我应该得到一个名为 condensed.csv 的 csv 文件,它有 7 列。
我曾尝试在 Scala 中使用以下代码,但未能取得进一步进展:
import scala.io.Source
object structuringData {
def main(args: Array[String]) {
val data = Source.fromFile("/path/to/file.csv")
var lastValuesOf4to7 = Array("0","0","0","0")
val lines = data.getLines // Get the lines of the file
val splitLine = lines.map(s => s.split(',')).toArray // This gives an out of memory error since the original file is huge.
data.close
}
}
正如您从上面的代码中看到的那样,我尝试将其移动到一个数组中,但由于我无法单独处理每一行,因此无法进一步推进。
我很确定在 Scala / Spark 上处理 csv 文件一定有直接的解决方案。
使用 Spark-csv 包,然后使用 Sql 查询查询数据并根据您的用例制作过滤器,最后导出。
如果您使用的是 spark 2.0.0,那么 spark-csv 将出现在 spark-sql 中,否则如果您使用的是旧版本,则相应地添加依赖项。
您可以在此处找到 link 到 spark-csv。
您也可以在这里查看示例:http://blog.madhukaraphatak.com/analysing-csv-data-in-spark/
感谢您的回复。我设法使用 Bash 脚本自己创建了一个解决方案。我必须先从一个空白的 condensed.csv 文件开始。我的代码显示了实现这一点是多么容易:
#!/bin/bash
OLDIFS=$IFS
IFS=","
last1=0
last2=0
last3=0
last4=0
while read f1 f2 f3 f4 f5 f6 f7
do
if [[ $f4 != "" ]];
then
last1=$f4
last2=$f5
last3=$f6
last4=$f7
elif [[ $f3 != "" ]];
then
echo "$f1,$f2,$f3,$last1,$last2,$last3,$last4" >> path/to/condensed.csv
fi
done <
IFS=$OLDIFS
如果脚本以 extractcsv.sh 的名称保存,那么它应该是 运行 使用以下格式:
$ ./extractcsv.sh path/to/original/file.csv
这只会证实我的观察,即 ETL 在 Bash 上比在 Scala 上更容易。不过还是谢谢你的帮助。