使用 Spark 连接到 Cassandra

Connecting to Cassandra with Spark

首先,我购买了新的 O'Reilly Spark 书籍并尝试了那些 Cassandra 设置说明。我还在网上找到了其他 Whosebug posts 和各种 posts 和指南。 None 他们按原样工作。以下是我所能得到的。

这是一个只有少量虚拟测试数据记录的测试。我正在 运行 正在使用 plasetcassandra.org 提供的最新 Cassandra 2.0.7 Virtual Box VM,链接来自 Cassandra 项目主页。

我下载了 Spark 1.2.1 源代码并从 github 获取了最新的 Cassandra 连接器代码,并针对 Scala 2.11 进行了构建。我在 JDK 1.8.0_40 和 Mac OS 10.10.2.

上安装了 Scala 2.11.6

我 运行 已加载 cassandra 连接器的 spark shell:

bin/spark-shell --driver-class-path ../spark-cassandra-connector/spark-cassandra-connector/target/scala-2.11/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar

然后我对四个记录的 table 进行简单的行计数类型测试:

import com.datastax.spark.connector._
sc.stop
val conf = new org.apache.spark.SparkConf(true).set("spark.cassandra.connection.host", "192.168.56.101")
val sc = new org.apache.spark.SparkContext(conf)
val table = sc.cassandraTable("mykeyspace", "playlists")
table.count

我收到以下错误。令人困惑的是,它在尝试在 127.0.0.1 找到 Cassandra 时出错,但它也能识别我配置的主机名,即 192.168.56.101。

15/03/16 15:56:54 INFO Cluster: New Cassandra host /192.168.56.101:9042 added
15/03/16 15:56:54 INFO CassandraConnector: Connected to Cassandra cluster: Cluster on a Stick
15/03/16 15:56:54 ERROR ServerSideTokenRangeSplitter: Failure while fetching splits from Cassandra
java.io.IOException: Failed to open thrift connection to Cassandra at 127.0.0.1:9160
<snip>
java.io.IOException: Failed to fetch splits of TokenRange(0,0,Set(CassandraNode(/127.0.0.1,/127.0.0.1)),None) from all endpoints: CassandraNode(/127.0.0.1,/127.0.0.1)

顺便说一句,我还可以使用 conf/spark-defaults.conf 中的配置文件来执行上述操作,而无需 close/recreate spark 上下文或传递 --driver-clas-path 参数。不过我最终还是遇到了同样的错误,上面的步骤在这个 post.

中似乎更容易交流

有什么想法吗?

检查 cassandra 节点上 cassandra.yaml 文件中的 rpc_address 配置。火花连接器可能正在使用系统中的值。local/system.peers tables 并且它可能在您的 cassandra.yaml.

中设置为 127.0.0.1

spark 连接器使用 thrift 从 cassandra 获取令牌范围拆分。最终我打赌这将被替换,因为 C* 2.1.4 有一个新的 table 称为 system.size_estimates (CASSANDRA-7688)。看起来它正在获取主机元数据以找到最近的主机,然后在端口 9160 上使用 thrift 进行查询。