使用数据帧时如何下推 Cassandra 的限制谓词?

How to pushdown limit predicate for Cassandra when you use dataframes?

我有大 Cassandra table。我只想从 Cassandra 加载 50 行。 以下代码

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")
      .limit(50).collect()

以下代码从 where 方法中推送两个谓词,但不限制一个。获取整个数据(100 万条记录)是真的吗?如果不是,为什么运行这段代码的时间和没有limit(50)的代码差不多。

与 Spark Streaming 不同,Spark 本身会尽可能快地预加载尽可能多的数据,以便能够对其进行并行操作。所以预加载是惰性的,但触发时是贪婪的。然而,有 cassandra-conector 个具体因素:

  • Automatic predicate pushdown 有效 "where" 个子句。

  • 根据 this answer limit(...) 未转换为 CQL 的 LIMIT,因此其行为取决于在下载足够的数据后创建了多少获取作业.引用:

calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.

可能的解决方案:

  • DataFrame 限制可以通过限制 numPartitions 和数据交换率 (concurrent.reads and other params) 来部分管理。如果你同意 n ~ 50 "in most cases",你也可以限制像 where(dayIndex < 50 * factor * num_records).

  • 有一种方法可以通过SparkPartitionLimit设置CQL LIMIT,直接影响每一个CQL请求(see more) - keep in mind that requests are per-spark-partition. It's available in CassandraRdd扩展class,所以您必须先转换为 RDD。

代码如下:

filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect()

这会将 LIMIT $N 附加到每个 CQL-request。与 DataFrame 的限制不同,如果您多次指定 CassandraRDD limit (.limit(10).limit(20)) - 只有最后一个会被附加。此外,我使用 n 而不是 n / numPartitions + 1,因为它(即使 Spark 和 Cassandra 分区是 one-to-one)可能 return 更少的结果 per-partition。结果,我不得不添加 take(n) 以便将 <= numPartitions * n 减少到 n

警告 double-check 你的 where 可以翻译成 CQL(使用 explain())——否则 LIMIT 会在过滤之前应用。

P.S。您也可以尝试 运行 CQL 直接使用 sparkSession.sql(...) (like here) 并比较结果。