How to Replace a Java Loop with Direct Spark Cassandra Table Data Manipulation
I am trying to improve the efficiency of my code, because I have to process billions of rows of data in Cassandra. I currently use a Java loop with the DataStax Spark Cassandra Connector to pull the data out and put it into a format I am familiar with so that Spark can operate on it. I would like to replace this Multimap loop with direct Spark operations on the Cassandra table, to save time and make everything more efficient. I would appreciate any code suggestions to accomplish this. Here is my existing code:
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
stmt.setFetchSize(2000000);
ResultSet results = session.execute(stmt);

// Get the variables from each row of Cassandra data
Multimap<Double, Float> data = LinkedListMultimap.create();
double start_frequency, bandwidth, channel;
float power;
double channel_end = 1.6159E8;
double increment = 5000;
for (Row row : results) {
    // Column names in Cassandra (case sensitive)
    start_frequency = row.getDouble("Start_Frequency");
    power = row.getFloat("Power");
    bandwidth = row.getDouble("Bandwidth");
    // Create channel power buckets
    for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if (channel >= start_frequency && channel <= start_frequency + bandwidth) {
            data.put(channel, power);
        }
    }
}

// Create a list for the Spark DataFrame
List<Value> values = data.asMap().entrySet()
        .stream()
        .flatMap(x -> x.getValue()
                .stream()
                .map(y -> new Value(x.getKey(), y)))
        .collect(Collectors.toList());

// Create the DataFrame and calculate the results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra").save();
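The driver-side loop can be replaced by reading the table straight into a JavaRDD with the connector's cassandraTable and mapRowTo helpers, and moving the bucketing loop into a flatMap so it runs on the executors instead of the driver. The following assumes the static imports javaFunctions and mapRowTo from com.datastax.spark.connector.japi.CassandraJavaUtil, plus col, min, max, and avg from org.apache.spark.sql.functions: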
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
    @Override
    public Iterable<Value> call(MeasuredValue row) throws Exception {
        double start_frequency = row.getStart_frequency();
        float power = row.getPower();
        double bandwidth = row.getBandwidth();
        // Channel sweep bounds
        double channel_end = 1.6159E8;
        double increment = 5000;
        // Create channel power buckets
        List<Value> list = new ArrayList<Value>();
        for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
            if (channel >= start_frequency && channel <= start_frequency + bandwidth) {
                list.add(new Value(channel, power));
            }
        }
        return list;
    }
});

sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra").save();
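With this version the connector reads Measured_Value in parallel across the Spark partitions, and the channel bucketing happens inside flatMap on the executors, so the billions of rows never funnel through a single driver-side loop or an in-memory Multimap. The MeasuredValue bean that mapRowTo maps the rows onto is: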
public static class MeasuredValue implements Serializable {
    public MeasuredValue() { }

    private double start_frequency;
    public double getStart_frequency() { return start_frequency; }
    public void setStart_frequency(double start_frequency) { this.start_frequency = start_frequency; }

    private double bandwidth;
    public double getBandwidth() { return bandwidth; }
    public void setBandwidth(double bandwidth) { this.bandwidth = bandwidth; }

    private float power;
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }
}
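The Value bean is not shown in the original code; a minimal sketch that matches how it is used above (built from a channel and a power, then consumed by createDataFrame via bean reflection) could look like this:

// Hypothetical sketch of the Value bean assumed by the snippets above;
// a plain serializable JavaBean so createDataFrame can infer the schema.
public static class Value implements Serializable {
    private double channel;
    private float power;

    public Value() { } // no-arg constructor required for bean reflection

    public Value(double channel, float power) {
        this.channel = channel;
        this.power = power;
    }

    public double getChannel() { return channel; }
    public void setChannel(double channel) { this.channel = channel; }
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }
}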