How to handle exceptions in Spark and Scala
I am trying to handle common exceptions in Spark, like a .map operation not working correctly on all elements of the data, or a FileNotFound exception. I have read all the existing questions and the following two posts:

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

I have tried using a Try statement on the attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble
line, so that it reads attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble)
But that won't compile; the compiler no longer recognises the .toDF()
statement later on. I have also tried a Java-like Try { Catch {}} block, but can't get the scoping right; df
is then not returned. Does anyone know how to do this properly? Do I even need to handle these exceptions, since the Spark framework seems to deal with a FileNotFound exception already without me adding one? But I would like, for example, to generate an error with the number of fields in the schema if the input file has the wrong number of columns.
The code follows:
object DataLoadTest extends SparkSessionWrapper {
  /** Helper function to create a DataFrame from a textfile, re-used in subsequent tests */
  def createDataFrame(fileName: String): DataFrame = {
    import spark.implicits._
    //try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\t"))
      //mHealth user is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    df
  } catch {
    case ex: FileNotFoundException => println(s"File $fileName not found")
    case unknown: Exception => println(s"Unknown exception: $unknown")
  }
}
Thanks for any suggestions!
Either let the exception be thrown from the createDataFrame
method (and handle it outside), or change the signature to return Option[DataFrame]
:
import java.io.FileNotFoundException

def createDataFrame(fileName: String): Option[DataFrame] = {
  import spark.implicits._
  try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\t"))
      //mHealth user is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    Some(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      None
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      None
  }
}
Edit: there are several patterns on the calling side of createDataFrame. If you are processing several file names, you can for example do:
val dfs: Seq[DataFrame] = Seq("file1", "file2", "file3").map(createDataFrame).flatten
If you are processing a single file name, you can do:
createDataFrame("file1.csv") match {
  case Some(df) =>
    // proceed with your pipeline
    val df2 = df.filter($"activityLabel" > 0)
      .withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))
  case None => println("could not create dataframe")
}
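If the underlying worry is malformed rows (e.g. a line with the wrong number of columns, as in the question), the file-level try/catch above will not catch them, because the .map runs lazily on the executors. One pattern is to wrap the per-line parse in Try and keep only the successes. A minimal sketch in plain Scala, using a hypothetical three-field MHealthRow instead of the real 24-field mHealthUser:

```scala
import scala.util.Try

// Hypothetical, reduced schema for illustration; the real mHealthUser has 24 fields.
case class MHealthRow(a: Double, b: Double, label: Int)

// Parse one tab-separated line; returns Failure for wrong arity or bad numbers.
def parseLine(line: String): Try[MHealthRow] = Try {
  val attributes = line.split("\t")
  require(attributes.length == 3, s"expected 3 columns, got ${attributes.length}")
  MHealthRow(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toInt)
}

val lines = Seq("1.0\t2.0\t1", "bad\tline", "3.0\t4.0\t0")

// In Spark this would be rdd.flatMap(line => parseLine(line).toOption).toDF():
// malformed lines are silently dropped instead of killing the job.
val rows = lines.flatMap(line => parseLine(line).toOption)
```

If you would rather fail loudly than drop rows, replace .toOption with a .get (or log the Failure cases) so the first bad line surfaces as a task error.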
Another option is to use the Try type in Scala.
For example:
import java.io.FileNotFoundException
import scala.util.{Try, Success, Failure}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    //create dataframe df
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}
Now, on the calling side, handle it like:
createDataFrame("file1.csv") match {
  case Success(df) =>
    // proceed with your pipeline
  case Failure(ex) => //handle exception
}
This is slightly better than using Option, because the caller learns the reason for the failure and can handle it accordingly.
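As a side note, the manual try/catch inside createDataFrame can itself be replaced by the Try constructor, which catches any non-fatal exception and returns it as a Failure, i.e. def createDataFrame(fileName: String): Try[DataFrame] = Try { ...build the df... }. A self-contained sketch of the pattern, using a hypothetical parseDouble in place of the DataFrame-building body:

```scala
import scala.util.{Try, Success, Failure}

// Try(...) catches any non-fatal exception and wraps it as a Failure,
// so the explicit try/catch above collapses into the Try constructor.
// parseDouble is a stand-in for the DataFrame-building code.
def parseDouble(s: String): Try[Double] = Try(s.toDouble)

parseDouble("1.5") match {
  case Success(v)  => println(s"parsed $v")
  case Failure(ex) => println(s"could not parse: ${ex.getMessage}")
}
```

One thing you lose with the bare constructor is the per-exception-type handling; if you still need to distinguish FileNotFoundException, pattern match on the Failure's wrapped exception at the call site.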
Applying a try and catch block on DataFrame columns:
(try{$"credit.amount"} catch{case e:Exception=> lit(0)}).as("credit_amount")
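Be aware that $"credit.amount" only builds a Column expression; a missing column typically fails later, at analysis time, when the query is actually resolved, so the try/catch around it may never fire. A more reliable alternative is to check the schema up front. A sketch, assuming a DataFrame df is in scope (backticks stop Spark from reading the dot as struct-field access):

```scala
import org.apache.spark.sql.functions.{col, lit}

// Fall back to a literal 0 when the column is absent from the schema.
val creditAmount =
  if (df.columns.contains("credit.amount")) col("`credit.amount`")
  else lit(0)

val out = df.select(creditAmount.as("credit_amount"))
```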