使用 spark 中的数据帧以 writetime 写入 Cassandra

Write to Cassandra with writetime using dataframe in spark

我有以下代码:-

  val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
  val collection = kafkaStream.map(_._2).map(parser)
    collection.foreachRDD(rdd =>
      {
        if (!rdd.partitions.isEmpty) {
          try {
            val dfs = rdd.toDF() 
dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "tablename", "keyspace" -> "dbname"))
              .mode(SaveMode.Append).save()
          } catch {
            case e: Exception => e.printStackTrace
          }
        } else {
          println("blank rdd")
        }
      })

在上面的示例中,我使用数据帧将 spark 流保存到 cassandra。现在,我希望 df 的每一行都应该有其特定的写入时间,类似于此命令 -

insert into table (imei , date , gpsdt ) VALUES ( '1345','2010-10-12','2010-10-12 10:10:10') USING TIMESTAMP 1530313803922977;

所以基本上每行的写入时间应该等于该行的 gpsdt 列。在搜索时我发现了这个 link 但它显示了 RDD 的示例,我想要类似的数据框用例 - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md 任何建议,谢谢

据我所知,DataFrame版本中没有这样的功能(有相应的JIRA:https://datastax-oss.atlassian.net/browse/SPARKC-416)。但是你无论如何都有 RDD,你转换成 DataFrame - 为什么不使用你引用的 link 中描述的 saveToCassandra

P.S。您在检查是否为空时可能会遇到性能问题 (http://www.waitingforcode.com/apache-spark/isEmpty-trap-spark/read)