通过 Java + Spark + SparkSession 在 Cassandra table 中 insert/update 行的最佳方法是什么
What is the best way to insert/update rows in Cassandra table via Java + Spark + SparkSession
这是通过Java + Spark + SparkSession:
从cassandra table获取数据的方法
SparkSession spark = SparkSession
.builder()
.appName("JavaDemoDataSet")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.config("spark.cassandra.connection.host", "127.0.0.1")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
Dataset<Row> dataset = spark.read()
.format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "chat");
put("table", "dictionary");
}
})
.load()
.filter("value_id BETWEEN 1 AND 5");
但是当我研究如何在此 table 中添加或修改行(至少 1 行)时 - 我找不到最好的方法。
例如,我正在使用 GUI 开发简单的应用程序,我需要向 "Dictionary" table 添加一个新值。所以,在这种情况下,从我的角度来看——我不需要数据集来做到这一点。
当我研究如何通过 SparkSession 添加一行时 - 我找不到 Java+Spark+Sparksession 示例如何做到这一点。
我绝对可以通过 Statement 使用 CQL 语句来做到这一点,但是哪种方法最适合更新或添加 1 或 2 行?特别是当我使用 SparkSession 阅读它们时。
如果可能的话,我会非常感谢示例(甚至是超链接,我研究了很多,但可能我错过了一些重要的东西),因为我对这一切都很陌生。
谢谢!
我强烈建议不要使用 Spark 进行单行更新。内置的连接器方法面向大量数据,单行更改可能效率很低。您最好直接使用驱动程序或使用 CassandraConnector 接口。
这是使用 Java+SparkSession+CassandraConnector 保存和读取的示例。
public class SparkCassandraDatasetApplication {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraDatasetApplication")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.config("spark.cassandra.connection.host", "127.0.0.1")
.config("spark.cassandra.connection.port", "9042")
.master("local")
.getOrCreate();
//Data
MyData data = new MyData();
data.setId("111");
data.setUsername("userOne");
List<MyData> users = Arrays.asList(data);
Dataset<MyData> datasetWrite = spark.createDataset(users, Encoders.bean(MyData.class));
//Save data to Cassandra
datasetWrite.write().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).mode(SaveMode.Append).save();
//Read data back
Dataset<Row> datasetRead = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
datasetRead.show();
spark.stop();
}
}
这是通过Java + Spark + SparkSession:
从cassandra table获取数据的方法SparkSession spark = SparkSession
.builder()
.appName("JavaDemoDataSet")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.config("spark.cassandra.connection.host", "127.0.0.1")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
Dataset<Row> dataset = spark.read()
.format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "chat");
put("table", "dictionary");
}
})
.load()
.filter("value_id BETWEEN 1 AND 5");
但是当我研究如何在此 table 中添加或修改行(至少 1 行)时 - 我找不到最好的方法。 例如,我正在使用 GUI 开发简单的应用程序,我需要向 "Dictionary" table 添加一个新值。所以,在这种情况下,从我的角度来看——我不需要数据集来做到这一点。
当我研究如何通过 SparkSession 添加一行时 - 我找不到 Java+Spark+Sparksession 示例如何做到这一点。 我绝对可以通过 Statement 使用 CQL 语句来做到这一点,但是哪种方法最适合更新或添加 1 或 2 行?特别是当我使用 SparkSession 阅读它们时。
如果可能的话,我会非常感谢示例(甚至是超链接,我研究了很多,但可能我错过了一些重要的东西),因为我对这一切都很陌生。
谢谢!
我强烈建议不要使用 Spark 进行单行更新。内置的连接器方法面向大量数据,单行更改可能效率很低。您最好直接使用驱动程序或使用 CassandraConnector 接口。
这是使用 Java+SparkSession+CassandraConnector 保存和读取的示例。
public class SparkCassandraDatasetApplication {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraDatasetApplication")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.config("spark.cassandra.connection.host", "127.0.0.1")
.config("spark.cassandra.connection.port", "9042")
.master("local")
.getOrCreate();
//Data
MyData data = new MyData();
data.setId("111");
data.setUsername("userOne");
List<MyData> users = Arrays.asList(data);
Dataset<MyData> datasetWrite = spark.createDataset(users, Encoders.bean(MyData.class));
//Save data to Cassandra
datasetWrite.write().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).mode(SaveMode.Append).save();
//Read data back
Dataset<Row> datasetRead = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
datasetRead.show();
spark.stop();
}
}