如何加载所有已经从 Kafka 发布的记录?

How to load all records that were already published from Kafka?

我有一个 pyspark 结构流 python 应用程序设置如下

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

所有显示的都是这个

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
  "runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
  "name" : null,
  "timestamp" : "2019-03-04T22:00:49.389Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 852,
    "getBatch" : 180,
    "getOffset" : 135,
    "queryPlanning" : 107,
    "triggerExecution" : 1321,
    "walCommit" : 27
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[my_topic]]",
    "startOffset" : null,
    "endOffset" : {
      "my_topic" : {
        "0" : 303
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
  }
}

如您所见,my_topic 那里有 303 条消息,但我无法显示。其他信息包括我正在使用 confluent Kafka JDBC 连接器查询 oracle 数据库并将行存储到 kafka 主题中。我有一个 avro 架构注册表设置。如果需要,我也会分享这些 属性 个文件。

有谁知道发生了什么事吗?

作为流式应用程序,此 Spark Structure 流式处理仅在消息发布后立即读取消息。为了测试目的,我想做的是阅读主题中的所有内容。为此,您只需在 readStream 中添加一个额外选项,即 option("startingOffsets", "earliest").

data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")
    .load()