Utils.scala Databricks ~ Twitter Stream in Scala
I am working in IntelliJ and get this error:
Error:(118, 13) not found: value Utils
Utils.parseCommandLineWithTwitterCredentials(args)
^
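This is the code I am trying to reference (Utils.scala from the Databricks reference-apps repository, linked in the code comment below).
If I put that code directly into my main, the error goes away.
Here is my build.sbt file: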
name := "TwtrStream"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "1.5.2"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"
Here is the code where I get the error: Utils.IntParam does not resolve
import java.io.File

import com.google.gson.Gson
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object Collect {
  private var numTweetsCollected = 0L
  private var partNum = 0
  private var gson = new Gson()

  def main(args: Array[String]): Unit = {
    // Process program arguments and set properties
    if (args.length < 3) {
      System.err.println("Usage: " + this.getClass.getSimpleName +
        " <outputDirectory> <numTweetsToCollect> <intervalInSeconds> <partitionsEachInterval>")
      System.exit(1)
    }
    // Utils is not defined in this project; these are the lines that fail to compile
    val Array(outputDirectory, Utils.IntParam(numTweetsToCollect), Utils.IntParam(intervalSecs), Utils.IntParam(partitionsEachInterval)) =
      Utils.parseCommandLineWithTwitterCredentials(args)
    /*
    https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/Utils.scala
    */
    val outputDir = new File(outputDirectory.toString)
    if (outputDir.exists()) {
      System.err.println("ERROR - %s already exists: delete or specify another directory".format(
        outputDirectory))
      System.exit(1)
    }
    outputDir.mkdirs()

    println("Initializing Streaming Spark Context...")
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(intervalSecs))

    // Serialize each incoming tweet to a JSON string
    val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)
      .map(gson.toJson(_))

    tweetStream.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval)
        outputRDD.saveAsTextFile(outputDirectory + "/tweets_" + time.milliseconds.toString)
        numTweetsCollected += count
        if (numTweetsCollected > numTweetsToCollect) {
          System.exit(0)
        }
      }
    })
  }
}
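The problem is that you are referencing an object, Utils, that is neither defined in your project nor provided by any library in your dependency list.
You therefore need to add that class to your project, or write a similar one yourself, next to Collect: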
your.package
|_ Collect.scala
|_ Utils.scala
Now you can use it from the code in your Collect object.
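If you go the "write a similar one" route, the part that makes the `Utils.IntParam(...)` patterns compile is an extractor object with an `unapply` method. The following is only a minimal sketch of that idea, not the Databricks implementation: `UtilsSketch` and `parseArgs` are hypothetical names chosen for illustration, and the Twitter credential handling (`parseCommandLineWithTwitterCredentials`, `getAuth`) is deliberately left out.

```scala
import scala.util.Try

// Hypothetical stand-in for a Utils object; names are illustrative only.
object UtilsSketch {

  // Extractor object: allows a String to be matched as an Int inside a pattern.
  // Returns None (so the pattern does not match) when the string is not a valid Int.
  object IntParam {
    def unapply(str: String): Option[Int] = Try(str.trim.toInt).toOption
  }

  // Simplified argument parser (an assumption, not the original API):
  // expects exactly <outputDirectory> <numTweets> <intervalSecs> <partitions>.
  def parseArgs(args: Array[String]): Option[(String, Int, Int, Int)] = args match {
    case Array(dir, IntParam(n), IntParam(secs), IntParam(parts)) =>
      Some((dir, n, secs, parts))
    case _ =>
      None
  }
}
```

With an object like this in the same package as Collect, a pattern such as `val Array(dir, UtilsSketch.IntParam(n), ...) = args` resolves, because the compiler can now find both the object and its `unapply` method. Returning an Option from `parseArgs` instead of destructuring with `val Array(...) = ...` avoids a MatchError when an argument is not numeric.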