Java Spark flatMap 似乎丢失了 ArrayList 中的项目
Java Spark flatMap seems to be losing items in ArrayList
我正在使用 spark/cassandra 驱动程序在 cassandra 中迭代数十亿行,并将数据提取到 运行 统计信息。为了实现这一点,我 运行 对每一行数据进行 FOR
循环,如果它符合我正在调用的数据桶的标准 "channel" 然后我将它添加到一个以K、V对通道、功率形式的ArrayList。
[[频道,功率]]
基于for循环的迭代增量,通道应该是静态的。例如,如果我的频道范围是 0 到 10,增量为 2,那么频道将为 0,2,4,6,8,10
FOR
在当前数据行上循环 运行s 并检查数据是否落在通道内,如果是,则将其添加到 ArrayList Data 中,格式为
[[频道,功率]]
然后进入下一行并做同样的事情。一旦遍历所有行,它就会递增到下一个通道并重复该过程。
问题是有数十亿行符合同一频道的条件,所以我不确定我是否应该使用 ArrayList
和 flatMap
或其他东西,因为我的结果是每次我 运行 它都略有不同,频道也不是静态的,因为它们应该是静态的。
一小部分数据 [[Channel,Power]] 将是:
[[2,5]]
[[2,10]]
[[2,5]]
[[2,15]]
[[2,5]]
请注意,我有一些重复的项目需要保留,因为我在每个频道上 运行 最小、最大、平均统计数据。
频道 2: 最少 5 次,最多 15 次,平均 8 次
我的代码如下:
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SparkTestB", "Measured_Value", mapRowTo )
.select("Start_Frequency","Bandwidth","Power");
JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>(){
@Override
public Iterable<Value> call(MeasuredValue row) throws Exception {
long start_frequency = row.getStart_frequency();
float power = row.getPower();
long bandwidth = row.getBandwidth();
// Define Variable
long channel,channel_end, increment;
// Initialize Variables
channel_end = 10;
increment = 2;
List<Value> list = new ArrayList<>();
// Create Channel Power Buckets
for(channel = 0; channel <= channel_end; ){
if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
list.add(new Value(channel, power));
} // end if
channel+=increment;
} // end for
return list;
}
});
sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
我的 类 以下为反思:
public class Value implements Serializable {
public Value(Long channel, Float power) {
this.channel = channel;
this.power = power;
}
Long channel;
Float power;
public void setChannel(Long channel) {
this.channel = channel;
}
public void setPower(Float power) {
this.power = power;
}
public Long getChannel() {
return channel;
}
public Float getPower() {
return power;
}
@Override
public String toString() {
return "[" +channel +","+power+"]";
}
}
public static class MeasuredValue implements Serializable {
public MeasuredValue() { }
public long start_frequency;
public long getStart_frequency() { return start_frequency; }
public void setStart_frequency(long start_frequency) { this.start_frequency = start_frequency; }
public long bandwidth ;
public long getBandwidth() { return bandwidth; }
public void setBandwidth(long bandwidth) { this.bandwidth = bandwidth; }
public float power;
public float getPower() { return power; }
public void setPower(float power) { this.power = power; }
}
我发现我的信道化算法存在差异。我用下面的替换来解决问题。
// Create Channel Power Buckets
for(; channel <= channel_end; channel+=increment ){
//Initial Bucket
while((start_frequency >= channel) && (start_frequency < (channel + increment))){
list.add(new Value(channel, power));
channel+=increment;
}
//Buckets to Accomodate for Bandwidth
while ((channel <= channel_end) && (channel >= start_frequency) && (start_frequency + bandwidth) >= channel){
list.add(new Value(channel, power));
channel+=increment;
}
}
我正在使用 spark/cassandra 驱动程序在 cassandra 中迭代数十亿行,并将数据提取到 运行 统计信息。为了实现这一点,我 运行 对每一行数据进行 FOR
循环,如果它符合我正在调用的数据桶的标准 "channel" 然后我将它添加到一个以K、V对通道、功率形式的ArrayList。
[[频道,功率]]
基于for循环的迭代增量,通道应该是静态的。例如,如果我的频道范围是 0 到 10,增量为 2,那么频道将为 0,2,4,6,8,10
FOR
在当前数据行上循环 运行s 并检查数据是否落在通道内,如果是,则将其添加到 ArrayList Data 中,格式为
[[频道,功率]]
然后进入下一行并做同样的事情。一旦遍历所有行,它就会递增到下一个通道并重复该过程。
问题是有数十亿行符合同一频道的条件,所以我不确定我是否应该使用 ArrayList
和 flatMap
或其他东西,因为我的结果是每次我 运行 它都略有不同,频道也不是静态的,因为它们应该是静态的。
一小部分数据 [[Channel,Power]] 将是:
[[2,5]]
[[2,10]]
[[2,5]]
[[2,15]]
[[2,5]]
请注意,我有一些重复的项目需要保留,因为我在每个频道上 运行 最小、最大、平均统计数据。
频道 2: 最少 5 次,最多 15 次,平均 8 次
我的代码如下:
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SparkTestB", "Measured_Value", mapRowTo )
.select("Start_Frequency","Bandwidth","Power");
JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>(){
@Override
public Iterable<Value> call(MeasuredValue row) throws Exception {
long start_frequency = row.getStart_frequency();
float power = row.getPower();
long bandwidth = row.getBandwidth();
// Define Variable
long channel,channel_end, increment;
// Initialize Variables
channel_end = 10;
increment = 2;
List<Value> list = new ArrayList<>();
// Create Channel Power Buckets
for(channel = 0; channel <= channel_end; ){
if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
list.add(new Value(channel, power));
} // end if
channel+=increment;
} // end for
return list;
}
});
sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
我的 类 以下为反思:
public class Value implements Serializable {
public Value(Long channel, Float power) {
this.channel = channel;
this.power = power;
}
Long channel;
Float power;
public void setChannel(Long channel) {
this.channel = channel;
}
public void setPower(Float power) {
this.power = power;
}
public Long getChannel() {
return channel;
}
public Float getPower() {
return power;
}
@Override
public String toString() {
return "[" +channel +","+power+"]";
}
}
public static class MeasuredValue implements Serializable {
public MeasuredValue() { }
public long start_frequency;
public long getStart_frequency() { return start_frequency; }
public void setStart_frequency(long start_frequency) { this.start_frequency = start_frequency; }
public long bandwidth ;
public long getBandwidth() { return bandwidth; }
public void setBandwidth(long bandwidth) { this.bandwidth = bandwidth; }
public float power;
public float getPower() { return power; }
public void setPower(float power) { this.power = power; }
}
我发现我的信道化算法存在差异。我用下面的替换来解决问题。
// Create Channel Power Buckets
for(; channel <= channel_end; channel+=increment ){
//Initial Bucket
while((start_frequency >= channel) && (start_frequency < (channel + increment))){
list.add(new Value(channel, power));
channel+=increment;
}
//Buckets to Accomodate for Bandwidth
while ((channel <= channel_end) && (channel >= start_frequency) && (start_frequency + bandwidth) >= channel){
list.add(new Value(channel, power));
channel+=increment;
}
}