在 运行 作为 Google Dataflow 中的 DataflowPipelineRunner 时访问资源文件

Accessing resource file when running as a DataflowPipelineRunner in Google Dataflow

在我的项目中,我试图将一些元数据添加到我的管道中处理的数据中。元数据位于 src 文件夹旁边名为 resources 的子文件夹中的 DBF 文件中。

src 文件夹包含 main-class 并且我有几个包(IO、处理、聚合、实用程序)。

我在定义管道的主要 class 中读取和处理包含元数据的文件。我用来访问文件的代码如下:

File temp1 = new File("resources/xxx.dbf");

我检查是否使用以下方法找到文件:

LOG.info(temp1.exists())

运行没问题。

有消息以字符串形式传入,我使用 PubSubIO 读取这些消息。我使用此文件的内容来填充包含键和值的 Map。

Map<String, ArrayList<Double>> sensorToCoordinates = coordinateData.getSensorLocations();

然后我在名为 'SensorValues' 的自定义 class 中设置了一个静态变量 我做了:

SensorValue.setKeyToCoordinates(sensorToCoordinates);

在将传入消息从字符串解析为 SensorValue-class 时,我使用 ParDo 函数(从 PCollection 到 PCollection)在 SensorValue-[=37= 的构造函数中使用了映射].

运行 此代码使用 DirectPipelineRunner 效果完美。但是,当我使用 DataflowPipelineRunner 并尝试访问 SensorValue 构造函数中的映射时,我 运行 进入 NullPointerException。

现在我想知道为什么 setter 在使用 DataflowPipelineRunner 时不起作用(我猜这与在多个工作人员之间分配执行有关)以及最佳实践是什么使用任何静态资源文件来丰富您的管道?

您是对的,问题是因为 ParDo 的执行分配给了多个工作人员。他们没有本地文件,他们可能没有地图的内容。

这里有几个选项:

  1. 将文件放在 GCS 中,让管道读取文件的内容(使用 TextIO 或类似的东西)并将其用作 side-input 供您稍后处理。

  2. 将文件包含在管道的资源中,并将其加载到需要它的 DoFnstartBundle 中(将来会有一些方法可以做到这一点发生频率低于每个捆绑包)。

  3. 您可以将映射的内容序列化为 DoFn 的参数,方法是将其作为非静态字段传递给 class 的构造函数.

随着此文件大小的增加,选项 1 更好(因为它可以支持将其拆分成多个部分并进行查找),而选项 2 可能会减少检索文件的网络流量。选项 3 仅在文件非常小的情况下才有效,因为它会显着增加序列化的大小 DoFn,这可能导致提交到数据流服务的作业太大。