FlinkKinesisConsumer does not retry on NoHttpResponseException?

(Apache Flink 1.8 on AWS EMR release label 5.28.x)

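Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use FlinkKinesisConsumer to read from the Kinesis stream. Our application occasionally (about once every couple of days) crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.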

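Digging into the codebase, I found that 'ProvisionedThroughputExceededException' is the only exception type that gets retried.
1. Why doesn't the Kinesis connector retry on transient HTTP response exceptions?
2. Is there a way to pass in a retry configuration so that these errors are retried?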

As a side note, we have set the following restart configuration:

    env.setRestartStrategy(RestartStrategies.failureRateRestart(12,
        org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
        org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));

Full stack trace of the exception:

    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

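The restart strategy you configured with env.setRestartStrategy() controls how the whole Flink job is restarted after a failure. It has no effect on retries inside the Kinesis connector.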

The Kinesis consumer has the following configuration settings (as of 1.11) for changing the retry behavior of getRecords calls:

    /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

    /** The maximum number of getRecords attempts if we get a recoverable exception. */
    public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";

    /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
    public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";

    /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
    public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";

    /** The power constant for exponential backoff between each getRecords attempt. */
    public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";

    /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds. */
    public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
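
As a rough illustration of how these keys could be used, here is a minimal sketch that fills a plain java.util.Properties object with the retry-related settings listed above. The class name KinesisRetrySettings and all of the values are made up for the example and are not recommendations. The fullJitterBackoff helper is only my reading of how the base, max, and exponential-constant settings usually combine (exponential growth capped at the maximum, with a random delay below the cap); check the KinesisProxy source of your Flink version for the exact formula.

```java
import java.util.Properties;
import java.util.Random;

public class KinesisRetrySettings {

    // Builds consumer properties using the keys quoted above.
    // The values are illustrative assumptions, not tuned recommendations.
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("flink.shard.getrecords.maxretries", "10");
        props.setProperty("flink.shard.getrecords.backoff.base", "300");  // milliseconds
        props.setProperty("flink.shard.getrecords.backoff.max", "5000");  // milliseconds
        props.setProperty("flink.shard.getrecords.backoff.expconst", "1.5");
        return props;
    }

    // Exponential backoff with full jitter (assumed formula):
    // a random delay in [0, min(max, base * power^attempt)).
    public static long fullJitterBackoff(long baseMillis, long maxMillis,
                                         double power, int attempt, Random rnd) {
        long cap = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (rnd.nextDouble() * cap);
    }

    public static void main(String[] args) {
        Properties props = build();
        long base = Long.parseLong(props.getProperty("flink.shard.getrecords.backoff.base"));
        long max = Long.parseLong(props.getProperty("flink.shard.getrecords.backoff.max"));
        double power = Double.parseDouble(props.getProperty("flink.shard.getrecords.backoff.expconst"));
        Random rnd = new Random();
        // Print a sample delay for the first few attempts.
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + ": sleep "
                + fullJitterBackoff(base, max, power, attempt, rnd) + " ms");
        }
    }
}
```

In a real job the same Properties object, together with your usual region and credential settings, would be passed to the FlinkKinesisConsumer constructor. Keep in mind that these settings only apply to exceptions the connector already considers recoverable.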

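KinesisProxy supports retrying on exceptions, and the retry behavior can be controlled through the settings mentioned in the previous answer. However, not every exception is retried, and the default whitelist does not cover all of the transient problems that commonly occur with the Kinesis service. Over time we have customized the proxy as follows to arrive at a stable production setup: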

  @Override
  protected boolean isRecoverableSdkClientException(SdkClientException ex) {
    if (ex instanceof KMSThrottlingException) {
      // not handled in KinesisProxy in 1.5.x
      return true;
    } else if (ex instanceof AmazonServiceException) {
      return KinesisProxy.isRecoverableException((AmazonServiceException)ex);
    } else if (ex.getCause() instanceof SocketTimeoutException) {
      return true;
    } else if (ex.getCause() instanceof NoHttpResponseException) {
      return true;
    } else if (ex.getCause() instanceof ConnectTimeoutException) {
      return true;
    } else if (ex.getCause() instanceof java.net.UnknownHostException) {
      return true;
    } else if (ex.getCause() instanceof javax.net.ssl.SSLHandshakeException) {
      return true;
    }
    return false;
  }
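
The override above only inspects the direct cause of the exception. If the transient error can be wrapped more than one level deep, a variant that walks the whole cause chain may be useful. The following is a self-contained sketch of that idea, not part of the connector: the class TransientErrorClassifier, its method name, and the depth limit are hypothetical. It matches by simple class name so that relocated (shaded) copies of NoHttpResponseException and ConnectTimeoutException are also recognized, which is looser than the instanceof checks above (subclasses are not matched, and an unrelated class with the same simple name would be).

```java
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TransientErrorClassifier {

    // Simple names of the exception types treated as transient in the override above.
    private static final Set<String> TRANSIENT_NAMES = new HashSet<>(Arrays.asList(
        "SocketTimeoutException",
        "NoHttpResponseException",
        "ConnectTimeoutException",
        "UnknownHostException",
        "SSLHandshakeException"));

    // Arbitrary guard against cyclic or very long cause chains (an assumption).
    private static final int MAX_DEPTH = 20;

    // Returns true if the error or any of its causes has a whitelisted simple name.
    public static boolean isTransient(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (TRANSIENT_NAMES.contains(t.getClass().getSimpleName())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("Unable to execute HTTP request",
            new RuntimeException(new SocketTimeoutException("Read timed out")));
        System.out.println(isTransient(wrapped));
        System.out.println(isTransient(new RuntimeException(new UnknownHostException("example.invalid"))));
        System.out.println(isTransient(new IllegalArgumentException("bad argument")));
    }
}
```

A check like this could be called from isRecoverableSdkClientException in place of the individual getCause() tests, at the cost of retrying slightly more broadly.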