如果在处理过程中出现问题,则停止处理 kafka 消息
Stop processing kafka messages if something goes wrong during process
在我的处理器 API 中,我将消息存储在键值存储中,每 100 条消息我都会发出一个 POST
请求。如果在尝试发送消息时出现问题(api 没有响应等),我想停止处理消息。在有证据表明 API 调用有效之前。
这是我的代码:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
目前在我的代码中,如果对 API 的调用失败,它将迭代下一个 100(只要失败就会继续发生)并将它们添加到 keyValueStore
。我不希望这发生。相反,我宁愿停止流并在 keyValueStore
清空后继续。这可能吗?
我可以扔 StreamsException
吗?
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
throw new StreamsException(e);
}
这会终止我的流应用程序并导致进程终止吗?
- 只有在确保记录已被 API 成功处理后,才应从状态存储中删除记录,因此删除第一个
keyValueStore.delete(entry.key);
并保留第二个。如果不是,那么当 keyValueStore.delete
致力于基础更新日志主题但您的消息尚未成功处理时,您可能会丢失一些消息,因此它最多只能保证一个。
- 只需将调用 API 代码包裹在一个无限循环中并继续尝试直到记录成功处理,您的处理器将不会使用来自上述处理器节点的新消息,因为它 运行 在同一个 StreamThread 中:
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
//remove this state store delete code : keyValueStore.delete(entry.key);
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
while (true) {
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);//only delete after successfully process the message to achieve at least one processing guarantee
break;
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
}
iterator.close();
count = 0;
}
- 是的,您可以抛出 StreamsException,此
StreamTask
将在重新平衡期间迁移到另一个 StreamThread,可能在示例应用程序实例上。如果 API 在所有 StreamThread 死亡之前一直导致异常,您的应用程序将不会自动退出并接收以下异常,您应该添加自定义 StreamsException 处理程序以在所有流线程死亡时退出您的应用程序 KafkaStreams#setUncaughtExceptionHandler
或监听 Stream State 变化(到 ERROR 状态):
All stream threads have died. The instance will be in error state and should be closed.
最后我使用了一个简单的 KafkaConsumer
而不是 KafkaStreams
,但最重要的是我将 BulkApiException
更改为扩展 RuntimeException
,我抛出在我登录后再次。所以现在它看起来如下:
} catch (BulkApiException bae) {
logger.error(bae.getMessage(), bae.fillInStackTrace());
throw new BulkApiException();
} finally {
consumer.close();
int exitCode = SpringApplication.exit(ctx, () -> 1);
System.exit(exitCode);
}
这样应用退出,k8s重启pod。那是因为如果我尝试转发请求的 api 已关闭,那么继续阅读消息就没有意义了。因此,在另一个 api 备份之前,k8s 将重新启动一个 pod。
在我的处理器 API 中,我将消息存储在键值存储中,每 100 条消息我都会发出一个 POST
请求。如果在尝试发送消息时出现问题(api 没有响应等),我想停止处理消息。在有证据表明 API 调用有效之前。
这是我的代码:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
目前在我的代码中,如果对 API 的调用失败,它将迭代下一个 100(只要失败就会继续发生)并将它们添加到 keyValueStore
。我不希望这发生。相反,我宁愿停止流并在 keyValueStore
清空后继续。这可能吗?
我可以扔 StreamsException
吗?
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
throw new StreamsException(e);
}
这会终止我的流应用程序并导致进程终止吗?
- 只有在确保记录已被 API 成功处理后,才应从状态存储中删除记录,因此删除第一个
keyValueStore.delete(entry.key);
并保留第二个。如果不是,那么当keyValueStore.delete
致力于基础更新日志主题但您的消息尚未成功处理时,您可能会丢失一些消息,因此它最多只能保证一个。 - 只需将调用 API 代码包裹在一个无限循环中并继续尝试直到记录成功处理,您的处理器将不会使用来自上述处理器节点的新消息,因为它 运行 在同一个 StreamThread 中:
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
//remove this state store delete code : keyValueStore.delete(entry.key);
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
while (true) {
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);//only delete after successfully process the message to achieve at least one processing guarantee
break;
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
}
iterator.close();
count = 0;
}
- 是的,您可以抛出 StreamsException,此
StreamTask
将在重新平衡期间迁移到另一个 StreamThread,可能在示例应用程序实例上。如果 API 在所有 StreamThread 死亡之前一直导致异常,您的应用程序将不会自动退出并接收以下异常,您应该添加自定义 StreamsException 处理程序以在所有流线程死亡时退出您的应用程序KafkaStreams#setUncaughtExceptionHandler
或监听 Stream State 变化(到 ERROR 状态):
All stream threads have died. The instance will be in error state and should be closed.
最后我使用了一个简单的 KafkaConsumer
而不是 KafkaStreams
,但最重要的是我将 BulkApiException
更改为扩展 RuntimeException
,我抛出在我登录后再次。所以现在它看起来如下:
} catch (BulkApiException bae) {
logger.error(bae.getMessage(), bae.fillInStackTrace());
throw new BulkApiException();
} finally {
consumer.close();
int exitCode = SpringApplication.exit(ctx, () -> 1);
System.exit(exitCode);
}
这样应用退出,k8s重启pod。那是因为如果我尝试转发请求的 api 已关闭,那么继续阅读消息就没有意义了。因此,在另一个 api 备份之前,k8s 将重新启动一个 pod。