相当于 KafkaUtils.createRDD 的简单 Spark 结构化流,即通过指定偏移量将 kafka 主题读取到 RDD?
Simple Spark Structured Streaming equivalent of KafkaUtils.createRDD, i.e. read kafka topic to RDD by specifying offsets?
如何通过指定开始和结束偏移量将kafka主题中的数据读取到RDD?
KafkaUtils.createRDD
是 是实验性的 API 相当不愉快(它 returns 一个大臃肿 Java ConsumerRecord
class 甚至不可序列化并将其放在 KafkaRDD
中,它会覆盖许多方法(如 persist)以仅抛出异常。
我想要的是这样一个直截了当的API:
case class Message(key: String,
value: String,
offset: Long,
timestamp: Long)
def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
(config: KafkaConfig, sc: SparkContext): RDD[Message]
或类似的东西 key: Array[Byte]
和 value: Array[Byte]
要从带有偏移量的 kafka 中读取,代码看起来像参考的那样 here
val df =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
以上将读取偏移量内可用的数据,然后您可以将列转换为字符串,并转换为您的对象Message
。
val messageRDD: RDD[Message] =
df.select(
col("key").cast("string"),
col("value").cast("string"),
col("offset").cast("long"),
col("timestamp").cast("long")
).as[Message].rdd
如何通过指定开始和结束偏移量将kafka主题中的数据读取到RDD?
KafkaUtils.createRDD
是 是实验性的 API 相当不愉快(它 returns 一个大臃肿 Java ConsumerRecord
class 甚至不可序列化并将其放在 KafkaRDD
中,它会覆盖许多方法(如 persist)以仅抛出异常。
我想要的是这样一个直截了当的API:
case class Message(key: String,
value: String,
offset: Long,
timestamp: Long)
def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
(config: KafkaConfig, sc: SparkContext): RDD[Message]
或类似的东西 key: Array[Byte]
和 value: Array[Byte]
要从带有偏移量的 kafka 中读取,代码看起来像参考的那样 here
val df =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
以上将读取偏移量内可用的数据,然后您可以将列转换为字符串,并转换为您的对象Message
。
val messageRDD: RDD[Message] =
df.select(
col("key").cast("string"),
col("value").cast("string"),
col("offset").cast("long"),
col("timestamp").cast("long")
).as[Message].rdd