How to load all records that were already published from Kafka?
I have a PySpark Structured Streaming Python application set up as follows:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()

data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()
All that gets displayed is this:
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+
19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
  "runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
  "name" : null,
  "timestamp" : "2019-03-04T22:00:49.389Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 852,
    "getBatch" : 180,
    "getOffset" : 135,
    "queryPlanning" : 107,
    "triggerExecution" : 1321,
    "walCommit" : 27
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[my_topic]]",
    "startOffset" : null,
    "endOffset" : {
      "my_topic" : {
        "0" : 303
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
  }
}
As you can see, my_topic has 303 messages in it, but I can't get them to display. Some additional context: I'm using the Confluent Kafka JDBC connector to query an Oracle database and write the rows into the Kafka topic, and I have an Avro schema registry set up. I can share those properties files too if needed.
Does anyone know what is going on?
Being a streaming application, this Spark Structured Streaming query only reads messages from the moment they are published onwards. For testing purposes, what I wanted was to read everything already in the topic. To do that, you just need to add one extra option to readStream, namely option("startingOffsets", "earliest"):
data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")\
    .load()
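Two caveats worth noting. First, startingOffsets only takes effect when the query starts without an existing checkpoint; on a restart the offsets recorded in the checkpoint take precedence. Second, since the topic is fed by the Confluent JDBC connector with Avro, the value column holds Avro-encoded bytes, so the console sink will still print something unreadable. Below is a minimal sketch for eyeballing the rows on the console; the string casts are only for quick inspection and do not actually decode the Avro payload (that would require a schema-registry-aware deserializer):

# Sketch: cast the Kafka key/value to strings just to sanity-check that rows arrive.
# Avro-encoded values will still show up as mostly raw bytes after the cast.
readable = data_raw.selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
    "topic", "partition", "offset")

query = readable.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()

query.awaitTermination()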