Spark Streaming with Hadoop configuration object
StreamingContext.fileStream is overloaded to take a Hadoop Configuration object, but it does not seem to work.

Snippet from the Spark source code:
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}

def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean,
    conf: Configuration): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
}
This snippet works fine:
val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true);
This snippet fails to compile:
val conf = sc.hadoopConfiguration;
val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true,conf);
Error:
overloaded method value fileStream with alternatives: (directory: String,filter: org.apache.hadoop.fs.Path ⇒ Boolean,newFilesOnly: Boolean)(implicit evidence: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] <and> (directory: String)(implicit evidence: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] cannot be applied to (String, org.apache.hadoop.fs.Path ⇒ Boolean, Boolean, org.apache.hadoop.conf.Configuration)
I assume you are using Spark 1.2 or earlier. If you switch from master to the 1.2 branch, you will see that this overload does not exist. In fact, FileInputDStream itself did not accept this constructor argument until 1.3.
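To make the version dependency concrete, here is a minimal sketch of both call shapes side by side. The app name, master URL, and batch interval are illustrative, and the two calls cannot coexist against a single Spark version: the three-argument overload compiles on 1.2 and later, while the Configuration-taking overload only compiles against Spark 1.3+.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("fileStream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Spark 1.2 and earlier: only the three-argument overload exists.
    val stream12 = ssc.fileStream[LongWritable, Text, TextInputFormat](
      args(0), (p: Path) => true, newFilesOnly = true)

    // Spark 1.3+: the overload taking a Hadoop Configuration is available,
    // so a customized Configuration can be passed explicitly.
    val hadoopConf: Configuration = ssc.sparkContext.hadoopConfiguration
    val stream13 = ssc.fileStream[LongWritable, Text, TextInputFormat](
      args(0), (p: Path) => true, true, hadoopConf)

    stream13.map(_._2.toString).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

So if the Configuration-taking call is required, the practical fix is to build against Spark 1.3 or later rather than 1.2.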