使用 Spark DataFrame 将数据插入 Cassandra table
Inserting Data Into Cassandra table Using Spark DataFrame
我正在使用 Scala 版本 2.10.5 Cassandra 3.0 和 Spark 1.6。我想将数据插入 cassandra,所以我尝试了基本示例
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
可以正常工作并能够将数据插入 Cassandra.So 我有一个 csv 文件,我想通过匹配模式
将其插入 Cassandra table
val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
当我使用 SaveToCassndra 时,我发现 saveToCassandra 不是 personSchemaRDD 的一部分。所以被教导以不同的方式尝试
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
但是无法在 ip:port 上连接到 cassandra。谁能告诉我最好的方法。我需要定期将文件中的数据保存到 cassandra。
sqlContext.applySchema(...)
returns一个DataFrame
和一个DataFrame
没有saveToCassandra
方法。
您可以使用 .write
方法:
val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
如果我们想使用savetoCassandra
方法,最好的方法是有一个模式感知的RDD,使用case class.
case class Person(firstname:String, lastName:String, age:Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0),p(1),p(2).toInt)
rowRDD.saveToCassandra(keyspace, table)
Dataframe write
方法应该有效。检查您是否正确配置了上下文。
我将我的代码放在这里以使用 Spark Java 将 Spark 数据集保存到 Cassandra table。
private static void readBigEmptable(SparkSession sparkSession) {
String cassandraEmpColumns= "id,name,salary,age,city";
Dataset<Row> bigDataset = sparkSession.sql("select * from big_emp");
// Generate the schema for output row
List<StructField> fields = new ArrayList<>();
for (String fieldName : cassandraEmpColumns.split(",")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schemaStructure = DataTypes.createStructType(fields);
// Converting big dataset to RDD to perform operation on Row field
JavaRDD<Row> bigRDD = bigDataset.toJavaRDD();
JavaRDD<Row> resultRDD = bigRDD .map(new Function<Row, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(Row row) throws Exception {
// return compareField(row).iterator();
Row outputRow = RowFactory.create(row.getAs("id"), row.getAs("name"), row.getAs("salary"),
row.getAs("age"), row.getAs("city"));
return outputRow;
}
});
Dataset<Row> empDs = sparkSession.createDataFrame(resultRDD, schemaStructure);
empDs.show();
writeToCassandraTable(empDs);
}
private static void writeToCassandraTable(Dataset<Row> dataset) {
Map<String, String> tableProperties = new HashMap();
tableProperties.put("keyspace", "test_keyspace");
tableProperties.put("table", "emp_test");
tableProperties.put("confirm.truncate", "true");
dataset.write().format("org.apache.spark.sql.cassandra").options(tableProperties).mode(SaveMode.Overwrite)
.save();
}
注意:如果我们使用模式(SaveMode.Overwrite)那么我们应该使用tableProperties.put("confirm.truncate", "true"); 否则我们会收到错误信息。
SaveMode.Append
- Append模式是指在将DataFrame保存到数据源时,
如果 data/table 已经存在,则需要 DataFrame 的内容
附加到现有数据。
SaveMode.ErrorIfExists
- ErrorIfExists模式是指当保存一个DataFrame到一个数据
源,如果数据已经存在,则预计会出现异常
抛出。
SaveMode.Ignore
- Ignore模式是指在将DataFrame保存到数据源时,如果数据已经存在,则保存操作不保存DataFrame的内容,也不改变已有的数据。
SaveMode.Overwrite
- Overwrite模式是指在将DataFrame保存到数据源时,
如果 data/table 已经存在,则现有数据应该是
被 DataFrame 的内容覆盖。
我正在使用 Scala 版本 2.10.5 Cassandra 3.0 和 Spark 1.6。我想将数据插入 cassandra,所以我尝试了基本示例
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
可以正常工作并能够将数据插入 Cassandra.So 我有一个 csv 文件,我想通过匹配模式
将其插入 Cassandra tableval person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
当我使用 SaveToCassndra 时,我发现 saveToCassandra 不是 personSchemaRDD 的一部分。所以被教导以不同的方式尝试
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
但是无法在 ip:port 上连接到 cassandra。谁能告诉我最好的方法。我需要定期将文件中的数据保存到 cassandra。
sqlContext.applySchema(...)
returns一个DataFrame
和一个DataFrame
没有saveToCassandra
方法。
您可以使用 .write
方法:
val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
如果我们想使用savetoCassandra
方法,最好的方法是有一个模式感知的RDD,使用case class.
case class Person(firstname:String, lastName:String, age:Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0),p(1),p(2).toInt)
rowRDD.saveToCassandra(keyspace, table)
Dataframe write
方法应该有效。检查您是否正确配置了上下文。
我将我的代码放在这里以使用 Spark Java 将 Spark 数据集保存到 Cassandra table。
private static void readBigEmptable(SparkSession sparkSession) {
String cassandraEmpColumns= "id,name,salary,age,city";
Dataset<Row> bigDataset = sparkSession.sql("select * from big_emp");
// Generate the schema for output row
List<StructField> fields = new ArrayList<>();
for (String fieldName : cassandraEmpColumns.split(",")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schemaStructure = DataTypes.createStructType(fields);
// Converting big dataset to RDD to perform operation on Row field
JavaRDD<Row> bigRDD = bigDataset.toJavaRDD();
JavaRDD<Row> resultRDD = bigRDD .map(new Function<Row, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(Row row) throws Exception {
// return compareField(row).iterator();
Row outputRow = RowFactory.create(row.getAs("id"), row.getAs("name"), row.getAs("salary"),
row.getAs("age"), row.getAs("city"));
return outputRow;
}
});
Dataset<Row> empDs = sparkSession.createDataFrame(resultRDD, schemaStructure);
empDs.show();
writeToCassandraTable(empDs);
}
private static void writeToCassandraTable(Dataset<Row> dataset) {
Map<String, String> tableProperties = new HashMap();
tableProperties.put("keyspace", "test_keyspace");
tableProperties.put("table", "emp_test");
tableProperties.put("confirm.truncate", "true");
dataset.write().format("org.apache.spark.sql.cassandra").options(tableProperties).mode(SaveMode.Overwrite)
.save();
}
注意:如果我们使用模式(SaveMode.Overwrite)那么我们应该使用tableProperties.put("confirm.truncate", "true"); 否则我们会收到错误信息。
SaveMode.Append
- Append模式是指在将DataFrame保存到数据源时, 如果 data/table 已经存在,则需要 DataFrame 的内容 附加到现有数据。
SaveMode.ErrorIfExists
- ErrorIfExists模式是指当保存一个DataFrame到一个数据 源,如果数据已经存在,则预计会出现异常 抛出。
SaveMode.Ignore
- Ignore模式是指在将DataFrame保存到数据源时,如果数据已经存在,则保存操作不保存DataFrame的内容,也不改变已有的数据。
SaveMode.Overwrite
- Overwrite模式是指在将DataFrame保存到数据源时, 如果 data/table 已经存在,则现有数据应该是 被 DataFrame 的内容覆盖。