数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException
Dataflow: SocketTimeoutException when writing with BigQueryIO
我正在使用 Dataflow BigQueryIO.Write.to()
将数据写入 BigQuery。
有时,我会从 Dataflow 收到此警告:
{
metadata: {
severity: "WARNING"
projectId: "[...]"
serviceName: "dataflow.googleapis.com"
region: "us-east1-d"
labels: {
compute.googleapis.com/resource_type: "instance"
compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"
dataflow.googleapis.com/region: "us-east1-d"
dataflow.googleapis.com/job_name: "[...]"
compute.googleapis.com/resource_id: "[...]"
dataflow.googleapis.com/step_id: ""
dataflow.googleapis.com/job_id: "[...]"
}
timestamp: "2016-08-30T11:32:00.591Z"
projectNumber: "[...]"
}
insertId: "[...]"
log: "dataflow.googleapis.com/worker"
structPayload: {
message: "exception thrown while executing request"
work: "[...]"
thread: "117"
worker: "dataflow-[...]-08240401-e41e-harness-7dkd"
exception: "java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.call(BigQueryTableInserter.java:229)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.call(BigQueryTableInserter.java:222)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)"
logger: "com.google.api.client.http.HttpTransport"
stage: "F5"
job: "[...]"
}
}
我没有看到任何跟在这之后的 "retry" 日志。
我的问题是:
- 我会丢失数据吗?我不知道写操作是否正确完成。如果我正确理解代码,整个写入批处理都处于不确定状态。
- 如果是这样,有什么方法可以确保只将数据写入 BigQuery 一次?
- 如果是这样,严重性不应该是错误而不是警告吗?
以下是我的使用背景:
- 我在流模式下使用 Dataflow,使用 KafkaIO.java
从 Kafka 读取数据
- "Sometimes"可以是每小时0到3次
- 根据工作的不同,我使用了 2 到 36 个 n1-standard-4 类型的工人
- 根据工作的不同,我正在将 3k 到 10k messages/s 写入 BigQuery
- 平均邮件大小为 3kB
- Dataflow 工作人员在 us-east1-d 区域,BigQuery 数据集位置是美国
您将看到这些与来自 BigQuery 流服务的暂时性问题相关的错误。我的经验是,您可能会看到这些散落在工作的整个生命周期中。如果您看到这些日志大量中断,这通常意味着 BigQuery 流式处理服务出现故障。
Cloud Dataflow 将重试请求的行(请参阅此处的代码 BigQuery... line 290)。如果您在警告后的某个时候没有在 table 中看到这些日志项或您的记录 - 还有其他问题。
在流模式下,服务将无限次重试。这意味着作业不会因为这个问题而失败。由于我们永远尝试 - 它确实回避了这是错误还是警告的问题。我们将在内部对此进行辩论,您也可以 post 给 Apache Beam user group 留言来推动辩论 :-)
您可以在 Cloud Logging 中针对该警告消息创建指标并对其采取措施。我们正在致力于更深入的 Stackdriver 集成,这是一个很好的用例。
您不会丢失数据,而是您的数据到达 BigQuery 会延迟。我已经构建了一些简单的固定 window 并计算为 1 分钟 windows - 使用事件处理时间。然后,我会查看随时间变化的计数,以此作为新鲜度指标。如果我修复的 window 落后于水印,则插入有问题。
- 根据评论进行编辑以进一步说明
对于此异常继承自的 IOException,路径随后调用 ApiErrorExtractor() 来测试这是否是由于速率限制问题造成的。
在这种情况下,SocketTimeout 不是由于速率限制,因此将异常抛给调用者。调用者是 finishBundle 中的 BigQuery.IO 行 2308。它调用 flushRows() 捕获 IOException 并抛出 RuntimeException。
在 steaming 模式下,任何以这种方式失败的包都会被无限重试。注意:在批处理模式下,运行程序将尝试 4 次然后失败。
在这种情况下(非速率限制情况)您将不会重试行日志。
您的数据并没有丢失,而是在重试捆绑包时会延迟。
最坏的情况是所有工作人员都遇到此问题,因此管道无法取得进展。如果 BigQuery 流服务关闭或断开所有连接,则可能会发生这种情况。现在——一旦 BiqQuery 摄取服务稳定下来并且捆绑包通过,您可能会看到速率限制案例启动,但回退代码将有助于抑制这些错误。
最糟糕的情况是,您的传入管道数据速率不断徘徊在 BigQuery 流式摄取服务控制的最大写入速率(速率限制速率)附近。因此,如果您因重试(瞬态或其他)而遇到积压 - 您的管道可能永远赶不上。
流式数据流中有一个 Drain 功能,它将停止处理传入的数据,然后推进管道以优雅地排出所有未完成的数据 windows。但是,Drain 要求 finishBundle() 成功。因此,在这种情况下 (SocketTimeout) Drain 将被卡住。如果你终止了管道而不是排水——你会遇到未完成的包的数据丢失。
如果您愿意,可以覆盖 BigQuery.IO 逻辑并将出错的数据传输到其他地方。您可以这样做,但我希望 BigQuery 流媒体服务永远不会出现终端中断。话虽如此,如果您 运行 经常处于接近速率限制的速率并且对不可恢复的积压处理敏感,您可能需要实施不同的缩减或分片机制以避免速率限制问题。
关于积压恢复的另一个建议是,您可以停止事件流入您的流媒体源。例如,停止写 Pub/Sub 中的主题。您将开始通过订阅来撰写另一个主题。您现有的 Dataflow 管道会耗尽现有主题。您仍然需要处理如何处理新订阅中的新积压工作,但至少可以保证您不会丢失现有管道中的任何数据。
如果您不使用事件时间处理,这种方法可能非常有效;但是,您正在使用事件时间处理,您的 windows 将有重叠的输出,它们都标记为 ONTIME,即使情况并非如此。
我在这里对您的用例做出了很多假设,但我想分享一下,因为您的问题在考虑数据丢失时提出了其他架构概念。
希望这对您有所帮助。
我正在使用 Dataflow BigQueryIO.Write.to()
将数据写入 BigQuery。
有时,我会从 Dataflow 收到此警告:
{
metadata: {
severity: "WARNING"
projectId: "[...]"
serviceName: "dataflow.googleapis.com"
region: "us-east1-d"
labels: {
compute.googleapis.com/resource_type: "instance"
compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"
dataflow.googleapis.com/region: "us-east1-d"
dataflow.googleapis.com/job_name: "[...]"
compute.googleapis.com/resource_id: "[...]"
dataflow.googleapis.com/step_id: ""
dataflow.googleapis.com/job_id: "[...]"
}
timestamp: "2016-08-30T11:32:00.591Z"
projectNumber: "[...]"
}
insertId: "[...]"
log: "dataflow.googleapis.com/worker"
structPayload: {
message: "exception thrown while executing request"
work: "[...]"
thread: "117"
worker: "dataflow-[...]-08240401-e41e-harness-7dkd"
exception: "java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.call(BigQueryTableInserter.java:229)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.call(BigQueryTableInserter.java:222)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)"
logger: "com.google.api.client.http.HttpTransport"
stage: "F5"
job: "[...]"
}
}
我没有看到任何跟在这之后的 "retry" 日志。
我的问题是:
- 我会丢失数据吗?我不知道写操作是否正确完成。如果我正确理解代码,整个写入批处理都处于不确定状态。
- 如果是这样,有什么方法可以确保只将数据写入 BigQuery 一次?
- 如果是这样,严重性不应该是错误而不是警告吗?
以下是我的使用背景:
- 我在流模式下使用 Dataflow,使用 KafkaIO.java 从 Kafka 读取数据
- "Sometimes"可以是每小时0到3次
- 根据工作的不同,我使用了 2 到 36 个 n1-standard-4 类型的工人
- 根据工作的不同,我正在将 3k 到 10k messages/s 写入 BigQuery
- 平均邮件大小为 3kB
- Dataflow 工作人员在 us-east1-d 区域,BigQuery 数据集位置是美国
您将看到这些与来自 BigQuery 流服务的暂时性问题相关的错误。我的经验是,您可能会看到这些散落在工作的整个生命周期中。如果您看到这些日志大量中断,这通常意味着 BigQuery 流式处理服务出现故障。
Cloud Dataflow 将重试请求的行(请参阅此处的代码 BigQuery... line 290)。如果您在警告后的某个时候没有在 table 中看到这些日志项或您的记录 - 还有其他问题。
在流模式下,服务将无限次重试。这意味着作业不会因为这个问题而失败。由于我们永远尝试 - 它确实回避了这是错误还是警告的问题。我们将在内部对此进行辩论,您也可以 post 给 Apache Beam user group 留言来推动辩论 :-)
您可以在 Cloud Logging 中针对该警告消息创建指标并对其采取措施。我们正在致力于更深入的 Stackdriver 集成,这是一个很好的用例。
您不会丢失数据,而是您的数据到达 BigQuery 会延迟。我已经构建了一些简单的固定 window 并计算为 1 分钟 windows - 使用事件处理时间。然后,我会查看随时间变化的计数,以此作为新鲜度指标。如果我修复的 window 落后于水印,则插入有问题。
- 根据评论进行编辑以进一步说明
对于此异常继承自的 IOException,路径随后调用 ApiErrorExtractor() 来测试这是否是由于速率限制问题造成的。
在这种情况下,SocketTimeout 不是由于速率限制,因此将异常抛给调用者。调用者是 finishBundle 中的 BigQuery.IO 行 2308。它调用 flushRows() 捕获 IOException 并抛出 RuntimeException。
在 steaming 模式下,任何以这种方式失败的包都会被无限重试。注意:在批处理模式下,运行程序将尝试 4 次然后失败。
在这种情况下(非速率限制情况)您将不会重试行日志。
您的数据并没有丢失,而是在重试捆绑包时会延迟。
最坏的情况是所有工作人员都遇到此问题,因此管道无法取得进展。如果 BigQuery 流服务关闭或断开所有连接,则可能会发生这种情况。现在——一旦 BiqQuery 摄取服务稳定下来并且捆绑包通过,您可能会看到速率限制案例启动,但回退代码将有助于抑制这些错误。
最糟糕的情况是,您的传入管道数据速率不断徘徊在 BigQuery 流式摄取服务控制的最大写入速率(速率限制速率)附近。因此,如果您因重试(瞬态或其他)而遇到积压 - 您的管道可能永远赶不上。
流式数据流中有一个 Drain 功能,它将停止处理传入的数据,然后推进管道以优雅地排出所有未完成的数据 windows。但是,Drain 要求 finishBundle() 成功。因此,在这种情况下 (SocketTimeout) Drain 将被卡住。如果你终止了管道而不是排水——你会遇到未完成的包的数据丢失。
如果您愿意,可以覆盖 BigQuery.IO 逻辑并将出错的数据传输到其他地方。您可以这样做,但我希望 BigQuery 流媒体服务永远不会出现终端中断。话虽如此,如果您 运行 经常处于接近速率限制的速率并且对不可恢复的积压处理敏感,您可能需要实施不同的缩减或分片机制以避免速率限制问题。
关于积压恢复的另一个建议是,您可以停止事件流入您的流媒体源。例如,停止写 Pub/Sub 中的主题。您将开始通过订阅来撰写另一个主题。您现有的 Dataflow 管道会耗尽现有主题。您仍然需要处理如何处理新订阅中的新积压工作,但至少可以保证您不会丢失现有管道中的任何数据。
如果您不使用事件时间处理,这种方法可能非常有效;但是,您正在使用事件时间处理,您的 windows 将有重叠的输出,它们都标记为 ONTIME,即使情况并非如此。
我在这里对您的用例做出了很多假设,但我想分享一下,因为您的问题在考虑数据丢失时提出了其他架构概念。
希望这对您有所帮助。