Dataflow's BigQuery inserter thread pool exhausted
I am writing data into BigQuery using Dataflow.
Once the volume gets high and some time has passed, I get this error from Dataflow:
{
metadata: {
severity: "ERROR"
projectId: "[...]"
serviceName: "dataflow.googleapis.com"
region: "us-east1-d"
labels: {…}
timestamp: "2016-08-19T06:39:54.492Z"
projectNumber: "[...]"
}
insertId: "[...]"
log: "dataflow.googleapis.com/worker"
structPayload: {
message: "Uncaught exception: "
work: "[...]"
thread: "46"
worker: "[...]-08180915-7f04-harness-jv7y"
exception: "java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1a1680f rejected from java.util.concurrent.ThreadPoolExecutor@b11a8a1[Shutting down, pool size = 100, active threads = 100, queued tasks = 2316, completed tasks = 1192]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:681)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.insertAll(BigQueryTableInserter.java:218)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.flushRows(BigQueryIO.java:2155)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.finishBundle(BigQueryIO.java:2113)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:62)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:79)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:657)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.access0(StreamingDataflowWorker.java:86)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:483)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)"
logger: "com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker"
stage: "F10"
job: "[...]"
}
}
It looks like I am exhausting the thread pool defined in BigQueryTableInserter.java:84. This thread pool is hard-coded to 100 threads and cannot be configured.
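For reference, I believe this is the standard java.util.concurrent rejection behavior rather than anything Dataflow-specific: with the default AbortPolicy handler, a ThreadPoolExecutor rejects new tasks once it is saturated or shutting down. A minimal sketch (illustrative only, not the SDK code) that reproduces the same exception type:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // Default rejection handler is AbortPolicy: submit() throws
        // RejectedExecutionException once the pool and its bounded queue are
        // both full (or once shutdown() has been called).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2));
        try {
            for (int i = 0; i < 10; i++) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(1000); // stand-in for a slow streaming insert
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            System.err.println("Rejected, same exception type as in the worker log: " + e);
        } finally {
            pool.shutdownNow();
        }
    }
}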
My questions are:
How can I avoid this error?
Am I doing something wrong?
Shouldn't the pool size be configurable? How can 100 threads be the perfect fit for all needs and machine types?
Here is some context on my usage:
I am using Dataflow in streaming mode, reading from Kafka with KafkaIO.java.
"After some time" means a few hours (less than 12h).
I am using 36 workers of type n1-standard-4.
I am reading around 180k messages/s from Kafka (about 130MB/s of network input on my workers).
Messages are grouped together, outputting around 7k messages/s into BigQuery.
Dataflow workers are in the us-east1-d zone, and the BigQuery dataset location is US.
You are not doing anything wrong, though you may need more resources, depending on how long the volume stays high.
The streaming BigQueryIO write does some basic batching of inserts by data size and row count. If I understand your numbers correctly, your rows are large enough that each one is being submitted to BigQuery in its own request.
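Roughly, batching by data size and row count can be pictured with the sketch below; the class name and thresholds are made up for illustration and are not the SDK's actual values. The point for your case is that once every individual row crosses a threshold on its own, every flush ends up carrying a single row, i.e. one request per row:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a generic size/count-based batcher, not the actual BigQueryIO code.
class RowBatcher {
    private static final int MAX_ROWS_PER_REQUEST = 500;          // assumed limit
    private static final long MAX_BYTES_PER_REQUEST = 64 * 1024;  // assumed limit

    private final List<String> batch = new ArrayList<>();
    private long batchBytes = 0;

    /** Adds a row; returns a batch to flush when either threshold is crossed, else null. */
    List<String> add(String rowJson) {
        batch.add(rowJson);
        batchBytes += rowJson.length(); // rough byte estimate for the sketch
        if (batch.size() >= MAX_ROWS_PER_REQUEST || batchBytes >= MAX_BYTES_PER_REQUEST) {
            List<String> out = new ArrayList<>(batch);
            batch.clear();
            batchBytes = 0;
            return out;
        }
        return null;
    }
}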
It looks like the thread pool for inserts ought to install ThreadPoolExecutor.CallerRunsPolicy, which causes the caller to block and run jobs synchronously when they exceed the capacity of the executor. I've posted PR #393. This converts the work-queue overflow into pipeline backlog, as all the processing threads block.
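For illustration, installing CallerRunsPolicy on a plain java.util.concurrent executor looks like the sketch below (the pool and queue sizes are made up, and this is not the code from the PR). When the pool and its queue are full, the submitting thread runs the task itself instead of the executor throwing RejectedExecutionException:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                100, 100,                        // mirrors the hard-coded 100 threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000),  // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the caller

        for (int i = 0; i < 10_000; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for a streaming insert RPC
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}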
At that point the issue becomes a standard one:
- If the backlog is temporary, you'll catch up once the volume decreases.
- If the backlog grows without bound, of course that won't solve the problem, and you will need to ask for more resources. The signs should be the same as for any other backlog.
One other thing to be aware of: at around 250 rows/second per thread this would exceed the BigQuery quota of 100k updates/second for a table (such failures would be retried, so you might get past them anyway). If I understand your numbers correctly (about 7k rows/s into BigQuery), you are a long way from this.