How to load csv with nested columns using Apache Spark
I have a csv file:
name,age,phonenumbers
Tom,20,"[{number:100200, area_code:555},{number:100300, area_code:444}]"
Harry,20,"[{number:100400, area_code:555},{number:100500, area_code:666}]"
How can I load this file in Spark into an RDD/Dataset of Person, where the Person object looks like this:
class Person {
    String name;
    Integer age;
    List<Phone> phonenumbers;

    class Phone {
        int number;
        int area_code;
    }
}
Unfortunately, the field names of the nested objects are not quoted in your example. Is that really the case? If they were quoted (i.e. well-formed JSON), you could simply use the from_json function, like this:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// phonenumbers is an array of structs, each with two integer fields
val schema = ArrayType(new StructType()
  .add("number", IntegerType)
  .add("area_code", IntegerType), containsNull = false)

val converted = input.withColumn("phones", from_json(col("phonenumbers"), schema))
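The snippets here assume a DataFrame named input that has already been read from the csv. A minimal sketch of how it could be built (the SparkSession setup and file path are assumptions, not part of the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("nested-csv").getOrCreate()

// header provides the column names; inferSchema reads `age` as Int,
// which the Row pattern match further down relies on
val input = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")   // hypothetical path to the file from the question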
If that is not the case, you will need your own logic to turn the string into actual nested objects, for example:
import org.apache.spark.sql.Row
import spark.implicits._   // assumes `spark` is your SparkSession; needed for the Dataset[Person] encoder

case class Phone(number: Int, area_code: Int)
case class Person(name: String, age: Int, phonenumbers: Array[Phone])

// Assumes the csv was read with inferSchema, so that `age` arrives as Int
val converted = input.map {
  case Row(name: String, age: Int, phonenumbers: String) =>
    // the pattern mirrors the sample data: 6-digit numbers, 3-digit area codes
    val phoneFormat = raw"\{number:(\d{6}), area_code:(\d{3})\}".r
    val phones = for (m <- phoneFormat.findAllMatchIn(phonenumbers))
      yield Phone(m.group(1).toInt, m.group(2).toInt)
    Person(name, age, phones.toArray)
}
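A quick way to sanity-check the regex-based result (a sketch; collect is only reasonable here because the sample is tiny):

converted.printSchema()
converted.show(truncate = false)

// materialize the parsed rows as plain Scala objects
val people: Array[Person] = converted.collect()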