Spark 2.0.0:以集群模式从 Cassandra 读取
Spark 2.0.0 : Read from Cassandra in Cluster mode
我在使用 Spark 2.0.0 从 Cassandra 读取数据的 Spark 应用程序时遇到了一些问题。运行
我的代码工作如下:
DataFrameReader readerCassandra = SparkContextUtil.getInstance().read()
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", [DATABASE_IP])
.option("spark.cassandra.connection.port", [DATABASE_PORT]);
final Map<String,String> map = new HashMap<String,String>();
map.put("table", "MyTable");
map.put("keyspace", "MyKeyspace");
public final StructType schema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("value", DataTypes.DoubleType, true)
});
final Dataset<Row> dataset = readerCassandra.schema(schema).options(map).load();
dataset.show(false);
我想 运行 集群中的这段代码。我的集群使用 spark-2.0.2-bin-hadoop2.7(http://spark.apache.org/downloads.html 没有可用的 spark-2.0.0)。
首先,我使用以下脚本在客户端模式下提交:
#!/bin/bash
sparkMaster=local[*]
mainClass=package.MainClass
jar=/path/to/myJar-with-dependencies.jar
driverPort=7079
blockPort=7082
deployMode=client
$SPARK_HOME/bin/spark-submit \
--conf "spark.driver.port=${driverPort}"\
--conf "spark.blockManager.port=${blockPort}"\
--class $mainClass \
--master $sparkMaster \
--deploy-mode $deployMode \
--jars /path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar \
$jar
当我这样做时,一切正常。但是现在,我想 运行 我的应用程序处于集群模式。
所以我修改了我的提交脚本,将 sparkMaster
设置为我的主 IP,并将 deployMode
设置为 'cluster'。
当我提交申请时,我的驱动程序日志中几乎立即出现以下错误:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
...
注意:
- 我仍然有错误,集群中只有一个 Worker 与我的 Master 在同一台机器上。
- 起初,我使用的是 Spark 2.3.1,运行在集群模式下我的代码没有问题(使用 spark-cassandra-connector_2.11-2.3.1.jar
--jars
).
- 我在
--jars
中尝试了多个罐子,例如:spark-cassandra-connector_2.11-2.0.0.jar
、spark-cassandra-connector_2.11-2.0.2.jar
、spark-cassandra-connector_2.11-2.3.1.jar
、spark-cassandra-connector-java_2.11-1.5.1.jar
,但其中 none 有效.
- 其他一些罐子在
--jars
参数中设置并被考虑在内
您可能需要将路径指定为 file:///path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar
- 在这种情况下,它将通过驱动程序的 HTTP 服务器分发给执行程序。否则它希望该文件已经被您复制到所有机器以避免被进程本身复制。参见 Spark documentation for details...
我宁愿建议只创建具有所有依赖项(Spark 除外)的 uberjar,然后提交它 - 这样做会减轻痛苦。
我在使用 Spark 2.0.0 从 Cassandra 读取数据的 Spark 应用程序时遇到了一些问题。运行
我的代码工作如下:
DataFrameReader readerCassandra = SparkContextUtil.getInstance().read()
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", [DATABASE_IP])
.option("spark.cassandra.connection.port", [DATABASE_PORT]);
final Map<String,String> map = new HashMap<String,String>();
map.put("table", "MyTable");
map.put("keyspace", "MyKeyspace");
public final StructType schema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("value", DataTypes.DoubleType, true)
});
final Dataset<Row> dataset = readerCassandra.schema(schema).options(map).load();
dataset.show(false);
我想 运行 集群中的这段代码。我的集群使用 spark-2.0.2-bin-hadoop2.7(http://spark.apache.org/downloads.html 没有可用的 spark-2.0.0)。
首先,我使用以下脚本在客户端模式下提交:
#!/bin/bash
sparkMaster=local[*]
mainClass=package.MainClass
jar=/path/to/myJar-with-dependencies.jar
driverPort=7079
blockPort=7082
deployMode=client
$SPARK_HOME/bin/spark-submit \
--conf "spark.driver.port=${driverPort}"\
--conf "spark.blockManager.port=${blockPort}"\
--class $mainClass \
--master $sparkMaster \
--deploy-mode $deployMode \
--jars /path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar \
$jar
当我这样做时,一切正常。但是现在,我想 运行 我的应用程序处于集群模式。
所以我修改了我的提交脚本,将 sparkMaster
设置为我的主 IP,并将 deployMode
设置为 'cluster'。
当我提交申请时,我的驱动程序日志中几乎立即出现以下错误:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
...
注意:
- 我仍然有错误,集群中只有一个 Worker 与我的 Master 在同一台机器上。
- 起初,我使用的是 Spark 2.3.1,运行在集群模式下我的代码没有问题(使用 spark-cassandra-connector_2.11-2.3.1.jar
--jars
). - 我在
--jars
中尝试了多个罐子,例如:spark-cassandra-connector_2.11-2.0.0.jar
、spark-cassandra-connector_2.11-2.0.2.jar
、spark-cassandra-connector_2.11-2.3.1.jar
、spark-cassandra-connector-java_2.11-1.5.1.jar
,但其中 none 有效. - 其他一些罐子在
--jars
参数中设置并被考虑在内
您可能需要将路径指定为 file:///path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar
- 在这种情况下,它将通过驱动程序的 HTTP 服务器分发给执行程序。否则它希望该文件已经被您复制到所有机器以避免被进程本身复制。参见 Spark documentation for details...
我宁愿建议只创建具有所有依赖项(Spark 除外)的 uberjar,然后提交它 - 这样做会减轻痛苦。