如何启用从 Cassandra 到 Spark 的流式传输?

How to enable streaming from Cassandra to Spark?

我有以下 spark 作业

from __future__ import print_function

import os
import sys
import time
from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark_cassandra import streaming,CassandraSparkContext

if __name__ == "__main__":

    conf = SparkConf().setAppName("PySpark Cassandra Test")
    sc = CassandraSparkContext(conf=conf)
    stream = StreamingContext(sc, 2)

    rdd=sc.cassandraTable("keyspace2","users").collect()
    #print rdd
    stream.start()
    stream.awaitTermination()
    sc.stop() 

当我 运行 这个时,它给了我以下 error:

ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: \
No output operations registered, so nothing to execute

shell脚本我运行:

./bin/spark-submit --packages TargetHolding:pyspark-cassandra:0.2.4 example
s/src/main/python/test/reading-cassandra.py

将 spark streaming 与 kafka 进行比较,我在上面的代码中缺少这一行:

kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", {'topic':1})

我实际使用的地方 createStream 但对于 cassandra,我在文档中看不到这样的内容。如何启动 spark streaming 和 cassandra 之间的流式传输?

版本:

Cassandra v2.1.12
Spark v1.4.1
Scala 2.10

要从 Cassandra table 创建 DStream,您可以使用 ConstantInputDStream 提供从 Cassandra table 创建的 RDD 作为输入。这将导致在每个 DStream 间隔上具体化 RDD。

请注意,持续增长的大 table 或 table 会对流式处理作业的性能产生负面影响。

另请参见: 示例。