如何连接 spark scala 数据框上的转换?
How to concatenate transformations on a spark scala dataframe?
我正在自学 scala(以便将它与 Apache Spark 一起使用)并想知道是否有某种方法可以在 Spark DataFrame 上连接一系列转换。例如。假设我们有一个转换列表
l: List[(String, String)] = List(("field1", "nonEmpty"), ("field2", "notNull"))
和一个 Spark DataFrame
df
,这样期望的结果就是
df.filter(df("field1") =!= "").filter(df("field2").isNotNull)
.
我在想也许这可以使用函数组合或列表折叠之类的方法来完成,但我真的不知道怎么做。任何帮助将不胜感激。
谢谢!
是的,完全有可能。但这取决于你真的想要,我的意思是,Spark 提供了 Pipelines, that allows to compose your transformations and create a pipeline that can be serialized. You can create your custom transformers, here 一个例子。您可以在自定义转换中包含您的“过滤器”阶段,您稍后将能够使用,例如,在 Spark 结构化流中。
其他选项是使用 Spark 数据集并使用 transform api。这看起来更实用和优雅。
Scala 有很多创建您自己的方法的可能性 api,但请先看看这些方法。
是的,您可以折叠现有的 Dataframe
。您可以将所有列保存在一个列表中,而不必理会其他中间类型:
val df =
???
val columns =
List(
col("1") =!= "",
col("2").isNotNull,
col("3") > 10
)
val filtered =
columns.foldLeft(df)((df, col) => df.filter(col))
我正在自学 scala(以便将它与 Apache Spark 一起使用)并想知道是否有某种方法可以在 Spark DataFrame 上连接一系列转换。例如。假设我们有一个转换列表
l: List[(String, String)] = List(("field1", "nonEmpty"), ("field2", "notNull"))
和一个 Spark DataFrame
df
,这样期望的结果就是
df.filter(df("field1") =!= "").filter(df("field2").isNotNull)
.
我在想也许这可以使用函数组合或列表折叠之类的方法来完成,但我真的不知道怎么做。任何帮助将不胜感激。
谢谢!
是的,完全有可能。但这取决于你真的想要,我的意思是,Spark 提供了 Pipelines, that allows to compose your transformations and create a pipeline that can be serialized. You can create your custom transformers, here 一个例子。您可以在自定义转换中包含您的“过滤器”阶段,您稍后将能够使用,例如,在 Spark 结构化流中。
其他选项是使用 Spark 数据集并使用 transform api。这看起来更实用和优雅。
Scala 有很多创建您自己的方法的可能性 api,但请先看看这些方法。
是的,您可以折叠现有的 Dataframe
。您可以将所有列保存在一个列表中,而不必理会其他中间类型:
val df =
???
val columns =
List(
col("1") =!= "",
col("2").isNotNull,
col("3") > 10
)
val filtered =
columns.foldLeft(df)((df, col) => df.filter(col))