How to have a KTable emit only the latest updates?
My KTable emits on every update instead of only on the latest update.
See the code below (in Scala):
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, KTable}

object SimpleTable extends App {
  val topic = "simple-table"

  // Produce two rounds of updates for the same three keys.
  val prodProps = new Properties()
  prodProps.put("bootstrap.servers", "localhost:9092")
  prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("acks", "1")
  prodProps.put("retries", "3")
  val producer = new KafkaProducer[String, String](prodProps)
  producer.send(new ProducerRecord[String, String](topic, "key1", "value1"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value2"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value3"))
  producer.send(new ProducerRecord[String, String](topic, "key1", "value11"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value22"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value33"))
  producer.close()

  val streamProps = new Properties()
  streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-table-app1")
  streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // Settings I tried, without success:
  //streamProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group11")
  //streamProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "client11")
  //streamProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  //streamProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760")
  //streamProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
  //streamProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000")
  //streamProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1)
  //streamProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[WallclockTimestampExtractor])

  import org.apache.kafka.streams.scala.Serdes._
  implicit val consumeSerdes: Consumed[String, String] = Consumed.`with`[String, String]

  // Read the topic as a KTable and print every record it forwards downstream.
  val builder = new StreamsBuilder()
  val simpleTable: KTable[String, String] = builder.table[String, String](topic)
  simpleTable.toStream.print(Printed.toSysOut[String, String].withLabel("simple-table"))

  val streams = new KafkaStreams(builder.build(), streamProps)
  streams.start()
  Thread.sleep(10000)
  streams.close()
}
This application prints:
[simple-table]: key1, value1
[simple-table]: key2, value2
[simple-table]: key3, value3
[simple-table]: key1, value11
[simple-table]: key2, value22
[simple-table]: key3, value33
I should get only the latest 3 lines. Please help.
Update
Following the solution below, everything works when I create the KTable like this:
val simpleTable: KTable[String, String] =
  builder.table[String, String](topic, Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("simple-table-store"))
I got the answer from this post, which concerned code written for a kafka-streams version older than 2.2.
Copy-pasted here:
In Kafka 2.2, an optimization was introduced to reduce the resource footprint of Kafka Streams. A KTable is not necessarily materialized if it's not required for the computation. This holds for your case, because mapValues() can be computed on-the-fly. Because the KTable is not materialized, there is no cache and thus each input record produces one output record.
Compare: https://issues.apache.org/jira/browse/KAFKA-6036
If you want to enforce KTable materialization, you can pass Materialized.as("someStoreName") into the StreamsBuilder#table() method.
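To make the effect described in the quoted answer concrete, here is a toy model in plain Scala. It is not Kafka Streams code and does not reflect how the library is implemented; the object `CacheSketch` and its two functions are hypothetical names made up for this illustration. It only contrasts "no cache, every record is forwarded" with "a per-key cache that is flushed once", using the six records from the question.

```scala
import scala.collection.mutable

// Toy model only: this is NOT Kafka Streams code. It mimics the effect of a
// per-key record cache that is flushed once, versus no cache at all.
object CacheSketch {
  type Record = (String, String)

  // No cache: every input record is forwarded downstream as-is.
  def withoutCache(input: Seq[Record]): Seq[Record] = input

  // With a cache: a later value overwrites the earlier one for the same key,
  // and only the surviving entries are forwarded when the cache is flushed.
  def withCache(input: Seq[Record]): Seq[Record] = {
    val cache = mutable.LinkedHashMap.empty[String, String]
    input.foreach { case (key, value) => cache.update(key, value) }
    cache.toSeq // assumption of this sketch: a single flush at the very end
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "key1" -> "value1", "key2" -> "value2", "key3" -> "value3",
      "key1" -> "value11", "key2" -> "value22", "key3" -> "value33"
    )
    withoutCache(input).foreach { case (k, v) => println(s"[no-cache]: $k, $v") }
    withCache(input).foreach { case (k, v) => println(s"[cached]: $k, $v") }
  }
}
```

In a real application the cache of a materialized KTable is flushed on every commit interval and when it fills up, so intermediate updates can still show up downstream if a flush happens between two writes to the same key. Materializing the table reduces the number of emitted updates; it does not guarantee exactly one per key.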