无法使用 onComplete & andThen 打印 Scala Future 的值
Unable to print values of a Scala Future by using onComplete & andThen
我正在尝试使用 Scala-Spark 从我的数据源中读取增量数据。在访问源表之前,我试图计算我在 Future 中的代码中使用的分区列的最小值和最大值,它出现在 class: GetSourceMeta
中,如下所示。
def getBounds(keyIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
var boundsMap = scala.collection.mutable.Map[String, String]()
keyIdMap.keys.foreach(table => if(!keyIdMap(table).contains("Invalid")) {
val minMax = s"select max(insert_tms) maxTms, min(insert_tms) minTms from schema.${table} where source='DB2' and key_id in (${keyIdMap(table)})"
println("MinMax: " + minMax)
val boundsDF = spark.read.format("jdbc").option("url", con.getConUrl()).option("dbtable", s"(${minMax}) as ctids").option("user", con.getUserName()).option("password", con.getPwd()).load()
try {
val maxTms = boundsDF.select("minTms").head.getTimestamp(0).toString + "," + boundsDF.select("maxTms").head.getTimestamp(0).toString
println("Bounds: " + maxTms)
boundsMap += (table -> maxTms)
} catch {
case np: java.lang.NullPointerException => { println("No data found") }
case e: Exception => { println(s"Unknown exception: $e") }
}
}
)
boundsMap.foreach(println)
boundsMap
}
我在我的主要方法中调用上面的方法:
object LoadToCopyDB {
val conf = new SparkConf().setAppName("TEST_YEAR").set("some parameters")
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
val gsm = new GetSourceMeta()
val minMaxKeyMap = gsm.getBounds(keyIdMap).onComplete {
case Success(values) => values.foreach(println)
case Failure(f) => f.printStackTrace
}
.
.
.
}
嗯,onComplete
没有打印任何值,所以我使用 andThen
如下,但也没有帮助。
val bounds: Future[scala.collection.mutable.Map[String, String]] = gpMetaData.getBounds(incrementalIds) andThen {
case Success(outval) => outval.foreach(println)
case Failure(e) => println(e)
}
早些时候主线程退出,没有让 Future: getBounds 执行。因此,我找不到终端上显示的 Future 的任何 println 语句。我发现我需要让主线程等待才能完成 Future。但是当我在 main 中使用 Await 和 onComplete 时:
Await.result(bounds, Duration.Inf)
编译报错:
Type mismatch, expected: Awaitable[NotInferedT], actual:Unit
如果我将 val minMaxKeyMap 声明为 Future[scala.collection.mutable.Map[String, String]
,编译器会说:Expression of type Unit doesn't conform to expected type Future[mutable.map[String,String]]
我试图在 Await 语句之后打印 bounds
的值,但它只打印了一个空的 Map。
我不明白如何解决这个问题。任何人都可以让我知道我该怎么做才能正确地制作 Future 运行?
遇到这种情况,还是跟着类型走比较好。 onComplete 方法仅 returns Unit,它不会 return 未来因此不能使用 Await 传递。
如果你想 return 任何类型的未来,你将必须映射或平面映射值和 return 一个选项,例如。在这种情况下,不管你 return 是什么,你只需要 Await 方法等待这个结果并打印一条轨迹。您可以在恢复中处理可能的异常。在您的代码中就像这样:
val minMaxKeyMap:Future[Option[Any] = gsm.getBounds(keyIdMap).map { values =>
values.foreach(println)
None
}.recover{
case e: Throwable =>
e. printStackTrace
None
}
请注意,恢复部分必须 return 该类型的一个实例。
之后,您可以将 Await 应用于 Future,您将打印结果。这不是最漂亮的解决方案,但它适用于您的情况。
我正在尝试使用 Scala-Spark 从我的数据源中读取增量数据。在访问源表之前,我试图计算我在 Future 中的代码中使用的分区列的最小值和最大值,它出现在 class: GetSourceMeta
中,如下所示。
def getBounds(keyIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
var boundsMap = scala.collection.mutable.Map[String, String]()
keyIdMap.keys.foreach(table => if(!keyIdMap(table).contains("Invalid")) {
val minMax = s"select max(insert_tms) maxTms, min(insert_tms) minTms from schema.${table} where source='DB2' and key_id in (${keyIdMap(table)})"
println("MinMax: " + minMax)
val boundsDF = spark.read.format("jdbc").option("url", con.getConUrl()).option("dbtable", s"(${minMax}) as ctids").option("user", con.getUserName()).option("password", con.getPwd()).load()
try {
val maxTms = boundsDF.select("minTms").head.getTimestamp(0).toString + "," + boundsDF.select("maxTms").head.getTimestamp(0).toString
println("Bounds: " + maxTms)
boundsMap += (table -> maxTms)
} catch {
case np: java.lang.NullPointerException => { println("No data found") }
case e: Exception => { println(s"Unknown exception: $e") }
}
}
)
boundsMap.foreach(println)
boundsMap
}
我在我的主要方法中调用上面的方法:
object LoadToCopyDB {
val conf = new SparkConf().setAppName("TEST_YEAR").set("some parameters")
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
val gsm = new GetSourceMeta()
val minMaxKeyMap = gsm.getBounds(keyIdMap).onComplete {
case Success(values) => values.foreach(println)
case Failure(f) => f.printStackTrace
}
.
.
.
}
嗯,onComplete
没有打印任何值,所以我使用 andThen
如下,但也没有帮助。
val bounds: Future[scala.collection.mutable.Map[String, String]] = gpMetaData.getBounds(incrementalIds) andThen {
case Success(outval) => outval.foreach(println)
case Failure(e) => println(e)
}
早些时候主线程退出,没有让 Future: getBounds 执行。因此,我找不到终端上显示的 Future 的任何 println 语句。我发现我需要让主线程等待才能完成 Future。但是当我在 main 中使用 Await 和 onComplete 时:
Await.result(bounds, Duration.Inf)
编译报错:
Type mismatch, expected: Awaitable[NotInferedT], actual:Unit
如果我将 val minMaxKeyMap 声明为 Future[scala.collection.mutable.Map[String, String]
,编译器会说:Expression of type Unit doesn't conform to expected type Future[mutable.map[String,String]]
我试图在 Await 语句之后打印 bounds
的值,但它只打印了一个空的 Map。
我不明白如何解决这个问题。任何人都可以让我知道我该怎么做才能正确地制作 Future 运行?
遇到这种情况,还是跟着类型走比较好。 onComplete 方法仅 returns Unit,它不会 return 未来因此不能使用 Await 传递。
如果你想 return 任何类型的未来,你将必须映射或平面映射值和 return 一个选项,例如。在这种情况下,不管你 return 是什么,你只需要 Await 方法等待这个结果并打印一条轨迹。您可以在恢复中处理可能的异常。在您的代码中就像这样:
val minMaxKeyMap:Future[Option[Any] = gsm.getBounds(keyIdMap).map { values =>
values.foreach(println)
None
}.recover{
case e: Throwable =>
e. printStackTrace
None
}
请注意,恢复部分必须 return 该类型的一个实例。 之后,您可以将 Await 应用于 Future,您将打印结果。这不是最漂亮的解决方案,但它适用于您的情况。