Spark Streaming:如何在 Python 中获取已处理文件的文件名
Spark Streaming: How to get the filename of a processed file in Python
我对 Spark 有点菜鸟(而且 Python 老实说)所以如果我错过了一些明显的东西,请原谅我。
我正在使用 Spark 和 Python 进行文件流式传输。在我做的第一个例子中,Spark 正确地监听了给定的目录并计算文件中的单词出现次数,所以我知道在监听目录方面一切正常。
现在我正在尝试获取为审计目的而处理的文件的名称。我在这里读
http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E
这不是一项微不足道的任务。我在这里得到了一个可能的解决方案
http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E
我尝试按如下方式实现它:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fileName(data):
string = data.toDebugString
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
不幸的是,现在它不是每秒都在文件夹中监听,而是监听一次,输出 'None' 然后等待什么也不做。这与确实有效的代码之间的唯一区别是
files = lines.foreachRDD(fileName)
在我什至担心获取文件名(明天的问题)之前,有人能明白为什么这只检查目录一次吗?
提前致谢
M
所以这是一个菜鸟错误。我发布我的解决方案供自己和其他人参考。
正如@user3689574 所指出的,我没有在我的函数中返回调试字符串。这充分解释了为什么我得到 'None'.
接下来,我在函数外部打印调试,这意味着它从来不是 foreachRDD 的一部分。将其移入函数如下:
def fileName(data):
debug = data.toDebugString()
print(debug)
这将按应有的方式打印调试信息,并按应有的方式继续侦听目录。改变它解决了我最初的问题。在获取文件名方面,这变得非常简单。
目录没有变化时的调试字符串如下:
(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []
这很清楚地表明没有文件。将文件复制到目录后,调试输出如下:
(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []
使用快速正则表达式,可以轻松地为您提供文件名。希望这对其他人有帮助。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def get_file_info(rdd):
file_content = rdd.collect()
file_name = rdd.toDebugString()
print(file_name, file_content)
def main():
sc = SparkContext("local[2]", "deneme")
ssc = StreamingContext(sc, 1) # One DSTREAM in the same time
lines = ssc.textFileStream('../urne')
# here is the call
lines.foreachRDD(lambda rdd: get_file_info(rdd))
# Split each line into words
words = lines.flatMap(lambda line: line.split("\n"))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
然后,当你得到这样的结果时:
b'(3) MapPartitionsRDD[237] at textFileStream at NativeMethodAccessorImpl.java:0 []\n | UnionRDD[236] at textFileStream at NativeMethodAccessorImpl.java:0 []\n |文件:/some/directory/file0.068513 NewHadoopRDD[231] at textFileStream at NativeMethodAccessorImpl.java:0 []\n |文件:/some/directory/file0.069317 NewHadoopRDD[233] at textFileStream at NativeMethodAccessorImpl.java:0 []\n |文件:/some/directory/file0.070036 NewHadoopRDD[235] at textFileStream at NativeMethodAccessorImpl.java:0 []' ['6', '3', '4', '3', '6', '0' , '1', '7', '10', '2', '0', '0', '1', '1', '10', '8', '7', '7', ' 0'、'8'、'8'、'9'、'7'、'2'、'9'、'1'、'5'、'8'、'9'、'9'、'0' , '6', '0', '4', '3', '4', '8', '5', '8', '10', '5', '2', '3', ' 6'、'10'、'2'、'1'、'0'、'4'、'3'、'1'、'8'、'2'、'10'、'4'、'0' , '4', '4', '1', '4', '3', '1', '2', '5', '5', '3', ]
制作一个正则表达式来获取文件的内容及其名称,向您展示它有 3 个文件作为一个 DSTREM,因此您可以从那里开始工作
我对 Spark 有点菜鸟(而且 Python 老实说)所以如果我错过了一些明显的东西,请原谅我。
我正在使用 Spark 和 Python 进行文件流式传输。在我做的第一个例子中,Spark 正确地监听了给定的目录并计算文件中的单词出现次数,所以我知道在监听目录方面一切正常。
现在我正在尝试获取为审计目的而处理的文件的名称。我在这里读 http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E 这不是一项微不足道的任务。我在这里得到了一个可能的解决方案 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E 我尝试按如下方式实现它:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fileName(data):
string = data.toDebugString
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
不幸的是,现在它不是每秒都在文件夹中监听,而是监听一次,输出 'None' 然后等待什么也不做。这与确实有效的代码之间的唯一区别是
files = lines.foreachRDD(fileName)
在我什至担心获取文件名(明天的问题)之前,有人能明白为什么这只检查目录一次吗?
提前致谢 M
所以这是一个菜鸟错误。我发布我的解决方案供自己和其他人参考。
正如@user3689574 所指出的,我没有在我的函数中返回调试字符串。这充分解释了为什么我得到 'None'.
接下来,我在函数外部打印调试,这意味着它从来不是 foreachRDD 的一部分。将其移入函数如下:
def fileName(data):
debug = data.toDebugString()
print(debug)
这将按应有的方式打印调试信息,并按应有的方式继续侦听目录。改变它解决了我最初的问题。在获取文件名方面,这变得非常简单。
目录没有变化时的调试字符串如下:
(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []
这很清楚地表明没有文件。将文件复制到目录后,调试输出如下:
(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []
使用快速正则表达式,可以轻松地为您提供文件名。希望这对其他人有帮助。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def get_file_info(rdd):
file_content = rdd.collect()
file_name = rdd.toDebugString()
print(file_name, file_content)
def main():
sc = SparkContext("local[2]", "deneme")
ssc = StreamingContext(sc, 1) # One DSTREAM in the same time
lines = ssc.textFileStream('../urne')
# here is the call
lines.foreachRDD(lambda rdd: get_file_info(rdd))
# Split each line into words
words = lines.flatMap(lambda line: line.split("\n"))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
然后,当你得到这样的结果时: b'(3) MapPartitionsRDD[237] at textFileStream at NativeMethodAccessorImpl.java:0 []\n | UnionRDD[236] at textFileStream at NativeMethodAccessorImpl.java:0 []\n |文件:/some/directory/file0.068513 NewHadoopRDD[231] at textFileStream at NativeMethodAccessorImpl.java:0 []\n |文件:/some/directory/file0.069317 NewHadoopRDD[233] at textFileStream at NativeMethodAccessorImpl.java:0 []\n |文件:/some/directory/file0.070036 NewHadoopRDD[235] at textFileStream at NativeMethodAccessorImpl.java:0 []' ['6', '3', '4', '3', '6', '0' , '1', '7', '10', '2', '0', '0', '1', '1', '10', '8', '7', '7', ' 0'、'8'、'8'、'9'、'7'、'2'、'9'、'1'、'5'、'8'、'9'、'9'、'0' , '6', '0', '4', '3', '4', '8', '5', '8', '10', '5', '2', '3', ' 6'、'10'、'2'、'1'、'0'、'4'、'3'、'1'、'8'、'2'、'10'、'4'、'0' , '4', '4', '1', '4', '3', '1', '2', '5', '5', '3', ]
制作一个正则表达式来获取文件的内容及其名称,向您展示它有 3 个文件作为一个 DSTREM,因此您可以从那里开始工作