如何使用火花流检查rdd是否为空?
how to check if rdd is empty using spark streaming?
我有以下 pyspark 代码,我用它从 logs/ 目录中读取日志文件,然后仅当其中包含数据时才将结果保存到文本文件中……换句话说,当 RDD 不为空时。但是我在实施它时遇到了问题。我已经尝试了 take(1) 和 notempty。因为这是 dstream rdd 我们不能对它应用 rdd 方法。如果我遗漏了什么,请告诉我。
conf = SparkConf().setMaster("local").setAppName("PysparkStreaming")
sc = SparkContext.getOrCreate(conf = conf)
ssc = StreamingContext(sc, 3) #Streaming will execute in each 3 seconds
lines = ssc.textFileStream('/Users/rocket/Downloads/logs/') #'logs/ mean directory name
audit = lines.map(lambda x: x.split('|')[3])
result = audit.countByValue()
#result.pprint()
#result.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
# Print the first ten elements of each RDD generated in this DStream to the console
if result.foreachRDD(lambda rdd: rdd.take(1)):
result.pprint()
result.saveAsTextFiles("/Users/rocket/Downloads/output","txt")
else:
result.pprint()
print("empty")
正确的结构应该是
import uuid
def process_batch(rdd):
if not rdd.isEmpty():
result.saveAsTextFiles("/Users/rocket/Downloads/output-{}".format(
str(uuid.uuid4())
) ,"txt")
result.foreachRDD(process_batch)
但是,正如您在上面看到的那样,每个批次都需要一个单独的目录,因为 RDD API 没有 append
模式。
备选方案可以是:
def process_batch(rdd):
if not rdd.isEmpty():
lines = rdd.map(str)
spark.createDataFrame(lines, "string").save.mode("append").format("text").save("/Users/rocket/Downloads/output")
我有以下 pyspark 代码,我用它从 logs/ 目录中读取日志文件,然后仅当其中包含数据时才将结果保存到文本文件中……换句话说,当 RDD 不为空时。但是我在实施它时遇到了问题。我已经尝试了 take(1) 和 notempty。因为这是 dstream rdd 我们不能对它应用 rdd 方法。如果我遗漏了什么,请告诉我。
conf = SparkConf().setMaster("local").setAppName("PysparkStreaming")
sc = SparkContext.getOrCreate(conf = conf)
ssc = StreamingContext(sc, 3) #Streaming will execute in each 3 seconds
lines = ssc.textFileStream('/Users/rocket/Downloads/logs/') #'logs/ mean directory name
audit = lines.map(lambda x: x.split('|')[3])
result = audit.countByValue()
#result.pprint()
#result.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
# Print the first ten elements of each RDD generated in this DStream to the console
if result.foreachRDD(lambda rdd: rdd.take(1)):
result.pprint()
result.saveAsTextFiles("/Users/rocket/Downloads/output","txt")
else:
result.pprint()
print("empty")
正确的结构应该是
import uuid
def process_batch(rdd):
if not rdd.isEmpty():
result.saveAsTextFiles("/Users/rocket/Downloads/output-{}".format(
str(uuid.uuid4())
) ,"txt")
result.foreachRDD(process_batch)
但是,正如您在上面看到的那样,每个批次都需要一个单独的目录,因为 RDD API 没有 append
模式。
备选方案可以是:
def process_batch(rdd):
if not rdd.isEmpty():
lines = rdd.map(str)
spark.createDataFrame(lines, "string").save.mode("append").format("text").save("/Users/rocket/Downloads/output")