无法使用 onComplete & andThen 打印 Scala Future 的值

Unable to print values of a Scala Future by using onComplete & andThen

我正在尝试使用 Scala-Spark 从我的数据源中读取增量数据。在访问源表之前,我试图计算我在 Future 中的代码中使用的分区列的最小值和最大值,它出现在 class: GetSourceMeta 中,如下所示。

def getBounds(keyIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
    var boundsMap = scala.collection.mutable.Map[String, String]()
    keyIdMap.keys.foreach(table => if(!keyIdMap(table).contains("Invalid")) {
        val minMax    = s"select max(insert_tms) maxTms, min(insert_tms) minTms from schema.${table} where source='DB2' and key_id in (${keyIdMap(table)})"
        println("MinMax: " + minMax)
        val boundsDF  = spark.read.format("jdbc").option("url", con.getConUrl()).option("dbtable", s"(${minMax}) as ctids").option("user", con.getUserName()).option("password", con.getPwd()).load()
        try {
            val maxTms = boundsDF.select("minTms").head.getTimestamp(0).toString + "," + boundsDF.select("maxTms").head.getTimestamp(0).toString
            println("Bounds: " + maxTms)
            boundsMap += (table -> maxTms)
        } catch {
            case np: java.lang.NullPointerException =>  { println("No data found") }
            case e: Exception => { println(s"Unknown exception: $e") }
        }
    }
    )
    boundsMap.foreach(println)
    boundsMap
}

我在我的主要方法中调用上面的方法:

object LoadToCopyDB {
    val conf = new SparkConf().setAppName("TEST_YEAR").set("some parameters")
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
        val gsm = new GetSourceMeta()
        val minMaxKeyMap = gsm.getBounds(keyIdMap).onComplete {
          case Success(values) => values.foreach(println)
          case Failure(f)      => f.printStackTrace
    }
.
.
.
}

嗯,onComplete 没有打印任何值,所以我使用 andThen 如下,但也没有帮助。

val bounds: Future[scala.collection.mutable.Map[String, String]] = gpMetaData.getBounds(incrementalIds) andThen {
  case Success(outval) => outval.foreach(println)
  case Failure(e)        => println(e)
}

早些时候主线程退出,没有让 Future: getBounds 执行。因此,我找不到终端上显示的 Future 的任何 println 语句。我发现我需要让主线程等待才能完成 Future。但是当我在 main 中使用 Await 和 onComplete 时:

Await.result(bounds, Duration.Inf)

编译报错:

Type mismatch, expected: Awaitable[NotInferedT], actual:Unit

如果我将 val minMaxKeyMap 声明为 Future[scala.collection.mutable.Map[String, String],编译器会说:Expression of type Unit doesn't conform to expected type Future[mutable.map[String,String]]

我试图在 Await 语句之后打印 bounds 的值,但它只打印了一个空的 Map。

我不明白如何解决这个问题。任何人都可以让我知道我该怎么做才能正确地制作 Future 运行?

遇到这种情况,还是跟着类型走比较好。 onComplete 方法仅 returns Unit,它不会 return 未来因此不能使用 Await 传递。

如果你想 return 任何类型的未来,你将必须映射或平面映射值和 return 一个选项,例如。在这种情况下,不管你 return 是什么,你只需要 Await 方法等待这个结果并打印一条轨迹。您可以在恢复中处理可能的异常。在您的代码中就像这样:

val minMaxKeyMap:Future[Option[Any] = gsm.getBounds(keyIdMap).map { values =>
   values.foreach(println)
   None
}.recover{
   case e: Throwable => 
          e. printStackTrace
          None
}

请注意,恢复部分必须 return 该类型的一个实例。 之后,您可以将 Await 应用于 Future,您将打印结果。这不是最漂亮的解决方案,但它适用于您的情况。