在从 Dataflow 插入 BigQuery 之前验证行
Validating rows before inserting into BigQuery from Dataflow
根据
当前无法在将数据从 Dataflow 加载到 BigQuery 时设置 maxBadRecords
配置。建议在将数据流作业中的行插入 BigQuery 之前验证它们。
如果我有 TableSchema
和 TableRow
,我该如何确保该行可以安全地插入 table?
一定有比遍历架构中的字段、查看它们的类型并查看行中值的 class 更简单的方法,对吧?这似乎很容易出错,而且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败。
更新:
我的用例是一个 ETL 作业,首先将 运行 on JSON(每行一个对象)登录 Cloud Storage 并批量写入 BigQuery,但稍后将从中读取对象PubSub 并连续写入 BigQuery。这些对象包含大量 BigQuery 中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式 JSON 有效负载)。时间戳之类的东西也需要格式化才能与 BigQuery 一起使用。这项工作 运行 在不同的输入和写入不同的 tables 时会有一些变体。
理论上这不是一个非常困难的过程,它需要一个对象,提取一些属性 (50-100),格式化其中一些并将对象输出到 BigQuery。我或多或少只是遍历 属性 名称列表,从源对象中提取值,查看配置以查看 属性 是否应该以某种方式格式化,必要时应用格式化(this可能是小写,将毫秒时间戳除以 1000,从 URL 中提取主机名,等等),并将值写入 TableRow
对象。
我的问题是数据很乱。有几亿个物体,有些看起来不像预期的那样,这种情况很少见,但对于这些体积,罕见的事情仍然会发生。有时应该包含字符串的 属性 包含整数,反之亦然。有时有一个数组或一个对象应该有一个字符串。
理想情况下,我想把我的 TableRow
交给 TableSchema
然后问 "does this work?"。
因为这是不可能的,所以我改为查看 TableSchema
对象并尝试 validate/cast 自己的值。如果 TableSchema
说 属性 是 STRING
类型,我会 运行 value.toString()
然后再将它添加到 TableRow
。如果它是 INTEGER
,我会检查它是 Integer
、Long
还是 BigInteger
,等等。这种方法的问题是我只是在猜测 BigQuery 中的工作原理。 FLOAT
将接受哪些 Java 数据类型?对于 TIMESTAMP
?我认为我的 validations/casts 解决了大多数问题,但总有例外和边缘情况。
根据我的经验,这是非常有限的,如果单行未通过 BigQuery 的验证(就像常规加载一样,除非 maxBadRecords
设置为足够大的数字)。它还会失败,并显示表面有用的消息,如 'BigQuery import job "dataflow_job_xxx" failed. Causes: (5db0b2cdab1557e0): BigQuery job "dataflow_job_xxx" in project "xxx" finished with error(s): errorResult: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field'。也许在某个地方可以看到更详细的错误消息,可以告诉我它是哪个 属性 以及它的值是多少?如果没有这些信息,它也可以说 "bad data".
据我所知,至少当 运行 以批处理模式运行时,Dataflow 会将 TableRow
对象写入 Cloud Storage 中的暂存区,然后在一切就绪后开始加载。这意味着我无处可捕获任何错误,加载 BigQuery 时我的代码不再 运行ning。我还没有 运行 流模式下的任何工作,但我不确定那里会有什么不同,根据我(公认的有限)的理解,基本原理是相同的,只是批处理大小更小.
人们使用 Dataflow 和 BigQuery,因此在不总是担心整个管道因单个错误输入而停止的情况下完成这项工作并非不可能。人们是怎么做到的?
我假设您将文件中的 JSON 反序列化为 Map<String, Object>
。然后你应该能够使用 TableSchema
.
递归地对其进行类型检查
我建议采用迭代方法来开发模式验证,包括以下两个步骤。
编写一个 PTransform<Map<String, Object>, TableRow>
将您的 JSON 行转换为 TableRow
对象。 TableSchema
也应该是函数的构造函数参数。您可以开始让这个函数变得非常严格——要求 JSON 将输入直接解析为 Integer,例如,当发现 BigQuery INTEGER 模式时——并积极声明错误记录。基本上,通过超严格的处理来确保不输出无效记录。
我们的 code here 做了一些类似的事情——给定一个由 BigQuery 生成并以 JSON 形式写入 GCS 的文件,我们递归地遍历模式并进行一些类型转换。但是,我们不需要验证,因为 BigQuery 本身写了数据。
请注意 TableSchema
对象不是 Serializable
。我们通过将 DoFn
或 PTransform
构造函数中的 TableSchema
转换为 JSON String
并返回。见 the code in BigQueryIO.java
that uses the jsonTableSchema
variable.
使用此 blog post 中描述的 "dead-letter" 策略来处理错误记录——从你的 PTransform 中输出有问题的 Map<String, Object>
行并将它们写入文件。这样,您可以稍后检查未通过验证的行。
您可以从一些小文件开始并使用 DirectPipelineRunner
而不是 DataflowPipelineRunner
。直接 运行ner 运行s 管道在您的计算机上,而不是在 Google Cloud Dataflow 服务上,它使用 BigQuery 流式写入。我相信当这些写入失败时,您会收到更好的错误消息。
(我们对批处理作业使用 GCS->BigQuery 加载作业模式,因为它更高效且更具成本效益,但 BigQuery 流式写入流式作业,因为它们延迟低。)
最后,在日志信息方面:
- 一定要检查 Cloud Logging(按照日志面板上的
Worker Logs
link。
- 如果您 运行
bq
command-line utility: bq show -j PROJECT:dataflow_job_XXXXXXX
. ,您可能会更好地了解为什么批处理数据流触发的加载作业失败
根据
maxBadRecords
配置。建议在将数据流作业中的行插入 BigQuery 之前验证它们。
如果我有 TableSchema
和 TableRow
,我该如何确保该行可以安全地插入 table?
一定有比遍历架构中的字段、查看它们的类型并查看行中值的 class 更简单的方法,对吧?这似乎很容易出错,而且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败。
更新:
我的用例是一个 ETL 作业,首先将 运行 on JSON(每行一个对象)登录 Cloud Storage 并批量写入 BigQuery,但稍后将从中读取对象PubSub 并连续写入 BigQuery。这些对象包含大量 BigQuery 中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式 JSON 有效负载)。时间戳之类的东西也需要格式化才能与 BigQuery 一起使用。这项工作 运行 在不同的输入和写入不同的 tables 时会有一些变体。
理论上这不是一个非常困难的过程,它需要一个对象,提取一些属性 (50-100),格式化其中一些并将对象输出到 BigQuery。我或多或少只是遍历 属性 名称列表,从源对象中提取值,查看配置以查看 属性 是否应该以某种方式格式化,必要时应用格式化(this可能是小写,将毫秒时间戳除以 1000,从 URL 中提取主机名,等等),并将值写入 TableRow
对象。
我的问题是数据很乱。有几亿个物体,有些看起来不像预期的那样,这种情况很少见,但对于这些体积,罕见的事情仍然会发生。有时应该包含字符串的 属性 包含整数,反之亦然。有时有一个数组或一个对象应该有一个字符串。
理想情况下,我想把我的 TableRow
交给 TableSchema
然后问 "does this work?"。
因为这是不可能的,所以我改为查看 TableSchema
对象并尝试 validate/cast 自己的值。如果 TableSchema
说 属性 是 STRING
类型,我会 运行 value.toString()
然后再将它添加到 TableRow
。如果它是 INTEGER
,我会检查它是 Integer
、Long
还是 BigInteger
,等等。这种方法的问题是我只是在猜测 BigQuery 中的工作原理。 FLOAT
将接受哪些 Java 数据类型?对于 TIMESTAMP
?我认为我的 validations/casts 解决了大多数问题,但总有例外和边缘情况。
根据我的经验,这是非常有限的,如果单行未通过 BigQuery 的验证(就像常规加载一样,除非 maxBadRecords
设置为足够大的数字)。它还会失败,并显示表面有用的消息,如 'BigQuery import job "dataflow_job_xxx" failed. Causes: (5db0b2cdab1557e0): BigQuery job "dataflow_job_xxx" in project "xxx" finished with error(s): errorResult: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field'。也许在某个地方可以看到更详细的错误消息,可以告诉我它是哪个 属性 以及它的值是多少?如果没有这些信息,它也可以说 "bad data".
据我所知,至少当 运行 以批处理模式运行时,Dataflow 会将 TableRow
对象写入 Cloud Storage 中的暂存区,然后在一切就绪后开始加载。这意味着我无处可捕获任何错误,加载 BigQuery 时我的代码不再 运行ning。我还没有 运行 流模式下的任何工作,但我不确定那里会有什么不同,根据我(公认的有限)的理解,基本原理是相同的,只是批处理大小更小.
人们使用 Dataflow 和 BigQuery,因此在不总是担心整个管道因单个错误输入而停止的情况下完成这项工作并非不可能。人们是怎么做到的?
我假设您将文件中的 JSON 反序列化为 Map<String, Object>
。然后你应该能够使用 TableSchema
.
我建议采用迭代方法来开发模式验证,包括以下两个步骤。
编写一个
PTransform<Map<String, Object>, TableRow>
将您的 JSON 行转换为TableRow
对象。TableSchema
也应该是函数的构造函数参数。您可以开始让这个函数变得非常严格——要求 JSON 将输入直接解析为 Integer,例如,当发现 BigQuery INTEGER 模式时——并积极声明错误记录。基本上,通过超严格的处理来确保不输出无效记录。我们的 code here 做了一些类似的事情——给定一个由 BigQuery 生成并以 JSON 形式写入 GCS 的文件,我们递归地遍历模式并进行一些类型转换。但是,我们不需要验证,因为 BigQuery 本身写了数据。
请注意
TableSchema
对象不是Serializable
。我们通过将DoFn
或PTransform
构造函数中的TableSchema
转换为 JSONString
并返回。见 the code inBigQueryIO.java
that uses thejsonTableSchema
variable.使用此 blog post 中描述的 "dead-letter" 策略来处理错误记录——从你的 PTransform 中输出有问题的
Map<String, Object>
行并将它们写入文件。这样,您可以稍后检查未通过验证的行。
您可以从一些小文件开始并使用 DirectPipelineRunner
而不是 DataflowPipelineRunner
。直接 运行ner 运行s 管道在您的计算机上,而不是在 Google Cloud Dataflow 服务上,它使用 BigQuery 流式写入。我相信当这些写入失败时,您会收到更好的错误消息。
(我们对批处理作业使用 GCS->BigQuery 加载作业模式,因为它更高效且更具成本效益,但 BigQuery 流式写入流式作业,因为它们延迟低。)
最后,在日志信息方面:
- 一定要检查 Cloud Logging(按照日志面板上的
Worker Logs
link。 - 如果您 运行
bq
command-line utility:bq show -j PROJECT:dataflow_job_XXXXXXX
. ,您可能会更好地了解为什么批处理数据流触发的加载作业失败