Spark structured streaming read from kafka json 编码问题
Spark structured streaming read from kafka json encoding issue
我很难使用 Spark Structured Streaming 读取 kafka 主题中的 JSON 数据。
上下文:
我正在构建一个简单的管道,我使用 kafka 从 MongoDb(这个数据库经常从另一个应用程序填充)读取数据,然后我想在 Spark 中获取这些数据。
为此,我正在使用 Spark Structured Streaming,它似乎有效。
这是我的代码:
import org.apache.spark.rdd
import org.apache.spark.sql.avro._
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.schema_of_json
object KafkaToParquetLbcAutomation extends App {
val spark = SparkSession
.builder
.appName("Kafka-Parquet-Writer")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaRawDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",BROKER IP)
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val testJsonDf = kafkaRawDf.selectExpr("CAST(value AS STRING)")
//affichage des data
val query = testJsonDf
.writeStream
.outputMode("append")
.format("console")
.queryName("test")
.start()
.awaitTermination()
}
看完那些JSON的数据我想做一些改造
这里开始问题,由于我无法解码的奇怪编码,我无法解析 JSON 数据。
因此我无法继续我的管道。
我应该如何获取我的数据:
{
"field 1" : "value 1 ",
}
(还有很多其他字段)
我实际上是如何获取数据的:
VoituresXhttps://URL.fr/voitures/87478648654.htm�https://img5.url.fr/ad-image/49b7c279087d0cce09123a66557b71d09c01a6d2.jpg�https://img7.url.fr/ad-image/eab7e65419c17542840204fa529b02e64771adbb.jpg�https://img7.urln.fr/ad-image/701b547690e48f11a6e0a1a9e72811cc76fe803e.jpg
问题可能出在分隔符或类似的地方。
你能帮帮我吗
谢谢
问题解决,
kafka 连接器代码中的错误配置。
我只需要将此字段添加到连接器即可:
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
与 Spark 无关
我很难使用 Spark Structured Streaming 读取 kafka 主题中的 JSON 数据。
上下文:
我正在构建一个简单的管道,我使用 kafka 从 MongoDb(这个数据库经常从另一个应用程序填充)读取数据,然后我想在 Spark 中获取这些数据。
为此,我正在使用 Spark Structured Streaming,它似乎有效。
这是我的代码:
import org.apache.spark.rdd
import org.apache.spark.sql.avro._
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.schema_of_json
object KafkaToParquetLbcAutomation extends App {
val spark = SparkSession
.builder
.appName("Kafka-Parquet-Writer")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaRawDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",BROKER IP)
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val testJsonDf = kafkaRawDf.selectExpr("CAST(value AS STRING)")
//affichage des data
val query = testJsonDf
.writeStream
.outputMode("append")
.format("console")
.queryName("test")
.start()
.awaitTermination()
}
看完那些JSON的数据我想做一些改造
这里开始问题,由于我无法解码的奇怪编码,我无法解析 JSON 数据。
因此我无法继续我的管道。
我应该如何获取我的数据:
{
"field 1" : "value 1 ",
}
(还有很多其他字段)
我实际上是如何获取数据的:
VoituresXhttps://URL.fr/voitures/87478648654.htm�https://img5.url.fr/ad-image/49b7c279087d0cce09123a66557b71d09c01a6d2.jpg�https://img7.url.fr/ad-image/eab7e65419c17542840204fa529b02e64771adbb.jpg�https://img7.urln.fr/ad-image/701b547690e48f11a6e0a1a9e72811cc76fe803e.jpg
问题可能出在分隔符或类似的地方。
你能帮帮我吗
谢谢
问题解决,
kafka 连接器代码中的错误配置。
我只需要将此字段添加到连接器即可:
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
与 Spark 无关