pyspark textFileStreaming 在 textFile 工作时无法检测到 txt 文件

pyspark textFileStreaming can not detect txt file while textFile works

说明我的问题不一样:这个问题和标出的问题不一样。首先,输入参数已经是一个目录(这是正确的,但标记的问题是错误的)。其次,我在流式传输期间将 txt 文件复制到目录 运行 模拟新的 txt 文件到达(因此生成新文件而不是该目录中现有的相同文件)

下面是我的问题


我有一个目录和txt文件/tmp/a.txt,文件里的内容是

aaa
bbb

我使用 pyspark 并在同一目录中连续手动复制此文件(在流式传输期间 运行 同时创建文件)

def count(x):
    if x.isEmpty:
        print("empty")
        return
    print(x.count())

sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream("/tmp/").foreachRDD(count)

输出显示RDD为空

不过我用的是

c = sc.textFile("/tmp/").count()
print(c)

显示c为2(与txt文件内容一致)

为什么流式传输不起作用?

您是要获取添加到 /tmp/a.txt 文件的新行还是要获取添加到 tmp 目录的新文件?

如果是后者,请尝试用此

替换您的最后一行

ssc.textFileStream("/tmp/*").foreachRDD(count)

我在scala中找到了解决方案(在python中仍然无法提取新文件)

首先,sc.textFilesc.textFileStream采用相同的参数,即目录名。所以上面的代码是正确的。

但是,不同的是,如果目录存在,sc.textFile 可以提取文件(并且它必须存在,否则会引发 InvalidInputException),但在流式模式下 sc.textFileStream(本地文件系统),要求该目录不存在,由streaming程序创建,否则无法拾取新文件(好像是个bug,只存在于本地文件系统,在HDFS好像根据其他人的经验工作得很好)。

而且,根据其他人的经验,他们说如果删除目录和re-run程序,回收站也必须清空。


然而,在python中这个问题仍然存在,当目录中没有文件存在时,scala程序只会打印0,但python程序会发出警告

WARN FileInputDStream:87 - Error finding new files 
java.lang.NullPointerException

这是我在 python 和 scala 中的代码,写新文件的方式是一样的,所以我不post 这里

python代码:

if __name__ == "__main__":
    sc = SparkContext()    
    ssc = StreamingContext(sc, 3)
    ssc.textFileStream(path).foreachRDD(lambda x: print(x.count()))
    ssc.start()
    ssc.awaitTermination()

scala代码:

def main(args: Array[String]): Unit = {  
  val sc = new SparkContext()
  val ssc = new StreamingContext(sc, Seconds(3))
  ssc.textFileStream(params.inputPath).foreachRDD { x =>
    print(x.count())
  }
  ssc.start()
  ssc.awaitTermination()
}