通过 Java + Spark + SparkSession 在 Cassandra table 中 insert/update 行的最佳方法是什么

What is the best way to insert/update rows in Cassandra table via Java + Spark + SparkSession

这是通过Java + Spark + SparkSession:

从cassandra table获取数据的方法
SparkSession spark = SparkSession
          .builder()
          .appName("JavaDemoDataSet")
          .config("spark.sql.warehouse.dir", "/file:C:/temp")
          .config("spark.cassandra.connection.host", "127.0.0.1")
          .config("spark.cassandra.connection.port", "9042")
          .master("local[2]")
          .getOrCreate();

 Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "chat");
                put("table", "dictionary");
            }
        })
        .load()
        .filter("value_id BETWEEN 1 AND 5");

但是当我研究如何在此 table 中添加或修改行(至少 1 行)时 - 我找不到最好的方法。 例如,我正在使用 GUI 开发简单的应用程序,我需要向 "Dictionary" table 添加一个新值。所以,在这种情况下,从我的角度来看——我不需要数据集来做到这一点。

当我研究如何通过 SparkSession 添加一行时 - 我找不到 Java+Spark+Sparksession 示例如何做到这一点。 我绝对可以通过 Statement 使用 CQL 语句来做到这一点,但是哪种方法最适合更新或添加 1 或 2 行?特别是当我使用 SparkSession 阅读它们时。

如果可能的话,我会非常感谢示例(甚至是超链接,我研究了很多,但可能我错过了一些重要的东西),因为我对这一切都很陌生。

谢谢!

我强烈建议不要使用 Spark 进行单行更新。内置的连接器方法面向大量数据,单行更改可能效率很低。您最好直接使用驱动程序或使用 CassandraConnector 接口。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra

这是使用 Java+SparkSession+CassandraConnector 保存和读取的示例。

public class SparkCassandraDatasetApplication {
public static void main(String[] args) {
     SparkSession spark = SparkSession
      .builder()
      .appName("SparkCassandraDatasetApplication")
      .config("spark.sql.warehouse.dir", "/file:C:/temp")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .config("spark.cassandra.connection.port", "9042")
      .master("local")
      .getOrCreate();

    //Data
    MyData data = new MyData();
    data.setId("111");
    data.setUsername("userOne");
    List<MyData> users = Arrays.asList(data);
    Dataset<MyData> datasetWrite = spark.createDataset(users, Encoders.bean(MyData.class));

    //Save data to Cassandra
    datasetWrite.write().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {
        {
            put("keyspace", "mykeyspace");
            put("table", "mytable");
        }
    }).mode(SaveMode.Append).save();

    //Read data back
    Dataset<Row> datasetRead = spark.read().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "mykeyspace");
                    put("table", "mytable");
                }
            }).load();

    datasetRead.show();
    spark.stop();
   }
}