Chaining Dataframe function calls

The following code does not work:

val newDF = df
          .withColumn("timestamp", when(df("processingDate").isNull, lit(new Timestamp(System.currentTimeMillis))).otherwise(df("processingDate")))
          .withColumn("year", year(df("timestamp")))
          .withColumn("month", month(df("timestamp")))
          .withColumn("day", dayofmonth(df("timestamp")))

When I run it, I get the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot resolve column name "timestamp" among ...

The problem is that although I added "timestamp" as a column, it is not part of the original, immutable "df", so df("timestamp") cannot be resolved.

Is there a way to refer to the previous Dataframe in the call chain?

For now I will update my code as follows so that it works, but I would like to know whether there is a better way.

val dfWithTimestamp = df.withColumn("timestamp", when(df("processingDate").isNull, lit(new Timestamp(System.currentTimeMillis))).otherwise(df("processingDate")))

val newDF = dfWithTimestamp
          .withColumn("year", year(dfWithTimestamp("timestamp")))
          .withColumn("month", month(dfWithTimestamp("timestamp")))
          .withColumn("day", dayofmonth(dfWithTimestamp("timestamp")))

I can't test this right now, but

val newDF = df
          .withColumn("timestamp", when(df("processingDate").isNull, lit(new Timestamp(System.currentTimeMillis))).otherwise(df("processingDate")))
          .withColumn("year", year($"timestamp"))
          .withColumn("month", month($"timestamp"))
          .withColumn("day", dayofmonth($"timestamp"))

might work.
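
A minimal, self-contained sketch of the same idea, in case it helps: a column referenced by name (col("timestamp"), or $"timestamp" once spark.implicits._ is imported) is not tied to a particular Dataframe, so it is resolved against whatever Dataframe each withColumn call is invoked on. The object name ChainedColumns, the helper withDateParts, the local SparkSession and the sample rows are all assumptions made up for illustration. The sketch also swaps the when(...).otherwise(...) expression for coalesce with current_timestamp(), which should behave similarly but takes the time when the query runs rather than when the expression is built on the driver.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, current_timestamp, dayofmonth, month, year}

object ChainedColumns {
  // Hypothetical helper: adds timestamp/year/month/day in a single chain.
  // col("timestamp") is a by-name reference, so it is resolved against the
  // Dataframe produced by the previous step, not against the original df.
  def withDateParts(df: DataFrame): DataFrame =
    df
      // coalesce picks processingDate when it is non-null, else the current time
      // (swapped in for the when/otherwise expression from the question).
      .withColumn("timestamp", coalesce(col("processingDate"), current_timestamp()))
      .withColumn("year", year(col("timestamp")))
      .withColumn("month", month(col("timestamp")))
      .withColumn("day", dayofmonth(col("timestamp")))

  def main(args: Array[String]): Unit = {
    // Local session and sample rows are assumptions for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("chained-columns")
      .getOrCreate()
    import spark.implicits._ // needed for toDF here, and for the $"..." syntax

    val df = Seq[(Int, Option[Timestamp])](
      (1, Some(Timestamp.valueOf("2016-03-15 10:00:00"))),
      (2, None)
    ).toDF("id", "processingDate")

    withDateParts(df).show(truncate = false)
    spark.stop()
  }
}
```

If you prefer the $"timestamp" form from the snippet above, it needs import spark.implicits._ (or sqlContext.implicits._ on older versions) in scope; col("timestamp") only needs org.apache.spark.sql.functions.col.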