将 Spark Structured Streaming 数据写入 Cassandra

Writing Spark Structure Streaming data into Cassandra

我想使用 Pyspark 将结构流数据写入 Cassandra API。

我的数据流如下:

Nifi -> Kafka -> Spark Structure Streaming -> Cassandra

我试过以下方法:

query = df.writeStream\
  .format("org.apache.spark.sql.cassandra")\
  .option("keyspace", "demo")\
  .option("table", "test")\
  .start()

但是收到以下错误信息: "org.apache.spark.sql.cassandra" 不支持流式写入。

我还尝试了另一种方法:[Source - DSE 6.0 Administrator Guide]

query = df.writeStream\
   .cassandraFormat("test", "demo")\
   .start()

但出现异常:AttributeError: 'DataStreamWriter' 对象没有属性 'cassandraFormat'

任何人都可以告诉我如何进一步进行吗?

提前致谢。

除了以下内容,您在这里无能为力:

  • 关注(并投票支持)corresponding JIRA
  • 实现所需的功能并自己打开 PR。

除此之外,您可以直接创建使用foreach sink并直接写入。

升级 DSE 6.0(最新版本)后,我可以将结构化流数据写入 Cassandra。 [Spark 2.2 & Cassandra 3.11]

参考代码:

query = fileStreamDf.writeStream\
 .option("checkpointLocation", '/tmp/check_point/')\
 .format("org.apache.spark.sql.cassandra")\
 .option("keyspace", "analytics")\
 .option("table", "test")\
 .start()

DSE 文档 URL:https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/structuredStreaming.html

此答案用于将数据写入 Cassandra,而不是 DSE (which supports Structured Streaming for storing data)

对于Spark 2.4.0及更高版本,您可以使用foreachBatch方法,该方法允许您使用Spark Cassandra Connector提供的Cassandra批处理数据编写器将流式查询的每个微批处理的输出写入到卡桑德拉:

import org.apache.spark.sql.cassandra._

df.writeStream
  .foreachBatch { (batchDF, _) => 
    batchDF
     .write
     .cassandraFormat("tableName", "keyspace")
     .mode("append")
     .save
  }.start

Spark 2.4.0以下版本需要实现foreach sink。

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.Statement
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

class CassandraSink(sparkConf: SparkConf) extends ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true

    def process(row: Row) = {
      def buildStatement: Statement =
        QueryBuilder.insertInto("keyspace", "tableName")
          .value("key", row.getAs[String]("value"))
      CassandraConnector(sparkConf).withSessionDo { session =>
        session.execute(buildStatement)
      }
    }

    def close(errorOrNull: Throwable) = Unit
}

然后你就可以使用foreach sink如下:

df.writeStream
 .foreach(new CassandraSink(spark.sparkContext.getConf))
 .start