相当于 KafkaUtils.createRDD 的简单 Spark 结构化流,即通过指定偏移量将 kafka 主题读取到 RDD?

Simple Spark Structured Streaming equivalent of KafkaUtils.createRDD, i.e. read kafka topic to RDD by specifying offsets?

如何通过指定开始和结束偏移量将kafka主题中的数据读取到RDD?

KafkaUtils.createRDD 是实验性的 API 相当不愉快(它 returns 一个大臃肿 Java ConsumerRecord class 甚至不可序列化并将其放在 KafkaRDD 中,它会覆盖许多方法(如 persist)以仅抛出异常。

我想要的是这样一个直截了当的API:

case class Message(key: String, 
                   value: String, 
                   offset: Long, 
                   timestamp: Long)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, sc: SparkContext): RDD[Message]

或类似的东西 key: Array[Byte]value: Array[Byte]

要从带有偏移量的 kafka 中读取,代码看起来像参考的那样 here

val df = 
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()

以上将读取偏移量内可用的数据,然后您可以将列转换为字符串,并转换为您的对象Message

val messageRDD: RDD[Message] = 
  df.select(
    col("key").cast("string"), 
    col("value").cast("string"), 
    col("offset").cast("long"),
    col("timestamp").cast("long")
  ).as[Message].rdd