FlinkKafkaConsumer010 在使用 setStartFromTimestamp 设置时不起作用
FlinkKafkaConsumer010 doesn't work when set with setStartFromTimestamp
我正在使用 flink streaming 和 flink-connector-kafka 来处理来自 kafka 的数据。当我用 setStartFromTimestamp(1586852770000L) 配置 FlinkKafkaConsumer010 时,此时,kafka 主题 A 中所有数据的时间都在 1586852770000L 之前,然后我向主题 A 的分区 0 和分区 4 发送一些消息(主题 A 有 6 个分区,当前系统时间已经在 1586852770000L 之后)。但是我的 flink 程序不使用主题 A 的任何数据。这是一个问题吗?
如果我停止我的 flink 程序并重新启动它,它可以使用主题 A 的分区 0 和分区 4 的数据,但如果我将数据发送到另一个分区,它仍然不会使用其他 4 个分区的任何数据4 个分区,除非我再次重新启动我的 flink 程序。
kafka的日志如下:
2020-04-15 11:48:46,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-4=1586836800000}, minVersion=1) to broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,466 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 185, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]} from broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for TopicA-4. Fetched offset 4, timestamp 1586852770000
2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-0=1586836800000}, minVersion=1) to broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 184, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]} from broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for TopicA-0. Fetched offset 47, timestamp 1586863210000
2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-2=1586836800000}, minVersion=1) to broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,465 TRACE org.apache.kafka.clients.NetworkClient - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=2,timestamp=1586836800000}]}]} to node 183.
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 183, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]}
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=
0,timestamp=-1,offset=-1}]}]} from broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,468 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for TopicA-2. Fetched offset -1, timestamp -1
2020-04-15 11:48:46,481 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 2 partitions from timestamp 1586836800000: [KafkaTopicPartition{topic='TopicA', partition=4}, KafkaTopicPartition{topic='TopicA', partition=0}]
从日志来看,除了partition-0和partition-4,其他4个partition的offset都是-1。为什么 return 偏移量是 -1 而不是最新的偏移量?
在Kafka客户端的代码中(Fetcher.java,line: 674-680)
// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
timestampOffsetMap.put(topicPartition, offsetData);
}
ListOffsetResponse.UNKNOWN_OFFSET 的值为 -1 。所以过滤掉其他4个partition,kafka consumer不会消费其他4个partition的数据。
我的Flink版本是1.9.2,flink kafka connertor是
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.9.2</version>
</dependency>
flink kafka connector文档如下:
setStartFromTimestamp(long): Start from the specified timestamp. For
each partition, the record whose timestamp is larger than or equal to
the specified timestamp will be used as the start position. If a
partition’s latest record is earlier than the timestamp, the partition
will simply be read from the latest record.
测试程序代码:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.junit.Test
class TestFlinkKafka {
@Test
def testFlinkKafkaDemo: Unit ={
//1. set up the streaming execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic( TimeCharacteristic.ProcessingTime)
// To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution environment
env.enableCheckpointing(60000)
//2. kafka source
val topic = "message"
val schema = new SimpleStringSchema()
//server1:9092,server2:9092,server3:9092
val props = getKafkaConsumerProperties("localhost:9092","flink-streaming-client", "latest")
val consumer = new FlinkKafkaConsumer010(topic, schema, props)
//consume data from special timestamp's offset
//2020/4/14 20:0:0
//consumer.setStartFromTimestamp(1586865600000L)
//2020/4/15 20:0:0
consumer.setStartFromTimestamp(1586952000000L)
consumer.setCommitOffsetsOnCheckpoints(true)
//3. transform
val stream = env.addSource(consumer)
.map(x => x)
//4. sink
stream.print()
//5. execute
env.execute("testFlinkKafkaConsumer")
}
def getKafkaConsumerProperties(brokerList:String, groupId:String, offsetReset:String): Properties ={
val props = new Properties()
props.setProperty("bootstrap.servers", brokerList)
props.setProperty("group.id", groupId)
props.setProperty("auto.offset.reset", offsetReset)
props.setProperty("flink.partition-discovery.interval-millis", "30000")
props
}
}
为kafka设置日志级别:
log4j.logger.org.apache.kafka=TRACE
创建kafka主题:
kafka-topics --zookeeper localhost:2181/kafka --create --topic message --partitions 6 --replication-factor 1
向kafka主题发送消息
kafka-console-producer --broker-list localhost:9092 --topic message
{"name":"tom"}
{"name":"michael"}
此问题已通过将 Flink/Kafka 连接器升级到更新的通用连接器 -- FlinkKafkaConsumer
-- 可从 flink-connector-kafka_2.11
获得解决。从 1.0.0 开始,建议将此版本的连接器用于 Kafka 的所有版本。对于 Kafka 0.10.x 或 0.11.x,最好使用版本特定的 flink-connector-kafka-0.10_2.11
或 flink-connector-kafka-0.11_2.11
连接器。 (在所有情况下,如果您使用的是 Scala 2.12,请将 2.12 替换为 2.11。)
有关 Flink 的 Kafka 连接器的更多信息,请参阅 Flink documentation。
我正在使用 flink streaming 和 flink-connector-kafka 来处理来自 kafka 的数据。当我用 setStartFromTimestamp(1586852770000L) 配置 FlinkKafkaConsumer010 时,此时,kafka 主题 A 中所有数据的时间都在 1586852770000L 之前,然后我向主题 A 的分区 0 和分区 4 发送一些消息(主题 A 有 6 个分区,当前系统时间已经在 1586852770000L 之后)。但是我的 flink 程序不使用主题 A 的任何数据。这是一个问题吗?
如果我停止我的 flink 程序并重新启动它,它可以使用主题 A 的分区 0 和分区 4 的数据,但如果我将数据发送到另一个分区,它仍然不会使用其他 4 个分区的任何数据4 个分区,除非我再次重新启动我的 flink 程序。
kafka的日志如下:
2020-04-15 11:48:46,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-4=1586836800000}, minVersion=1) to broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,466 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 185, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]} from broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for TopicA-4. Fetched offset 4, timestamp 1586852770000
2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-0=1586836800000}, minVersion=1) to broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 184, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]} from broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for TopicA-0. Fetched offset 47, timestamp 1586863210000
2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-2=1586836800000}, minVersion=1) to broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,465 TRACE org.apache.kafka.clients.NetworkClient - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=2,timestamp=1586836800000}]}]} to node 183.
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 183, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]}
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=
0,timestamp=-1,offset=-1}]}]} from broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,468 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for TopicA-2. Fetched offset -1, timestamp -1
2020-04-15 11:48:46,481 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 2 partitions from timestamp 1586836800000: [KafkaTopicPartition{topic='TopicA', partition=4}, KafkaTopicPartition{topic='TopicA', partition=0}]
从日志来看,除了partition-0和partition-4,其他4个partition的offset都是-1。为什么 return 偏移量是 -1 而不是最新的偏移量?
在Kafka客户端的代码中(Fetcher.java,line: 674-680)
// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
timestampOffsetMap.put(topicPartition, offsetData);
}
ListOffsetResponse.UNKNOWN_OFFSET 的值为 -1 。所以过滤掉其他4个partition,kafka consumer不会消费其他4个partition的数据。
我的Flink版本是1.9.2,flink kafka connertor是
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.9.2</version>
</dependency>
flink kafka connector文档如下:
setStartFromTimestamp(long): Start from the specified timestamp. For each partition, the record whose timestamp is larger than or equal to the specified timestamp will be used as the start position. If a partition’s latest record is earlier than the timestamp, the partition will simply be read from the latest record.
测试程序代码:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.junit.Test
class TestFlinkKafka {
@Test
def testFlinkKafkaDemo: Unit ={
//1. set up the streaming execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic( TimeCharacteristic.ProcessingTime)
// To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution environment
env.enableCheckpointing(60000)
//2. kafka source
val topic = "message"
val schema = new SimpleStringSchema()
//server1:9092,server2:9092,server3:9092
val props = getKafkaConsumerProperties("localhost:9092","flink-streaming-client", "latest")
val consumer = new FlinkKafkaConsumer010(topic, schema, props)
//consume data from special timestamp's offset
//2020/4/14 20:0:0
//consumer.setStartFromTimestamp(1586865600000L)
//2020/4/15 20:0:0
consumer.setStartFromTimestamp(1586952000000L)
consumer.setCommitOffsetsOnCheckpoints(true)
//3. transform
val stream = env.addSource(consumer)
.map(x => x)
//4. sink
stream.print()
//5. execute
env.execute("testFlinkKafkaConsumer")
}
def getKafkaConsumerProperties(brokerList:String, groupId:String, offsetReset:String): Properties ={
val props = new Properties()
props.setProperty("bootstrap.servers", brokerList)
props.setProperty("group.id", groupId)
props.setProperty("auto.offset.reset", offsetReset)
props.setProperty("flink.partition-discovery.interval-millis", "30000")
props
}
}
为kafka设置日志级别:
log4j.logger.org.apache.kafka=TRACE
创建kafka主题:
kafka-topics --zookeeper localhost:2181/kafka --create --topic message --partitions 6 --replication-factor 1
向kafka主题发送消息
kafka-console-producer --broker-list localhost:9092 --topic message
{"name":"tom"}
{"name":"michael"}
此问题已通过将 Flink/Kafka 连接器升级到更新的通用连接器 -- FlinkKafkaConsumer
-- 可从 flink-connector-kafka_2.11
获得解决。从 1.0.0 开始,建议将此版本的连接器用于 Kafka 的所有版本。对于 Kafka 0.10.x 或 0.11.x,最好使用版本特定的 flink-connector-kafka-0.10_2.11
或 flink-connector-kafka-0.11_2.11
连接器。 (在所有情况下,如果您使用的是 Scala 2.12,请将 2.12 替换为 2.11。)
有关 Flink 的 Kafka 连接器的更多信息,请参阅 Flink documentation。