Issue with Zeppelin on a Spark-Cassandra system: ClassNotFoundException

I recently started using Zeppelin on a Spark-Cassandra cluster (one master + 3 workers) to run simple machine learning algorithms with the MLlib library.

These are the libraries I load into Zeppelin:

%dep
z.load("com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1")
z.load("org.apache.spark:spark-core_2.10:1.4.1")
z.load("com.datastax.cassandra:cassandra-driver-core:2.1.3")
z.load("org.apache.thrift:libthrift:0.9.2")
z.load("org.apache.spark:spark-mllib_2.10:1.4.0")
z.load("cassandra-clientutil-2.1.3.jar")
z.load("joda-time-2.3.jar")

I am trying to implement a linear regression script. However, when I run it, I get the following error message:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.xxx.xxx.xxx): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
...

What confuses me is that the same script runs without any problem when launched with spark-submit.
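For comparison, this is roughly how I submit it outside Zeppelin (the class name, jar names, and paths are illustrative; spark-submit ships the application jar and the --jars dependencies to the executors, which may explain why the classes are found in that case):

    spark-submit \
      --class demo.LinearRegressionJob \
      --master spark://xxx.xxx.xxx.xxx:7077 \
      --jars spark-cassandra-connector_2.10-1.4.0-M1.jar,cassandra-driver-core-2.1.3.jar \
      linear-regression-demo.jar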

Here is some of the code I am trying to execute:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint}
    import org.apache.spark.rdd.RDD

    // Stop the SparkContext provided by Zeppelin and create a new one
    // pointing at the Cassandra host.
    sc.stop()
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "xxx.xxx.xxx.xxx")
      .setMaster("spark://xxx.xxx.xxx.xxx:7077")
      .setAppName("DEMONSTRATION")
    val sc = new SparkContext(conf)

    case class Fact(numdoc: String, numl: String, year: String, creator: Double,
                    date: Double, day: Double, user: Double, workingday: Double, total: String)

    val data = sc.textFile("~/Input/Data.csv")

    // Parse each CSV row into a Fact: three leading string fields,
    // five numeric fields, and the target value in the last column.
    val parsed = data.filter(!_.isEmpty).map { row =>
      val splitted = row.split(",")
      val Array(nd, nl, yr) = splitted.slice(0, 3)
      val Array(cr, dt, wd, us, wod) = splitted.slice(3, 8).map(_.toDouble)
      Fact(nd, nl, yr, cr, dt, wd, us, wod, splitted(8))
    }

    // Map each distinct target value to a numeric id and back.
    val class2id = parsed.map(_.total.toDouble).distinct.collect.zipWithIndex
      .map { case (k, v) => (k, v.toDouble) }.toMap
    val id2class = class2id.map(_.swap)

    // Build labeled points; MLlib's LabeledPoint expects a Vector of features.
    val parsedData = parsed.map { i =>
      LabeledPoint(class2id(i.total.toDouble),
        Vectors.dense(i.creator, i.date, i.day, i.workingday))
    }

    val model: LinearRegressionModel = LinearRegressionWithSGD.train(parsedData, 3)
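For completeness, this is roughly how I then use the model, mapping the predicted value back through id2class (the rounding step is my own assumption, to get a key that actually exists in the map):

    // Predict on the training features and map ids back to the original classes.
    val predictions = parsedData.map { p =>
      val predictedId = model.predict(p.features)
      // round to the nearest known id before looking it up (illustrative)
      id2class.getOrElse(math.round(predictedId).toDouble, Double.NaN)
    }
    predictions.take(5).foreach(println)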

Thanks in advance!

I finally found the solution! In fact, I should not have stopped the SparkContext at the beginning and created a new one. The reason I did it was that, otherwise, I could not reach Cassandra on the remote machine: by default, Zeppelin uses the address of the machine it is installed on as the Cassandra host. So I installed a new Cassandra instance on that machine, added it to my initial cluster, and the problem was solved.
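In other words, the notebook paragraph keeps Zeppelin's own sc, and the Cassandra host comes from the interpreter configuration instead of a hand-built SparkConf. A minimal sketch of what the corrected paragraph looks like under that assumption (the keyspace and table names are illustrative):

    import com.datastax.spark.connector._

    // Use the SparkContext that Zeppelin provides instead of stopping it;
    // spark.cassandra.connection.host now resolves to the local Cassandra node.
    val rdd = sc.cassandraTable("demo_keyspace", "facts")  // illustrative names
    println(rdd.count)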