pyspark textFileStreaming 在 textFile 工作时无法检测到 txt 文件
pyspark textFileStreaming can not detect txt file while textFile works
说明我的问题不一样:这个问题和标出的问题不一样。首先,输入参数已经是一个目录(这是正确的,但标记的问题是错误的)。其次,我在流式传输期间将 txt 文件复制到目录 运行 模拟新的 txt 文件到达(因此生成新文件而不是该目录中现有的相同文件)
下面是我的问题
我有一个目录和txt文件/tmp/a.txt
,文件里的内容是
aaa
bbb
我使用 pyspark 并在同一目录中连续手动复制此文件(在流式传输期间 运行 同时创建文件)
def count(x):
if x.isEmpty:
print("empty")
return
print(x.count())
sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream("/tmp/").foreachRDD(count)
输出显示RDD为空
不过我用的是
c = sc.textFile("/tmp/").count()
print(c)
显示c为2(与txt文件内容一致)
为什么流式传输不起作用?
您是要获取添加到 /tmp/a.txt
文件的新行还是要获取添加到 tmp
目录的新文件?
如果是后者,请尝试用此
替换您的最后一行
ssc.textFileStream("/tmp/*").foreachRDD(count)
我在scala中找到了解决方案(在python中仍然无法提取新文件)
首先,sc.textFile
和sc.textFileStream
采用相同的参数,即目录名。所以上面的代码是正确的。
但是,不同的是,如果目录存在,sc.textFile
可以提取文件(并且它必须存在,否则会引发 InvalidInputException
),但在流式模式下 sc.textFileStream
(本地文件系统),要求该目录不存在,由streaming程序创建,否则无法拾取新文件(好像是个bug,只存在于本地文件系统,在HDFS好像根据其他人的经验工作得很好)。
而且,根据其他人的经验,他们说如果删除目录和re-run程序,回收站也必须清空。
然而,在python中这个问题仍然存在,当目录中没有文件存在时,scala程序只会打印0
,但python程序会发出警告
WARN FileInputDStream:87 - Error finding new files
java.lang.NullPointerException
这是我在 python 和 scala 中的代码,写新文件的方式是一样的,所以我不post 这里
python代码:
if __name__ == "__main__":
sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream(path).foreachRDD(lambda x: print(x.count()))
ssc.start()
ssc.awaitTermination()
scala代码:
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val ssc = new StreamingContext(sc, Seconds(3))
ssc.textFileStream(params.inputPath).foreachRDD { x =>
print(x.count())
}
ssc.start()
ssc.awaitTermination()
}
说明我的问题不一样:这个问题和标出的问题不一样。首先,输入参数已经是一个目录(这是正确的,但标记的问题是错误的)。其次,我在流式传输期间将 txt 文件复制到目录 运行 模拟新的 txt 文件到达(因此生成新文件而不是该目录中现有的相同文件)
下面是我的问题
我有一个目录和txt文件/tmp/a.txt
,文件里的内容是
aaa
bbb
我使用 pyspark 并在同一目录中连续手动复制此文件(在流式传输期间 运行 同时创建文件)
def count(x):
if x.isEmpty:
print("empty")
return
print(x.count())
sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream("/tmp/").foreachRDD(count)
输出显示RDD为空
不过我用的是
c = sc.textFile("/tmp/").count()
print(c)
显示c为2(与txt文件内容一致)
为什么流式传输不起作用?
您是要获取添加到 /tmp/a.txt
文件的新行还是要获取添加到 tmp
目录的新文件?
如果是后者,请尝试用此
替换您的最后一行ssc.textFileStream("/tmp/*").foreachRDD(count)
我在scala中找到了解决方案(在python中仍然无法提取新文件)
首先,sc.textFile
和sc.textFileStream
采用相同的参数,即目录名。所以上面的代码是正确的。
但是,不同的是,如果目录存在,sc.textFile
可以提取文件(并且它必须存在,否则会引发 InvalidInputException
),但在流式模式下 sc.textFileStream
(本地文件系统),要求该目录不存在,由streaming程序创建,否则无法拾取新文件(好像是个bug,只存在于本地文件系统,在HDFS好像根据其他人的经验工作得很好)。
而且,根据其他人的经验,他们说如果删除目录和re-run程序,回收站也必须清空。
然而,在python中这个问题仍然存在,当目录中没有文件存在时,scala程序只会打印0
,但python程序会发出警告
WARN FileInputDStream:87 - Error finding new files
java.lang.NullPointerException
这是我在 python 和 scala 中的代码,写新文件的方式是一样的,所以我不post 这里
python代码:
if __name__ == "__main__":
sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream(path).foreachRDD(lambda x: print(x.count()))
ssc.start()
ssc.awaitTermination()
scala代码:
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val ssc = new StreamingContext(sc, Seconds(3))
ssc.textFileStream(params.inputPath).foreachRDD { x =>
print(x.count())
}
ssc.start()
ssc.awaitTermination()
}