How to Replace a Java Loop with Direct Spark Cassandra Table Data Manipulation

I am trying to make my code more efficient, since I have to process billions of rows of data in Cassandra. I currently use a Java loop with the DataStax Cassandra Spark Connector to extract the data and put it into a format I am familiar with before letting Spark operate on it. I would like to replace this Multimap loop over the table with direct Spark operations on the Cassandra table, to save time and make everything more efficient. I would appreciate any code suggestions for accomplishing this. Here is my existing code:

        Statement stmt = new SimpleStatement(
                "SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
        stmt.setFetchSize(2000000);
        ResultSet results = session.execute(stmt);

        // Get the variables from each row of Cassandra data
        Multimap<Double, Float> data = LinkedListMultimap.create();
        for (Row row : results) {
            // Column names in Cassandra are case sensitive
            double start_frequency = row.getDouble("Start_Frequency");
            float power = row.getFloat("Power");
            double bandwidth = row.getDouble("Bandwidth");

            // Create channel power buckets (channel_end and increment are
            // defined elsewhere; 1.6159E8 and 5000 in the full program)
            for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
                if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
                    data.put(channel, power);
                }
            }
        }

        // Create a Spark list for the DataFrame
        List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                    .stream()
                    .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());

        // Create the DataFrame and calculate the results
        sqlContext.createDataFrame(sc.parallelize(values), Value.class)
            .groupBy(col("channel"))
            .agg(min("power"), max("power"), avg("power"))
            .write().mode(SaveMode.Append)
            .option("table", "results")
            .option("keyspace", "model")
            .format("org.apache.spark.sql.cassandra")
            .save();

    } // end session
} // End Compute

I was able to replace the Multimap loop by reading the table directly into a JavaRDD with the connector's cassandraTable and building the channel power buckets inside a flatMap, so the bucketing runs in parallel on the executors instead of in a single loop on the driver. With channel_end = 1.6159E8 and increment = 5000, that is at most 319 buckets per row:
        // javaFunctions and mapRowTo are static imports from
        // com.datastax.spark.connector.japi.CassandraJavaUtil
        JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
            .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

        JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
            @Override
            public Iterable<Value> call(MeasuredValue row) throws Exception {
                double start_frequency = row.getStart_frequency();
                float power = row.getPower();
                double bandwidth = row.getBandwidth();

                // Initialize the bucket boundaries
                double channel_end = 1.6159E8;
                double increment = 5000;

                // Create channel power buckets
                List<Value> list = new ArrayList<Value>();
                for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
                    if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
                        list.add(new Value(channel, power));
                    }
                }
                return list;
            }
        });

        sqlContext.createDataFrame(valueRdd, Value.class)
            .groupBy(col("channel"))
            .agg(min("power"), max("power"), avg("power"))
            .write().mode(SaveMode.Append)
            .option("table", "results")
            .option("keyspace", "model")
            .format("org.apache.spark.sql.cassandra")
            .save();

} // end session
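
One further refinement, assuming the connector version in use exposes it: CassandraJavaRDD has a select(...) method, so the column pruning that the original CQL statement performed can be pushed down to Cassandra rather than fetching whole rows. A minimal sketch, not tested against this schema:

        // Sketch only: select(...) limits the columns fetched from Cassandra,
        // mirroring the SELECT "Power","Bandwidth","Start_Frequency" in the CQL version
        JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
            .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class))
            .select("Start_Frequency", "Power", "Bandwidth");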

public static class MeasuredValue implements Serializable {

        public MeasuredValue() { }

        private double start_frequency;
        public double getStart_frequency() { return start_frequency; }
        public void setStart_frequency(double start_frequency) { this.start_frequency = start_frequency; }

        private double bandwidth ;
        public double getBandwidth() { return bandwidth; }
        public void setBandwidth(double bandwidth) { this.bandwidth = bandwidth; }

        private float power;    
        public float getPower() { return power; }
        public void setPower(float power) { this.power = power; }

    }
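
The Value bean is not shown above; a minimal sketch that matches how it is used (constructed as new Value(channel, power), then aggregated on the "channel" and "power" columns) would be:

public static class Value implements Serializable {

        public Value() { }

        // Reconstructed for illustration: field names inferred from
        // groupBy(col("channel")) and the min/max/avg("power") aggregates
        public Value(double channel, float power) {
            this.channel = channel;
            this.power = power;
        }

        private double channel;
        public double getChannel() { return channel; }
        public void setChannel(double channel) { this.channel = channel; }

        private float power;
        public float getPower() { return power; }
        public void setPower(float power) { this.power = power; }

    }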