Spark Structured Streaming 在从 HDFS 读取时不写入数据

Spark Structured Streaming not writing data while reading from HDFS

我正在开发一个流式脚本,它应该在文件登陆 HDFS 时立即获取文件,聚合它们并将它们写入其他地方。

在这里,我无法让写入工作 - 它创建了元数据文件夹,但没有实际写入。在 10 多个文件中(所有相同的结构),只写了一个,我不确定为什么

有人能帮帮我吗?

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import udf, input_file_name, lower
from pyspark.streaming import StreamingContext
import sys
reload(sys)

sys.setdefaultencoding('utf-8')

now = datetime.now()

#create a contexit that supports hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('streaming_monitor')
    ssc = StreamingContext(spark_session, 1)
    print('start')
    print(datetime.now())

    myschema = StructType([
      StructField('text', StringType())
    ])

    #only files after stream starts
    df = spark_session\
        .readStream\
        .option('newFilesOnly', 'true')\
        .option('header', 'true')\
        .schema(myschema)\
        .text('hdfs://nameservice/user/user1/streamtest/')\
        .withColumn("FileName", input_file_name())

    output = df.createOrReplaceTempView('log')
    #hive_dump = spark_session.sql("select '" + str(now) + "' as timestamp, FileName, did_it_error, solution, text from log")

    output = df\
    .writeStream\
    .format("csv")\
    .queryName('logsmonitor')\
    .option("checkpointLocation", "file:///home/user1/analytics/logs/chkpoint_dir")\
    .start('hdfs://nameservice/user/user1/streamtest/output/')\
    .awaitTermination()

您在这里观察到的是,必须以原子方式将 Spark Streaming 读取的文件放入源文件夹中。否则,文件将在创建后立即被读取(并且没有任何内容)。 Spark 不会对文件中的更新数据采取行动,而是只查看文件一次。

如果你

你会看到你所有的数据都在浮动
  • 停止流媒体作业
  • 删除检查点目录(或将所有输入文件重命名为新的唯一名称)
  • 将所有文件移动到源文件夹
  • 等待移动完成
  • 启动流媒体应用程序

当然,如果你想让这个工作运行不断地添加越来越多的文件,这将不是一个解决方案,但秘诀在于自动放置文件立刻进入文件夹。

我对 HDFS 不是很熟悉,但通常这种原子性可以通过将数据写入另一个文件夹,然后将其移动到源文件夹来实现。

这里是Input Sources文档中的参考:

"File source - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If latestFirst is set, order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations."