如何在 Apache Flink 中 flatMap 到数据库?

How to flatMap to database in Apache Flink?

我正在使用 Apache Flink 尝试从 Kafka 获取 JSON 条记录到 InfluxDB,在此过程中将它们从一条 JSON 条记录拆分为多个 InfluxDB 点。

我找到了 flatMap 转换,感觉它符合目的。核心代码如下所示:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
    @Override
    public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
        Iterator<Entry<String, JsonNode>> iterator = //...

        while (iterator.hasNext()) {
            // extract point from input
            InfluxDBPoint point = //...

            out.collect(point);
        }
    }
});

出于某种原因,我只将其中一个收集到的点流式传输到数据库中。

即使我打印出所有映射的条目,它似乎也能正常工作:dataStream.print() 产生:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

我是不是误解了flatMap或者 Influx 连接器中可能存在一些错误?

这个问题实际上与 Influx 中的一个序列(由其标签集和测量 as seen here 定义)每次只能有一个点 有关,因此尽管我的字段不同,但最后一个点用相同的时间值覆盖了所有之前的点。