Cassandra 与 Apache Spark 的集成
Cassandra Integration with Apache Spark
我正在尝试运行下面链接中提供的代码:
http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java
代码如下:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
public class JavaDemo implements Serializable {
  // transient: SparkConf itself is not serializable and is only needed on the driver side.
  private transient SparkConf conf;

  private JavaDemo(SparkConf conf) {
    this.conf = conf;
  }

  /** Drives the demo end to end: build schema, run the (stub) computation, show results, stop. */
  private void run() {
    JavaSparkContext context = new JavaSparkContext(conf);
    generateData(context);
    compute(context);
    showResults(context);
    context.stop();
  }

  /** Drops and recreates the {@code java_api} keyspace together with its three demo tables. */
  private void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());
    // try-with-resources guarantees the CQL session is closed even if a statement fails.
    try (Session session = connector.openSession()) {
      session.execute("DROP KEYSPACE IF EXISTS java_api");
      session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
      session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
      session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
      session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }
  }

  private void compute(JavaSparkContext sc) {
    // Intentionally empty in this excerpt.
  }

  private void showResults(JavaSparkContext sc) {
    // Intentionally empty in this excerpt.
  }

  public static void main(String[] args) {
    // SparkConf setters return the same instance, so the calls can be chained.
    SparkConf conf = new SparkConf()
        .setAppName("Java API demo")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "XX.XX.XX.XX");
    new JavaDemo(conf).run();
  }
}
我收到以下错误:
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {<Cassandra IP>}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:176)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun.apply(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun.apply(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
at JavaDemo.generateData(JavaDemo.java:28)
at JavaDemo.run(JavaDemo.java:18)
at JavaDemo.main(JavaDemo.java:52)
Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers:
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.init(LocalNodeFirstLoadBalancingPolicy.scala:47)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1024)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:270)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169)
有什么我可以做的吗?我已经尝试过直接用 Java 连接 Cassandra,这似乎工作正常。
后来我将项目转换为 Maven 项目并尝试运行,这在一定程度上解决了问题。使用的 Cassandra 版本:2.1.2。我认为这是一个版本不匹配问题。
POM.xml下面提供的内容
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycompany.app</groupId>
  <artifactId>my-app</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Maven Quick Start Archetype</name>
  <properties>
    <!-- Keep every Spark artifact on ONE version: the original mixed spark-sql 1.2.0
         with spark-core/spark-streaming 1.2.1, which causes classpath mismatches. -->
    <spark.version>1.2.1</spark.version>
    <!-- Spark Cassandra Connector line compatible with Spark 1.2.x. -->
    <connector.version>1.2.0-rc3</connector.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector_2.10</artifactId>
      <version>${connector.version}</version>
    </dependency>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector-java_2.10</artifactId>
      <version>${connector.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
</project>
我正在尝试运行下面链接中提供的代码: http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java
代码如下:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
public class JavaDemo implements Serializable {
  // transient: SparkConf itself is not serializable and is only needed on the driver side.
  private transient SparkConf conf;

  private JavaDemo(SparkConf conf) {
    this.conf = conf;
  }

  /** Drives the demo end to end: build schema, run the (stub) computation, show results, stop. */
  private void run() {
    JavaSparkContext context = new JavaSparkContext(conf);
    generateData(context);
    compute(context);
    showResults(context);
    context.stop();
  }

  /** Drops and recreates the {@code java_api} keyspace together with its three demo tables. */
  private void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());
    // try-with-resources guarantees the CQL session is closed even if a statement fails.
    try (Session session = connector.openSession()) {
      session.execute("DROP KEYSPACE IF EXISTS java_api");
      session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
      session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
      session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
      session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }
  }

  private void compute(JavaSparkContext sc) {
    // Intentionally empty in this excerpt.
  }

  private void showResults(JavaSparkContext sc) {
    // Intentionally empty in this excerpt.
  }

  public static void main(String[] args) {
    // SparkConf setters return the same instance, so the calls can be chained.
    SparkConf conf = new SparkConf()
        .setAppName("Java API demo")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "XX.XX.XX.XX");
    new JavaDemo(conf).run();
  }
}
我收到以下错误:
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {<Cassandra IP>}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:176)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun.apply(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun.apply(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
at JavaDemo.generateData(JavaDemo.java:28)
at JavaDemo.run(JavaDemo.java:18)
at JavaDemo.main(JavaDemo.java:52)
Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers:
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.init(LocalNodeFirstLoadBalancingPolicy.scala:47)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1024)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:270)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169)
有什么我可以做的吗?我已经尝试过直接用 Java 连接 Cassandra,这似乎工作正常。
后来我将项目转换为 Maven 项目并尝试运行,这在一定程度上解决了问题。使用的 Cassandra 版本:2.1.2。我认为这是一个版本不匹配问题。
POM.xml下面提供的内容
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycompany.app</groupId>
  <artifactId>my-app</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Maven Quick Start Archetype</name>
  <properties>
    <!-- Keep every Spark artifact on ONE version: the original mixed spark-sql 1.2.0
         with spark-core/spark-streaming 1.2.1, which causes classpath mismatches. -->
    <spark.version>1.2.1</spark.version>
    <!-- Spark Cassandra Connector line compatible with Spark 1.2.x. -->
    <connector.version>1.2.0-rc3</connector.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector_2.10</artifactId>
      <version>${connector.version}</version>
    </dependency>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector-java_2.10</artifactId>
      <version>${connector.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
</project>