如何连接 spark scala 数据框上的转换?

How to concatenate transformations on a spark scala dataframe?

我正在自学 scala(以便将它与 Apache Spark 一起使用)并想知道是否有某种方法可以在 Spark DataFrame 上连接一系列转换。例如。假设我们有一个转换列表

l: List[(String, String)] = List(("field1", "nonEmpty"), ("field2", "notNull"))

和一个 Spark DataFrame

df,这样期望的结果就是

df.filter(df("field1") =!= "").filter(df("field2").isNotNull).

我在想也许这可以使用函数组合或列表折叠之类的方法来完成,但我真的不知道怎么做。任何帮助将不胜感激。

谢谢!

是的,完全有可能。但这取决于你真的想要,我的意思是,Spark 提供了 Pipelines, that allows to compose your transformations and create a pipeline that can be serialized. You can create your custom transformers, here 一个例子。您可以在自定义转换中包含您的“过滤器”阶段,您稍后将能够使用,例如,在 Spark 结构化流中。

其他选项是使用 Spark 数据集并使用 transform api。这看起来更实用和优雅。

Scala 有很多创建您自己的方法的可能性 api,但请先看看这些方法。

是的,您可以折叠现有的 Dataframe。您可以将所有列保存在一个列表中,而不必理会其他中间类型:

val df =
 ???

val columns =
  List(
    col("1") =!= "",
    col("2").isNotNull,
    col("3") > 10
 )
  
val filtered = 
  columns.foldLeft(df)((df, col) => df.filter(col))