Creating an RDD after retrieving data from cassandra DB
I am using Cassandra and Spark for my project, and right now this is what I wrote to retrieve data from the database:
ResultSet results = session.execute("SELECT * FROM foo.test");
ArrayList<String> supportList = new ArrayList<String>();
for (Row row : results) {
    supportList.add(row.getString("firstColumn") + "," + row.getString("secondColumn"));
}
JavaRDD<String> input = sparkContext.parallelize(supportList);
JavaPairRDD<String, Double> tuple = input.mapToPair(new PairFunction<String, String, Double>() {
    public Tuple2<String, Double> call(String x) {
        String[] parts = x.split(",");
        // random value in [1, 30], boxed as a Double to match the declared pair value type
        return new Tuple2<String, Double>(parts[0], (double) (new Random().nextInt(30) + 1));
    }
});
It works, but I would like to know whether there is a nicer way to write the code above. What I want to achieve is the following:
In Scala I can retrieve and populate an RDD simply like this:
val dataRDD = sc.cassandraTable[TableColumnNames]("keySpace", "table")
How can I write the same thing in Java, without going through a support list or other "nasty" workarounds?
Update
JavaRDD<String> cassandraRowsRDD = javaFunctions(javaSparkContext).cassandraTable("keyspace", "table")
        .map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
On this line -> public String call(CassandraRow cassandraRow)
I get this exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.sparkexamples.cassandraExample.main.KMeans.executeQuery(KMeans.java:271)
at org.sparkexamples.cassandraExample.main.KMeans.main(KMeans.java:67)
Caused by: java.io.NotSerializableException: org.sparkexamples.cassandraExample.main.KMeans
Serialization stack:
- object not serializable (class: org.sparkexamples.cassandraExample.main.KMeans, value: org.sparkexamples.cassandraExample.main.KMeans@3015db78)
- field (class: org.sparkexamples.cassandraExample.main.KMeans, name: this$0, type: class org.sparkexamples.cassandraExample.main.KMeans)
- object (class org.sparkexamples.cassandraExample.main.KMeans, org.sparkexamples.cassandraExample.main.KMeans@5dbf5634)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction, name: fun, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 7 more
Thanks in advance.
Have a look at this answer: RDD not serializable Cassandra/Spark connector java API
The problem is probably that the class enclosing the code block you showed is not serializable.
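One way to address that (a minimal sketch; only the KMeans class name comes from the stack trace, everything else is assumed) is to make the enclosing class serializable, since the anonymous Function captures it through its implicit this$0 reference:

import java.io.Serializable;

// Sketch (assumption): marking the driver class Serializable so the anonymous
// Function created inside executeQuery(...) can be serialized together with
// the enclosing instance it captures.
public class KMeans implements Serializable {
    // ... executeQuery(...) and the rest of the class stay unchanged ...
}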
I ran into the same problem. I implemented the Spark interface function in a separate class and passed that to the map call, and it worked.
Sample:
public class a implements Function {....}
and used it in the map:
.....map(new a())
That fixed it. There are some issues with Spark deserialization of anonymous classes.
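Concretely, that approach could look like the sketch below (the class name RowToString and the connector calls are illustrative assumptions, not the poster's actual code); a top-level class implementing Spark's Function interface is serializable on its own and does not drag the driver class with it:

import org.apache.spark.api.java.function.Function;
import com.datastax.spark.connector.japi.CassandraRow;

// Hypothetical helper: a standalone class implementing Spark's Function
// interface (which already extends Serializable), so only this small object
// needs to be shipped to the executors.
public class RowToString implements Function<CassandraRow, String> {
    @Override
    public String call(CassandraRow cassandraRow) throws Exception {
        return cassandraRow.toString();
    }
}

Usage in the driver would then be, for example:

JavaRDD<String> cassandraRowsRDD = javaFunctions(javaSparkContext)
        .cassandraTable("keyspace", "table")
        .map(new RowToString());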