如果在处理过程中出现问题,则停止处理 kafka 消息

Stop processing kafka messages if something goes wrong during process

在我的处理器 API 中,我将消息存储在键值存储中,每 100 条消息我都会发出一个 POST 请求。如果在尝试发送消息时出现问题(api 没有响应等),我想停止处理消息。在有证据表明 API 调用有效之前。 这是我的代码:

public class BulkProcessor implements Processor<byte[], UserEvent> {

    private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;

    private BulkAPIClient bulkClient;

    private String storeName;

    private ProcessorContext context;

    private int count;

    @Autowired
    public BulkProcessor(String storeName, BulkClient bulkClient) {
        this.storeName = storeName;
        this.bulkClient = bulkClient;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
        count = 0;
        // to check every 15 minutes if there are any remainders in the store that are not sent yet
        this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (count > 0) {
                sendEntriesFromStore();
            }
        });
    }

    @Override
    public void process(byte[] key, UserEvent value) {
        int userGroupId = Integer.valueOf(value.getUserGroupId());
        ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
        if (userEventArrayList == null) {
            userEventArrayList = new ArrayList<>();
        }
        userEventArrayList.add(value);
        keyValueStore.put(userGroupId, userEventArrayList);
        if (count == 100) {
            sendEntriesFromStore();
        }
    }

    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                try {
                    bulkClient.postRequestBulkApi(url, bulkRequest);
                    keyValueStore.delete(entry.key);
                } catch (BulkApiException e) {
                    logger.warn(e.getMessage(), e.fillInStackTrace());
                }
            }
        }
        iterator.close();
        count = 0;
    }

    @Override
    public void close() {
    }
}

目前在我的代码中,如果对 API 的调用失败,它将迭代下一个 100(只要失败就会继续发生)并将它们添加到 keyValueStore。我不希望这发生。相反,我宁愿停止流并在 keyValueStore 清空后继续。这可能吗?
我可以扔 StreamsException 吗?

try {
    bulkClient.postRequestBulkApi(url, bulkRequest);
    keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
    throw new StreamsException(e);
}

这会终止我的流应用程序并导致进程终止吗?

  1. 只有在确保记录已被 API 成功处理后,才应从状态存储中删除记录,因此删除第一个 keyValueStore.delete(entry.key); 并保留第二个。如果不是,那么当 keyValueStore.delete 致力于基础更新日志主题但您的消息尚未成功处理时,您可能会丢失一些消息,因此它最多只能保证一个。
  2. 只需将调用 API 代码包裹在一个无限循环中并继续尝试直到记录成功处理,您的处理器将不会使用来自上述处理器节点的新消息,因为它 运行 在同一个 StreamThread 中:
    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            //remove this state store delete code : keyValueStore.delete(entry.key);
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                while (true) {
                    try {
                        bulkClient.postRequestBulkApi(url, bulkRequest);
                        keyValueStore.delete(entry.key);//only delete after successfully process the message to achieve at least one processing guarantee
                        break;
                    } catch (BulkApiException e) {
                        logger.warn(e.getMessage(), e.fillInStackTrace());
                    }
                }
            }
        }
        iterator.close();
        count = 0;
    }
  1. 是的,您可以抛出 StreamsException,此 StreamTask 将在重新平衡期间迁移到另一个 StreamThread,可能在示例应用程序实例上。如果 API 在所有 StreamThread 死亡之前一直导致异常,您的应用程序将不会自动退出并接收以下异常,您应该添加自定义 StreamsException 处理程序以在所有流线程死亡时退出您的应用程序 KafkaStreams#setUncaughtExceptionHandler或监听 Stream State 变化(到 ERROR 状态):
All stream threads have died. The instance will be in error state and should be closed.

最后我使用了一个简单的 KafkaConsumer 而不是 KafkaStreams,但最重要的是我将 BulkApiException 更改为扩展 RuntimeException,我抛出在我登录后再次。所以现在它看起来如下:

        } catch (BulkApiException bae) {
            logger.error(bae.getMessage(), bae.fillInStackTrace());
            throw new BulkApiException();
        } finally {
            consumer.close();
            int exitCode = SpringApplication.exit(ctx, () -> 1);
            System.exit(exitCode);
        }

这样应用退出,k8s重启pod。那是因为如果我尝试转发请求的 api 已关闭,那么继续阅读消息就没有意义了。因此,在另一个 api 备份之前,k8s 将重新启动一个 pod。