处理器 API:批量 POST 请求存储在 KeyValueStore 中的事件
Processor API: bulk POST request for events stored in KeyValueStore
正如此处所建议的那样 我已经使用处理器API 将传入的请求存储在KeyValueStore 中。每 100 个事件我想发送一个 POST
请求。所以我这样做了:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
keyValueStore.delete(entry.key);
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
bulkClient.postRequestBulkApi(url, bulkRequest);
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
我不确定添加 count
是否是线程安全的,或者这是否是实现它的正确方法。目前我也只从一个分区读取。
所以我的问题是:
- 这是线程安全的吗?
- 这是在处理器 API 内发送批量 POST 请求的好方法吗?
Is this thread-safe?
是的,它是线程安全的。每个线程使用 ProcessorSupplier
创建它自己的 Processor
instance/object.
Is this a good way to send bulk POST requests within the Processor API?
我觉得整体不错。
正如此处所建议的那样POST
请求。所以我这样做了:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
keyValueStore.delete(entry.key);
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
bulkClient.postRequestBulkApi(url, bulkRequest);
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
我不确定添加 count
是否是线程安全的,或者这是否是实现它的正确方法。目前我也只从一个分区读取。
所以我的问题是:
- 这是线程安全的吗?
- 这是在处理器 API 内发送批量 POST 请求的好方法吗?
Is this thread-safe?
是的,它是线程安全的。每个线程使用 ProcessorSupplier
创建它自己的 Processor
instance/object.
Is this a good way to send bulk POST requests within the Processor API?
我觉得整体不错。