Stream data using Spark from a particular partition within Kafka topics
I have already seen a similar question, but I still want to know whether it is really not possible to stream data from a specific partition. I am using Kafka ConsumerStrategies in the Spark Streaming Subscribe method:
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
Here is the code snippet where I try to subscribe to the topic and partition:
val topics = Array("cdc-classic")
val topic = "cdc-classic"
val partition = 2
val offsets = Map(new TopicPartition(topic, partition) -> 2L) // I am not clear with this line (I tried to set topic and partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, offsets))
But when I run this code, I get the following exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
P.S.: cdc-classic is the name of a topic with 17 partitions.
A Kafka partition is Spark's unit of parallelism. So even though this is technically possible to some extent, it does not make much sense, because all the data would be processed by a single executor. Instead of using Spark, you can simply start the process as a plain KafkaConsumer:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
(https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
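For completeness, here is a minimal Scala sketch of that standalone-consumer approach. The broker address and group id are placeholders, and it assumes you only care about partition 2 of cdc-classic; poll(long) is used to match the 0.11 client linked above.

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object SinglePartitionConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")           // placeholder broker
    props.put("group.id", "cdc-classic-partition-2-reader")    // placeholder group id
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("auto.offset.reset", "earliest")                 // fall back instead of failing on out-of-range offsets

    val consumer = new KafkaConsumer[String, String](props)
    // assign() pins the consumer to explicit partitions; no consumer-group rebalancing is involved
    consumer.assign(Arrays.asList(new TopicPartition("cdc-classic", 2)))

    while (true) {
      val records = consumer.poll(500).asScala
      records.foreach(r => println(s"partition=${r.partition} offset=${r.offset} value=${r.value}"))
    }
  }
}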
If you want to benefit from Spark's automatic retries, you can simply build a Docker image and launch it with Kubernetes using an appropriate retry configuration.
Regarding Spark, if you really want to use it, you should check what offsets actually exist in the partition you are reading. You probably provided an incorrect one, which is why it returns the "out of range" offset message (maybe it starts at 0?).
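A minimal sketch of that check, assuming a kafka-clients version that has beginningOffsets/endOffsets (0.10.1+) and a placeholder broker address. If the starting offset you pass (2L in the question) falls outside this range and no auto.offset.reset policy is configured, the fetch fails exactly as in the stack trace above.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Ask the broker which offsets actually exist for cdc-classic partition 2.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker address
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val probe = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("cdc-classic", 2)

// earliest retained offset and the next offset to be written, respectively
val earliest = probe.beginningOffsets(Collections.singletonList(tp)).get(tp)
val latest   = probe.endOffsets(Collections.singletonList(tp)).get(tp)
println(s"cdc-classic-2 valid offset range: [$earliest, $latest)")
probe.close()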
Specify the partition number and the partition's starting offset in this line to stream the data:
Map(new TopicPartition(topic, partition) -> 2L)
where:
partition is the partition number
2L refers to the starting offset of the partition
Then we can stream data from the selected partition.
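Putting the two answers together, here is a sketch of the corrected Subscribe call. It assumes the ssc and kafkaParams values from the question, and startOffset is illustrative: it must lie inside the range reported by the check above (adding auto.offset.reset to kafkaParams additionally makes out-of-range offsets reset instead of fail).

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topic = "cdc-classic"
val partition = 2
val startOffset = 0L  // illustrative; must lie within the [earliest, latest) range for cdc-classic-2

// Explicit starting offset only for partition 2; the other 16 partitions start
// from committed offsets or the auto.offset.reset position.
val offsets = Map(new TopicPartition(topic, partition) -> startOffset)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array(topic), kafkaParams, offsets))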