将 Spark 结构化流输出写入 Kafka 主题
Writing Spark Structured Streaming Output to a Kafka Topic
我有一个简单的结构化流应用程序,它只从一个 Kafka 主题读取数据并写入另一个主题。
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("test");
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "start")
.load();
StreamingQuery query = dataset
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "checkpoint")
.option("topic", "end")
.start();
query.awaitTermination(20000);
有两条关于主题 start
的消息需要处理。此代码无一例外地运行,但是没有消息以主题 end
结束。这个例子有什么问题?
问题是消息已经在流中并且起始偏移量未设置为 "earliest"。
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", start.getTopicName())
.option("startingOffsets", "earliest")
.load();
我有一个简单的结构化流应用程序,它只从一个 Kafka 主题读取数据并写入另一个主题。
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("test");
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "start")
.load();
StreamingQuery query = dataset
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "checkpoint")
.option("topic", "end")
.start();
query.awaitTermination(20000);
有两条关于主题 start
的消息需要处理。此代码无一例外地运行,但是没有消息以主题 end
结束。这个例子有什么问题?
问题是消息已经在流中并且起始偏移量未设置为 "earliest"。
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", start.getTopicName())
.option("startingOffsets", "earliest")
.load();