处理器 API:批量 POST 请求存储在 KeyValueStore 中的事件

Processor API: bulk POST request for events stored in KeyValueStore

正如此处所建议的那样 我已经使用处理器API 将传入的请求存储在KeyValueStore 中。每 100 个事件我想发送一个 POST 请求。所以我这样做了:

public class BulkProcessor implements Processor<byte[], UserEvent> {

    private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;

    private BulkAPIClient bulkClient;

    private String storeName;

    private ProcessorContext context;

    private int count;

    @Autowired
    public BulkProcessor(String storeName, BulkClient bulkClient) {
        this.storeName = storeName;
        this.bulkClient = bulkClient;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
        count = 0;
        // to check every 15 minutes if there are any remainders in the store that are not sent yet
        this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (count > 0) {
                sendEntriesFromStore();
            }
        });
    }

    @Override
    public void process(byte[] key, UserEvent value) {
        int userGroupId = Integer.valueOf(value.getUserGroupId());
        ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
        if (userEventArrayList == null) {
            userEventArrayList = new ArrayList<>();
        }
        userEventArrayList.add(value);
        keyValueStore.put(userGroupId, userEventArrayList);
        if (count == 100) {
            sendEntriesFromStore();
        }
    }

    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            keyValueStore.delete(entry.key);
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                bulkClient.postRequestBulkApi(url, bulkRequest);
            }
        }
        iterator.close();
        count = 0;
    }

    @Override
    public void close() {
    }
}

我不确定添加 count 是否是线程安全的,或者这是否是实现它的正确方法。目前我也只从一个分区读取。 所以我的问题是:

Is this thread-safe?

是的,它是线程安全的。每个线程使用 ProcessorSupplier 创建它自己的 Processor instance/object.

Is this a good way to send bulk POST requests within the Processor API?

我觉得整体不错。