Kafka stream groupBy 基于时间戳

Kafka stream groupBy based on timestamp

我使用 kafka 作为投票应用程序,用户可以在其中选择候选人并在 1 小时的时间范围内更改选择。

因为这个适合KTable,所以我用的是kafka stream app。但是,有时间范围要求,意味着我只需要在特定时间范围内 groupBy().count()(例如从 10:00-11:00)。

如何使用 Kafka Stream 实现此目的 API?
据我所知,Kafka(我使用 Kafka 2.3)将发布的时间戳放在元数据上,但如何访问它?我正在考虑使用基于时间戳

.filter()

我也看到 windowing documentation 但似乎时间是相对的(例如最后 1 小时)而不是固定的(10:00-11:00)。

谢谢

实际上 Tumbling windowFixed-size, non-overlapping, gap-less windows。在您的用例中,window 持续时间为一小时,例如,将创建 window 10:00-11:00(开始包含,结束排除):

kStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();

蒂莫西,

要访问记录的时间戳,您可以使用 transformValues() 操作。 您提供的 ValuesTransformer 可以访问 ProcessorContext,您可以在 ValueTransformer.transform() 方法中调用 ProcessorContex.timestamp()。 如果时间戳在所需范围内,return 记录,否则 return 为空。然后在 transformValues() 之后添加 filter() 以删除您拒绝的记录。

这是一个我认为可行的例子

class GroupByTimestampExample {

  public static void main(String[] args) {

    final StreamsBuilder builder = new StreamsBuilder();
    // You need to update the the time fields these are just placeholders
    long earliest = Instant.now().toEpochMilli();
    long latest = Instant.now().toEpochMilli() + (60 * 60 * 1000);

    final ValueTransformerSupplier<String, String> valueTransformerSupplier = new TimeFilteringTransformer(earliest, latest);

    final KTable<String, Long> voteTable = builder.<String, String>stream("topic")
                                            .transformValues(valueTransformerSupplier)
                                            .filter((k, v) -> v != null)
                                            .groupByKey()
                                            .count();

  }




  static final class TimeFilteringTransformer implements ValueTransformerSupplier<String, String> {

    private final long earliest;
    private final long latest;

    public TimeFilteringTransformer(final long earliest, final long latest) {
      this.earliest = earliest;
      this.latest = latest;
    }

    @Override
    public ValueTransformer<String, String> get() {
      return new ValueTransformer<String, String>() {
        private ProcessorContext processorContext;

        @Override
        public void init(ProcessorContext context) {
          processorContext = context;
        }

        @Override
        public String transform(String value) {
         long ts = processorContext.timestamp();
         if (ts >= earliest && ts <= latest) {
            return value;
         }
         return null;
        }

        @Override
        public void close() {

        }
      };
    }
  }
}

告诉我进展如何。