Utils.Scala Databricks ~ Twitter Stream in Scala

I am working in IntelliJ and getting this error:

Error:(118, 13) not found: value Utils
        Utils.parseCommandLineWithTwitterCredentials(args)
        ^

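This is the code I am trying to reference (Utils.scala from the Databricks reference apps; the URL is in the comment inside the code below).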

If I put this code directly into my main, the error goes away.


Here is my build.sbt file:

name := "TwtrStream"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "1.5.2"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"

Here is the code that produces the error (Utils.IntParam does not resolve):

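import java.io.File

import com.google.gson.Gson
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object Collect {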
    private var numTweetsCollected = 0L
    private var partNum = 0
    private var gson = new Gson()

    def main(args: Array[String]) {
      // Process program arguments and set properties
      // Four positional arguments are destructured below, so require at least four
      if (args.length < 4) {
        System.err.println("Usage: " + this.getClass.getSimpleName +
          " <outputDirectory> <numTweetsToCollect> <intervalInSeconds> <partitionsEachInterval>")
        System.exit(1)
      }
      val Array(outputDirectory, Utils.IntParam(numTweetsToCollect),  Utils.IntParam(intervalSecs), Utils.IntParam(partitionsEachInterval)) =
        Utils.parseCommandLineWithTwitterCredentials(args)
      /*
      https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/Utils.scala
       */
      val outputDir = new File(outputDirectory.toString)
      if (outputDir.exists()) {
        System.err.println("ERROR - %s already exists: delete or specify another directory".format(
          outputDirectory))
        System.exit(1)
      }
      outputDir.mkdirs()

      println("Initializing Streaming Spark Context...")
      val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(intervalSecs))

      val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)
        .map(gson.toJson(_))

      tweetStream.foreachRDD((rdd, time) => {
        val count = rdd.count()
        if (count > 0) {
          val outputRDD = rdd.repartition(partitionsEachInterval)
          outputRDD.saveAsTextFile(outputDirectory + "/tweets_" + time.milliseconds.toString)
          numTweetsCollected += count
          if (numTweetsCollected > numTweetsToCollect) {
            System.exit(0)
          }
        }
      })
    }
  }

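The problem is that you are trying to access a class (the Utils object) that is neither in your project nor in any of the libraries in your dependency list. Utils.scala is part of the Databricks reference app, not of Spark itself.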

So you need to add that class to your project, or write a similar one yourself:

your.package
|_ Collect.scala
|_ Utils.scala

Now you can use it from the code of your Collect object.
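
If you would rather write a similar class than copy the Databricks file, a minimal sketch could look like the one below. It is a simplified stand-in, not the original Utils.scala: the option names (consumerKey, consumerSecret, accessToken, accessTokenSecret) and the hand-rolled argument parsing are my assumptions, and the original file may use a different parser. It only covers what Collect needs: the IntParam extractor and a parser that strips the Twitter credentials and returns the positional arguments.

```scala
// Utils.scala: simplified stand-in, NOT the original Databricks file.
// Place it in the same package as Collect.scala.
object Utils {

  // Assumed option names; they double as suffixes of the twitter4j.oauth.* properties.
  private val credentialKeys =
    Set("consumerKey", "consumerSecret", "accessToken", "accessTokenSecret")

  /** Consumes "-<key> <value>" (or "--<key> <value>") pairs for the Twitter
    * credentials, stores them as twitter4j.oauth.* system properties, and
    * returns the remaining positional arguments in their original order. */
  def parseCommandLineWithTwitterCredentials(args: Array[String]): Array[String] = {
    def loop(rest: List[String], positional: List[String]): List[String] = rest match {
      case flag :: value :: tail
          if flag.startsWith("-") && credentialKeys(flag.dropWhile(_ == '-')) =>
        System.setProperty("twitter4j.oauth." + flag.dropWhile(_ == '-'), value)
        loop(tail, positional)
      case arg :: tail => loop(tail, arg :: positional)
      case Nil         => positional.reverse
    }
    loop(args.toList, Nil).toArray
  }

  /** Extractor: lets a pattern such as Utils.IntParam(n) match a String that
    * parses as an Int and bind n to the parsed value. */
  object IntParam {
    def unapply(str: String): Option[Int] =
      try Some(str.toInt) catch { case _: NumberFormatException => None }
  }

  // Collect also calls Utils.getAuth. With twitter4j on the classpath (it comes
  // in with spark-streaming-twitter), it could be defined as:
  //   def getAuth = Some(new twitter4j.auth.OAuthAuthorization(
  //     new twitter4j.conf.ConfigurationBuilder().build()))
}
```

With this in place, the pattern match in Collect (val Array(outputDirectory, Utils.IntParam(numTweetsToCollect), ...)) resolves. Note that the match throws a MatchError if fewer than four positional arguments remain or if one of the numeric arguments is not an integer.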