数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException

Dataflow: SocketTimeoutException when writing with BigQueryIO

我正在使用 Dataflow BigQueryIO.Write.to() 将数据写入 BigQuery。

有时,我会从 Dataflow 收到此警告:

{
 metadata: {
  severity: "WARNING"    
  projectId: "[...]"    
  serviceName: "dataflow.googleapis.com"    
  region: "us-east1-d"    
  labels: {
   compute.googleapis.com/resource_type: "instance"     
   compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"     
   dataflow.googleapis.com/region: "us-east1-d"     
   dataflow.googleapis.com/job_name: "[...]"     
   compute.googleapis.com/resource_id: "[...]"     
   dataflow.googleapis.com/step_id: ""     
   dataflow.googleapis.com/job_id: "[...]"     
  }
  timestamp: "2016-08-30T11:32:00.591Z"    
  projectNumber: "[...]"    
 }
 insertId: "[...]"   
 log: "dataflow.googleapis.com/worker"   
 structPayload: {
  message: "exception thrown while executing request"    
  work: "[...]"    
  thread: "117"    
  worker: "dataflow-[...]-08240401-e41e-harness-7dkd"    
  exception: "java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.call(BigQueryTableInserter.java:229)
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.call(BigQueryTableInserter.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)"    
  logger: "com.google.api.client.http.HttpTransport"    
  stage: "F5"    
  job: "[...]"    
 }
}

我没有看到任何跟在这之后的 "retry" 日志。

我的问题是:

以下是我的使用背景:

您将看到这些与来自 BigQuery 流服务的暂时性问题相关的错误。我的经验是,您可能会看到这些散落在工作的整个生命周期中。如果您看到这些日志大量中断,这通常意味着 BigQuery 流式处理服务出现故障。

Cloud Dataflow 将重试请求的行(请参阅此处的代码 BigQuery... line 290)。如果您在警告后的某个时候没有在 table 中看到这些日志项或您的记录 - 还有其他问题。

在流模式下,服务将无限次重试。这意味着作业不会因为这个问题而失败。由于我们永远尝试 - 它确实回避了这是错误还是警告的问题。我们将在内部对此进行辩论,您也可以 post 给 Apache Beam user group 留言来推动辩论 :-)

您可以在 Cloud Logging 中针对该警告消息创建指标并对其采取措施。我们正在致力于更深入的 Stackdriver 集成,这是一个很好的用例。

您不会丢失数据,而是您的数据到达 BigQuery 会延迟。我已经构建了一些简单的固定 window 并计算为 1 分钟 windows - 使用事件处理时间。然后,我会查看随时间变化的计数,以此作为新鲜度指标。如果我修复的 window 落后于水印,则插入有问题。

  • 根据评论进行编辑以进一步说明

对于此异常继承自的 IOException,路径随后调用 ApiErrorExtractor() 来测试这是否是由于速率限制问题造成的。

在这种情况下,SocketTimeout 不是由于速率限制,因此将异常抛给调用者。调用者是 finishBundle 中的 BigQuery.IO 行 2308。它调用 flushRows() 捕获 IOException 并抛出 RuntimeException。

在 steaming 模式下,任何以这种方式失败的包都会被无限重试。注意:在批处理模式下,运行程序将尝试 4 次然后失败。

在这种情况下(非速率限制情况)您将不会重试行日志。

您的数据并没有丢失,而是在重试捆绑包时会延迟。

最坏的情况是所有工作人员都遇到此问题,因此管道无法取得进展。如果 BigQuery 流服务关闭或断开所有连接,则可能会发生这种情况。现在——一旦 BiqQuery 摄取服务稳定下来并且捆绑包通过,您可能会看到速率限制案例启动,但回退代码将有助于抑制这些错误。

最糟糕的情况是,您的传入管道数据速率不断徘徊在 BigQuery 流式摄取服务控制的最大写入速率(速率限制速率)附近。因此,如果您因重试(瞬态或其他)而遇到积压 - 您的管道可能永远赶不上。

流式数据流中有一个 Drain 功能,它将停止处理传入的数据,然后推进管道以优雅地排出所有未完成的数据 windows。但是,Drain 要求 finishBundle() 成功。因此,在这种情况下 (SocketTimeout) Drain 将被卡住。如果你终止了管道而不是排水——你会遇到未完成的包的数据丢失。

如果您愿意,可以覆盖 BigQuery.IO 逻辑并将出错的数据传输到其他地方。您可以这样做,但我希望 BigQuery 流媒体服务永远不会出现终端中断。话虽如此,如果您 运行 经常处于接近速率限制的速率并且对不可恢复的积压处理敏感,您可能需要实施不同的缩减或分片机制以避免速率限制问题。

关于积压恢复的另一个建议是,您可以停止事件流入您的流媒体源。例如,停止写 Pub/Sub 中的主题。您将开始通过订阅来撰写另一个主题。您现有的 Dataflow 管道会耗尽现有主题。您仍然需要处理如何处理新订阅中的新积压工作,但至少可以保证您不会丢失现有管道中的任何数据。

如果您不使用事件时间处理,这种方法可能非常有效;但是,您正在使用事件时间处理,您的 windows 将有重叠的输出,它们都标记为 ONTIME,即使情况并非如此。

我在这里对您的用例做出了很多假设,但我想分享一下,因为您的问题在考虑数据丢失时提出了其他架构概念。

希望这对您有所帮助。