火花过滤不起作用

spark filtering not working

我正在尝试过滤我的 Dataframe[Row] 上的一些值。问题如下:

var index : Int = 0
var set = SetBuilding(features, 3)
val soglia : Int = 30
var exit : Boolean = false

while (!exit && index<set.length){
 val attributes = set(index).split(",")
 var r = scala.util.Random
 var i = r.nextInt(attributes.length)
 var previousI = i
 var j : Int = 8
 var maxprojections : Int = 5 
 var dataframe = sqlContext.sql("SELECT "+set(index)+" FROM table").cache()
println("**************VALUTAZIONE SOTTOINSIEME: "+ set(index)+"***********************" )

 while ( j!=0 && maxprojections >=0){

var filtered = dataframe.filter((elem : Row) => ReturnType(elem.get(elem.fieldIndex(attributes(i)))).>(soglia))
println ( "proiezione su attributo " + attributes (i))
for (elem <- filtered){
  println(elem)
}
if ( attributes.size != 1){
 do{
   i = r.nextInt(attributes.length)
 }while ( i == previousI )
}
     println ( "*********valore di previousI = "+ attributes(previousI)+ "******************************")

 previousI = i

 j = filtered.count().toInt   
 println ( "*********valore di j = "+ j+ "******************************")
 maxprojections = maxprojections - 1
 println ( "*********valore di maxproj = "+ maxprojections+ "******************************")
}
 index+=1
 if ( index >= 4)
   exit = true
}

问题是,如果我将我的数据结构维护为 spark.DataFrame 并调用 filter(),我希望在某些属性上返回一个空数据帧,但调用 filteredData.count 的值是!= 0,但我确信这些值小于阈值。

当我在 filteredData 上调用 collect() 时,问题没有发生。

是否有解决方案包括将 filteredData 维护为 DataFrame

我希望现在查询是正确的

问题上下文中的预期似乎是每个 filter 操作都会改变 Dataframe 有效地从中删除元素。

这是一个错误的假设。

每个数据帧代表一组不可变的数据。 在循环的每次迭代中,我们都会获得一个新的数据帧,它是使用单个条件进行过滤的结果。 所以在每个循环中,结果如下所示:

/** loop 1 **/ var filtered = dataframe.filter(attibute_1 > threshold)
/** loop 2 **/ var filtered = dataframe.filter(attibute_2 > threshold)
...
/** loop n **/ var filtered = dataframe.filter(attibute_n > threshold)

我们最后观察到的是最终过滤操作覆盖变量的结果var filtered = dataframe.filter(attibute_n > threshold)所有其他过滤操作都丢失了。

如果我们想在此上下文中迭代地删除元素,实现此目的的快速更改是通过利用可变变量在循环中堆叠过滤器。像这样:

var filtered = dataframe
while (cond) {
    filtered = filtered.filter(attibute_i > threshold)
}

这里生成的 filtered 数据帧有一个逻辑计划,由多个过滤器操作组成。 它将等同于:dataframe.filter(attibute1 > threshold).filter(attribute2 > threshold).filter(attribute3 > threshold)...