Scala Spark streaming fileStream
Similar to this question, I'm trying to use fileStream, but I'm getting a compile-time error about the type parameters. I'm trying to ingest XML data using org.apache.mahout.text.wikipedia.XmlInputFormat (provided by mahout-examples) as my InputFormat type.

val fileStream = ssc.fileStream[LongWritable, Text, XmlInputFormat](WATCHDIR)

The compile error is:
Error:(39, 26) type arguments [org.apache.hadoop.io.LongWritable,scala.xml.Text,org.apache.mahout.text.wikipedia.XmlInputFormat] conform to the bounds of none of the overloaded alternatives of
value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
val fileStream = ssc.fileStream[LongWritable, Text, XmlInputFormat](WATCHDIR)
^
Error:(39, 26) wrong number of type parameters for overloaded method value fileStream with alternatives:
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
val fileStream = ssc.fileStream[LongWritable, Text, XmlInputFormat](WATCHDIR)
^
I'm new to Scala, so I'm not very familiar with how type parameters work (I assume that's what's going on here?). Any help would be appreciated.
The error message shows that the compiler resolved Text to scala.xml.Text, whereas you need to use org.apache.hadoop.io.Text.
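A minimal sketch of the corrected imports, assuming the WATCHDIR constant and streaming setup from the question (the app name and batch interval here are placeholders):

```scala
import org.apache.hadoop.io.{LongWritable, Text} // Hadoop's Text, not scala.xml.Text
import org.apache.mahout.text.wikipedia.XmlInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("XmlFileStream")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// With org.apache.hadoop.io.Text in scope, [LongWritable, Text, XmlInputFormat]
// satisfies F <: InputFormat[K, V] and the overload resolves.
val fileStream = ssc.fileStream[LongWritable, Text, XmlInputFormat](WATCHDIR)
```

If something else in the file needs scala.xml, you can keep both by aliasing the Hadoop class, e.g. `import org.apache.hadoop.io.{Text => HadoopText}`, and using HadoopText in the type parameters.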