SparkStreaming: error in fileStream()
I am trying to implement a Spark Streaming application in Scala. I want to use the fileStream() method to process newly arriving files as well as the older files already present in the Hadoop directory.
I followed the fileStream() implementation from two threads on Stack Overflow:
- spark streaming fileStream
I am using fileStream() as follows:
val linesRDD = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory, (t: org.apache.hadoop.fs.Path) => true, false).map(_._2.toString)
But I am getting the following error:
type arguments [org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,org.apache.hadoop.mapred.TextInputFormat] conform to the bounds of none of the overloaded alternatives of value fileStream:
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F]) org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F]) org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F]) org.apache.spark.streaming.dstream.InputDStream[(K, V)]

wrong number of type parameters for overloaded method value fileStream with alternatives:
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F]) org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F]) org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence: scala.reflect.ClassTag[K], implicit evidence: scala.reflect.ClassTag[V], implicit evidence: scala.reflect.ClassTag[F]) org.apache.spark.streaming.dstream.InputDStream[(K, V)]
I am using Spark 1.4.1 and Hadoop 2.7.1. Before posting this question I went through the various implementations discussed on Stack Overflow and in the Spark documentation, but nothing helped. Any help would be greatly appreciated.
Thanks,
Rajneesh
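Update: comparing the error with my imports, every overload requires F <: org.apache.hadoop.mapreduce.InputFormat[K,V], but the TextInputFormat named in the message resolves to the old-API org.apache.hadoop.mapred package. Below is a minimal sketch of the call with TextInputFormat imported from the new mapreduce API instead; the application name, input directory, and batch interval are placeholders:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
// New-API input format; org.apache.hadoop.mapred.TextInputFormat does not satisfy the bound
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("FileStreamApp"), Seconds(10))

// newFilesOnly = false: also process files already present in the directory
val linesRDD = ssc
  .fileStream[LongWritable, Text, TextInputFormat](
    "hdfs:///path/to/input",  // placeholder input directory
    (t: Path) => true,        // accept every file
    false)
  .map(_._2.toString)

linesRDD.print()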
Please find below sample Java code with the correct imports; it works fine for me:
import scala.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
JavaStreamingContext jssc = SparkUtils.getStreamingContext("key", jsc);

// JavaDStream<String> rawInput = jssc.textFileStream(inputPath);
JavaPairInputDStream<LongWritable, Text> inputStream = jssc.fileStream(
        inputPath, LongWritable.class, Text.class,
        TextInputFormat.class, new Function<Path, Boolean>() {
            @Override
            public Boolean call(Path v1) throws Exception {
                if (v1.getName().contains("COPYING")) {
                    // This eliminates staging files.
                    return Boolean.FALSE;
                }
                return Boolean.TRUE;
            }
        }, true); // newFilesOnly = true

JavaDStream<String> rawInput = inputStream.map(
        new Function<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String call(Tuple2<LongWritable, Text> v1) throws Exception {
                // Keep the line text; drop the byte-offset key.
                return v1._2().toString();
            }
        });
log.info(tracePrefix + "Created the stream, Window Interval: " + windowInterval + ", Slide interval: " + slideInterval);
rawInput.print();
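Two notes on the choices in this snippet, for anyone adapting it: the path filter rejects HDFS staging files (a file being copied in appears under a name containing COPYING until the copy finishes), and the final true argument is newFilesOnly, so files already sitting in the directory when the stream starts are ignored; pass false there to pick up pre-existing files, as the original question wants. Be aware that fileStream only considers files whose modification time falls within a recent remember window (roughly one minute by default in this Spark version), so much older files may still be skipped. The equivalent filter in Scala would be a one-liner along these lines:

// Hypothetical Scala counterpart of the Java filter above
val skipStaging = (path: org.apache.hadoop.fs.Path) => !path.getName.contains("COPYING")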