使用 apache beam 按顺序读取文件和文件夹

reading files and folders in order with apache beam

我有一个 year/month/day/hour/* 类型的文件夹结构,我希望 beam 按时间顺序将其作为无限来源阅读。具体来说,这意味着在记录的第一个小时内读取所有文件并添加其内容以进行处理。然后,添加下一个小时的文件内容进行处理,直到当前等待新文件到达最新year/month/day/hour文件夹中的时间。

是否可以用 apache beam 做到这一点?

所以我要做的是根据文件路径为每个元素添加时间戳。作为测试,我使用了以下示例。

首先,如this answer所述,您可以使用FileIO来连续匹配一个文件模式。根据您的用例,这将有所帮助,一旦您完成回填,您希望继续在同一作业中读取新到达的文件。在这种情况下,我提供 gs://BUCKET_NAME/data/** 因为我的文件将像 gs://BUCKET_NAME/data/year/month/day/hour/filename.extension:

p
    .apply(FileIO.match()
    .filepattern(inputPath)
    .continuously(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Never stop checking for new files
        Watch.Growth.<String>never()))
    .apply(FileIO.readMatches())

观看频率和超时时间可随意调整

然后,在下一步中我们将收到匹配的文件。我将使用 ReadableFile.getMetadata().resourceId() 获取完整路径并将其拆分为 "/" 以构建相应的时间戳。我将它四舍五入到小时,这里不考虑时区校正。使用 readFullyAsUTF8String 我们将读取整个文件(如果整个文件不适合内存,请小心,建议在需要时对您的输入进行分片)并将其拆分为行。使用 ProcessContext.outputWithTimestamp 我们将向下游发出文件名和行的 KV(不再需要文件名,但它有助于查看每个文件的来源)以及从路径派生的时间戳。请注意,我们正在移动时间戳 "back in time",因此这可能会扰乱水印启发式算法,您将收到如下消息:

Cannot output with timestamp 2019-03-17T00:00:00.000Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-05T15:41:29.645Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

为了克服这个问题,我将 getAllowedTimestampSkew 设置为 Long.MAX_VALUE,但考虑到这已被弃用。 ParDo代码:

.apply("Add Timestamps", ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {

    @Override
    public Duration getAllowedTimestampSkew() {
        return new Duration(Long.MAX_VALUE);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        ReadableFile file = c.element();
        String fileName = file.getMetadata().resourceId().toString();
        String lines[];

        String[] dateFields = fileName.split("/");
        Integer numElements = dateFields.length;

        String hour = dateFields[numElements - 2];
        String day = dateFields[numElements - 3];
        String month = dateFields[numElements - 4];
        String year = dateFields[numElements - 5];

        String ts = String.format("%s-%s-%s %s:00:00", year, month, day, hour);
        Log.info(ts);

        try{
            lines = file.readFullyAsUTF8String().split("\n");

            for (String line : lines) {
                c.outputWithTimestamp(KV.of(fileName, line), new Instant(dateTimeFormat.parseMillis(ts)));
            }
        }

        catch(IOException e){
            Log.info("failed");
        }
    }}))

最后,我 window 进入 1 小时 FixedWindows 并记录结果:

.apply(Window
    .<KV<String,String>>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .discardingFiredPanes()
    .withAllowedLateness(Duration.ZERO))
.apply("Log results", ParDo.of(new DoFn<KV<String, String>, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        String file = c.element().getKey();
        String value = c.element().getValue();
        String eventTime = c.timestamp().toString();

        String logString = String.format("File=%s, Line=%s, Event Time=%s, Window=%s", file, value, eventTime, window.toString());
        Log.info(logString);
    }
}));

对我来说,它适用于 .withAllowedLateness(Duration.ZERO),但根据您可能需要设置的顺序。请记住,值太高会导致 windows 打开更长时间并使用更多持久存储。

我设置了 $BUCKET$PROJECT 变量,我只上传了两个文件:

gsutil cp file1 gs://$BUCKET/data/2019/03/17/00/
gsutil cp file2 gs://$BUCKET/data/2019/03/18/22/

和运行工作:

mvn -Pdataflow-runner compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.ChronologicalOrder \
      -Dexec.args="--project=$PROJECT \
      --path=gs://$BUCKET/data/** \
      --stagingLocation=gs://$BUCKET/staging/ \
      --runner=DataflowRunner"

结果:

完整 code

让我知道这是如何工作的。这只是一个入门示例,您可能需要调整 windowing 和触发策略、延迟等以适合您的用例