火花过滤不起作用
spark filtering not working
我正在尝试过滤我的 Dataframe[Row] 上的一些值。问题如下:
var index : Int = 0
var set = SetBuilding(features, 3)
val soglia : Int = 30
var exit : Boolean = false
while (!exit && index<set.length){
val attributes = set(index).split(",")
var r = scala.util.Random
var i = r.nextInt(attributes.length)
var previousI = i
var j : Int = 8
var maxprojections : Int = 5
var dataframe = sqlContext.sql("SELECT "+set(index)+" FROM table").cache()
println("**************VALUTAZIONE SOTTOINSIEME: "+ set(index)+"***********************" )
while ( j!=0 && maxprojections >=0){
var filtered = dataframe.filter((elem : Row) => ReturnType(elem.get(elem.fieldIndex(attributes(i)))).>(soglia))
println ( "proiezione su attributo " + attributes (i))
for (elem <- filtered){
println(elem)
}
if ( attributes.size != 1){
do{
i = r.nextInt(attributes.length)
}while ( i == previousI )
}
println ( "*********valore di previousI = "+ attributes(previousI)+ "******************************")
previousI = i
j = filtered.count().toInt
println ( "*********valore di j = "+ j+ "******************************")
maxprojections = maxprojections - 1
println ( "*********valore di maxproj = "+ maxprojections+ "******************************")
}
index+=1
if ( index >= 4)
exit = true
}
问题是,如果我将我的数据结构维护为 spark.DataFrame 并调用 filter(),我希望在某些属性上返回一个空数据帧,但调用 filteredData.count 的值是!= 0,但我确信这些值小于阈值。
当我在 filteredData 上调用 collect() 时,问题没有发生。
是否有解决方案包括将 filteredData 维护为 DataFrame?
我希望现在查询是正确的
问题上下文中的预期似乎是每个 filter
操作都会改变 Dataframe
有效地从中删除元素。
这是一个错误的假设。
每个数据帧代表一组不可变的数据。
在循环的每次迭代中,我们都会获得一个新的数据帧,它是使用单个条件进行过滤的结果。
所以在每个循环中,结果如下所示:
/** loop 1 **/ var filtered = dataframe.filter(attibute_1 > threshold)
/** loop 2 **/ var filtered = dataframe.filter(attibute_2 > threshold)
...
/** loop n **/ var filtered = dataframe.filter(attibute_n > threshold)
我们最后观察到的是最终过滤操作覆盖变量的结果var filtered = dataframe.filter(attibute_n > threshold)
所有其他过滤操作都丢失了。
如果我们想在此上下文中迭代地删除元素,实现此目的的快速更改是通过利用可变变量在循环中堆叠过滤器。像这样:
var filtered = dataframe
while (cond) {
filtered = filtered.filter(attibute_i > threshold)
}
这里生成的 filtered
数据帧有一个逻辑计划,由多个过滤器操作组成。
它将等同于:dataframe.filter(attibute1 > threshold).filter(attribute2 > threshold).filter(attribute3 > threshold)...
我正在尝试过滤我的 Dataframe[Row] 上的一些值。问题如下:
var index : Int = 0
var set = SetBuilding(features, 3)
val soglia : Int = 30
var exit : Boolean = false
while (!exit && index<set.length){
val attributes = set(index).split(",")
var r = scala.util.Random
var i = r.nextInt(attributes.length)
var previousI = i
var j : Int = 8
var maxprojections : Int = 5
var dataframe = sqlContext.sql("SELECT "+set(index)+" FROM table").cache()
println("**************VALUTAZIONE SOTTOINSIEME: "+ set(index)+"***********************" )
while ( j!=0 && maxprojections >=0){
var filtered = dataframe.filter((elem : Row) => ReturnType(elem.get(elem.fieldIndex(attributes(i)))).>(soglia))
println ( "proiezione su attributo " + attributes (i))
for (elem <- filtered){
println(elem)
}
if ( attributes.size != 1){
do{
i = r.nextInt(attributes.length)
}while ( i == previousI )
}
println ( "*********valore di previousI = "+ attributes(previousI)+ "******************************")
previousI = i
j = filtered.count().toInt
println ( "*********valore di j = "+ j+ "******************************")
maxprojections = maxprojections - 1
println ( "*********valore di maxproj = "+ maxprojections+ "******************************")
}
index+=1
if ( index >= 4)
exit = true
}
问题是,如果我将我的数据结构维护为 spark.DataFrame 并调用 filter(),我希望在某些属性上返回一个空数据帧,但调用 filteredData.count 的值是!= 0,但我确信这些值小于阈值。
当我在 filteredData 上调用 collect() 时,问题没有发生。
是否有解决方案包括将 filteredData 维护为 DataFrame?
我希望现在查询是正确的
问题上下文中的预期似乎是每个 filter
操作都会改变 Dataframe
有效地从中删除元素。
这是一个错误的假设。
每个数据帧代表一组不可变的数据。 在循环的每次迭代中,我们都会获得一个新的数据帧,它是使用单个条件进行过滤的结果。 所以在每个循环中,结果如下所示:
/** loop 1 **/ var filtered = dataframe.filter(attibute_1 > threshold)
/** loop 2 **/ var filtered = dataframe.filter(attibute_2 > threshold)
...
/** loop n **/ var filtered = dataframe.filter(attibute_n > threshold)
我们最后观察到的是最终过滤操作覆盖变量的结果var filtered = dataframe.filter(attibute_n > threshold)
所有其他过滤操作都丢失了。
如果我们想在此上下文中迭代地删除元素,实现此目的的快速更改是通过利用可变变量在循环中堆叠过滤器。像这样:
var filtered = dataframe
while (cond) {
filtered = filtered.filter(attibute_i > threshold)
}
这里生成的 filtered
数据帧有一个逻辑计划,由多个过滤器操作组成。
它将等同于:dataframe.filter(attibute1 > threshold).filter(attribute2 > threshold).filter(attribute3 > threshold)...