Spark Structured Streaming foreach Sink 自定义编写器无法从 Kafka 主题读取数据
Spark Structured Streaming foreach Sink custom writer is not able to read data from Kafka topic
我有 spark 结构化流式作业,可以从 kafka 主题中读取它。但是,在订阅主题时,作业不是使用 foreach 编写器将数据写入控制台或将其转储到数据库。
我有 class DBWriter extends ForeachWriter<Row>
仍然从未调用此 class 的 open, process, close
方法。
如果您需要更多信息,请告诉我。
已按照 Spark Kafka integration guide 的说明进行操作。仍然无法正常工作。
Spark 版本 2.3.1
卡夫卡 0.10.0
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
我的代码是:
spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "TOPIC1")
.option("startingOffsets", "latest") // read data from the end of the stream
.load()
和
Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
.cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
.cast("string"), schema).alias("data"));
selectDf.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.foreach(new DBWriterSink())
.option("checkpointLocation","/tmp/chp_path/")
输入数据的格式如下:
数据采用 json 格式:
{"input_source_data":
{ "key1":"value1",
"key2": "value2"
}
}
实际问题是kafka配置设置不正确。
主题订阅不成功,握手失败。正确更正kafka属性后。
能够读取数据,它正在额外设置这些属性。删除它后,它开始工作。
能够读取消息并查看正在调用的 ForEachWriter。
properties.put("security.protocol", "SSL");
我有 spark 结构化流式作业,可以从 kafka 主题中读取它。但是,在订阅主题时,作业不是使用 foreach 编写器将数据写入控制台或将其转储到数据库。
我有 class DBWriter extends ForeachWriter<Row>
仍然从未调用此 class 的 open, process, close
方法。
如果您需要更多信息,请告诉我。
已按照 Spark Kafka integration guide 的说明进行操作。仍然无法正常工作。
Spark 版本 2.3.1 卡夫卡 0.10.0
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
我的代码是:
spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "TOPIC1")
.option("startingOffsets", "latest") // read data from the end of the stream
.load()
和
Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
.cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
.cast("string"), schema).alias("data"));
selectDf.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.foreach(new DBWriterSink())
.option("checkpointLocation","/tmp/chp_path/")
输入数据的格式如下:
数据采用 json 格式:
{"input_source_data":
{ "key1":"value1",
"key2": "value2"
}
}
实际问题是kafka配置设置不正确。 主题订阅不成功,握手失败。正确更正kafka属性后。 能够读取数据,它正在额外设置这些属性。删除它后,它开始工作。 能够读取消息并查看正在调用的 ForEachWriter。
properties.put("security.protocol", "SSL");