Spark 过滤器数据框 returns 空结果

Spark filter dataframe returns empty result

我正在使用 Scala 和 Spark 处理存储在 HDFS 中的文件。这些文件每天早上都会登陆 HDFS。我有一份工作,每天从 HDFS 读取该文件,对其进行处理,然后将结果写入 HDFS。在我将文件转换为 Dataframe 后,此作业执行过滤器以仅获取包含时间戳高于上一个文件中处理的最高时间戳的行。此过滤器仅在几天内具有未知行为。有些日子按预期工作,而其他日子尽管新文件包含与该过滤器匹配的行,但过滤器结果为空。当同一个文件在 TEST 环境中执行时,这种情况一直发生,但在我的本地工作中,使用具有相同 HDFS 连接的同一个文件按预期工作。

我尝试过以不同的方式进行过滤,但 none 然后在该环境中针对某些特定文件工作,但所有这些都在我的 LOCAL 中工作正常: 1) 火花 sql

val diff = fp.spark.sql("select * from curr " +
s"where TO_DATE(CAST(UNIX_TIMESTAMP(substring(${updtDtCol}, 
${substrStart},${substrEnd}),'${dateFormat}') as TIMESTAMP))" +
s" > TO_DATE(CAST(UNIX_TIMESTAMP('${prevDate.substring(0,10)}' 
,'${dateFormat}') as TIMESTAMP))")

2) Spark 过滤器函数

val diff = df.filter(date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)))

3) 添加带有过滤结果的额外列,然后按此新列过滤

val test2 = df.withColumn("PrevDate", lit(prevDate.substring(0,10)))
      .withColumn("DatePre", date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat))
      .withColumn("Result", date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)))
      .withColumn("x", when(date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)), lit(1)).otherwise(lit(0)))

val diff = test2.filter("x == 1")

我认为问题不是由过滤器本身或文件引起的,但我想收到有关我应该检查什么或者是否有人以前遇到过这个问题的反馈。

请在此处告诉我哪些信息可能对 post 有用,以便收到一些反馈。

部分文件示例如下所示:

|TIMESTAMP                 |Result|x|
|2017-11-30-06.46.41.288395|true  |1|
|2017-11-28-08.29.36.188395|false |0|

TIMESTAMP 值与 previousDate(例如:2017-11-29)进行比较,我创建了一个名为 'Result' 的列,该比较结果始终适用于环境和另一列调用 'x' 结果相同。

正如我之前提到的,如果我在两个日期之间使用比较函数或在 'Result' 或 'x' 列中的结果来过滤数据框,有时结果是一个空数据框但在本地使用相同的 HDFS 和文件,结果包含数据。

我怀疑这是 data/date 格式问题。您是否有机会验证转换的日期是否符合预期?

如果两列的日期字符串都包含时区,则该行为是可预测的。

如果其中只有一个包含时区,那么本地和远程执行的结果会不同。这完全取决于集群的时区。

为了调试问题,我建议您使用额外的列来捕获相应日期字符串的 unix_timestamp(..)/millis,并使用额外的列来捕获两列的差异. diff 列应该有助于找出转换出错的位置和原因。希望这有帮助。

万一有人想知道这个问题发生了什么,以及我是如何最终找到错误原因的,这里就是解释。基本上这是由执行作业的机器(本地机器和测试服务器)的不同时区引起的。 unix_timestamp 函数在考虑服务器时区的情况下返回了正确的值。基本上最后我不必使用 unix_timestamp 函数,也不需要使用日期字段的全部内容。下次我会仔细检查一下。