如何在 SparkSQL 中使用 Dataframe 获取行的迭代器
How to get an Iterator of Rows using Dataframe in SparkSQL
我在 SparkSQL 中有一个应用程序,其中 returns 大量的行很难放入内存中,所以我将无法在 DataFrame 上使用 collect 函数,有什么方法可以让我使用将所有这些行作为一个 Iterable instaed 将整个行作为列表。
我正在使用 yarn-client 执行这个 SparkSQL 应用程序。
一般来说,将所有数据传输到驱动程序看起来是个很糟糕的主意,大多数时候有更好的解决方案,但如果你真的想这样做,你可以使用 toLocalIterator
方法一个 RDD:
val df: org.apache.spark.sql.DataFrame = ???
df.cache // Optional, to avoid repeated computation, see docs for details
val iter: Iterator[org.apache.spark.sql.Row] = df.rdd.toLocalIterator
其实你可以直接使用:df.toLocalIterator
,这里是Spark源码中的参考:
/**
* Return an iterator that contains all of [[Row]]s in this Dataset.
*
* The iterator will consume as much memory as the largest partition in this Dataset.
*
* Note: this results in multiple Spark jobs, and if the input Dataset is the result
* of a wide transformation (e.g. join with different partitioners), to avoid
* recomputing the input Dataset should be cached first.
*
* @group action
* @since 2.0.0
*/
def toLocalIterator(): java.util.Iterator[T] = withCallback("toLocalIterator", toDF()) { _ =>
withNewExecutionId {
queryExecution.executedPlan.executeToIterator().map(boundEnc.fromRow).asJava
}
}
我在 SparkSQL 中有一个应用程序,其中 returns 大量的行很难放入内存中,所以我将无法在 DataFrame 上使用 collect 函数,有什么方法可以让我使用将所有这些行作为一个 Iterable instaed 将整个行作为列表。
我正在使用 yarn-client 执行这个 SparkSQL 应用程序。
一般来说,将所有数据传输到驱动程序看起来是个很糟糕的主意,大多数时候有更好的解决方案,但如果你真的想这样做,你可以使用 toLocalIterator
方法一个 RDD:
val df: org.apache.spark.sql.DataFrame = ???
df.cache // Optional, to avoid repeated computation, see docs for details
val iter: Iterator[org.apache.spark.sql.Row] = df.rdd.toLocalIterator
其实你可以直接使用:df.toLocalIterator
,这里是Spark源码中的参考:
/**
* Return an iterator that contains all of [[Row]]s in this Dataset.
*
* The iterator will consume as much memory as the largest partition in this Dataset.
*
* Note: this results in multiple Spark jobs, and if the input Dataset is the result
* of a wide transformation (e.g. join with different partitioners), to avoid
* recomputing the input Dataset should be cached first.
*
* @group action
* @since 2.0.0
*/
def toLocalIterator(): java.util.Iterator[T] = withCallback("toLocalIterator", toDF()) { _ =>
withNewExecutionId {
queryExecution.executedPlan.executeToIterator().map(boundEnc.fromRow).asJava
}
}