从 Kafka 消费数据时,如何跳过 multi-line 中的 header 记录?
How to skip header in multi-line records when consuming data from Kafka?
我想知道如何在 spark 从 kafka 消费数据时跳过 headers。
我的 kafka 主题在特定时间包含如下消息:
name,age,year
ton,33,2018
fon,34,2019
每当我使用来自 kafka 的数据时,我想跳过 header 部分。
我的 spark 结构化流媒体消费者如下;
val kafkaDatademostr = spark.readStream.format("kafka").option("kafka.bootstrap.servers","fffff.dl.ggg.com:8023").option("subscribe","dfo").option("kafka.security.protocol","SASL_PLAINTEXT").load
val interval=kafkaDatademostr.select(col("value").cast("string"),col("timestamp")).alias("csv").select("csv.*")
有人可以帮助我如何在使用来自 kafka 的数据时跳过 header 部分吗?我是 Spark 结构化流媒体的新手
value
列是 Kafka 记录的值,在您的情况下,它看起来只是一个 multi-line 字符串(带有换行符和第一行,您称之为 header). Spark 不知道,所以你必须在从 Kafka 中提取记录后自己解析记录。
顺便说一句,它类似于 Spark SQL 中的 Kafka 数据源。将 readStream
替换为 read
并查看您自己。
我想知道如何在 spark 从 kafka 消费数据时跳过 headers。
我的 kafka 主题在特定时间包含如下消息:
name,age,year
ton,33,2018
fon,34,2019
每当我使用来自 kafka 的数据时,我想跳过 header 部分。
我的 spark 结构化流媒体消费者如下;
val kafkaDatademostr = spark.readStream.format("kafka").option("kafka.bootstrap.servers","fffff.dl.ggg.com:8023").option("subscribe","dfo").option("kafka.security.protocol","SASL_PLAINTEXT").load
val interval=kafkaDatademostr.select(col("value").cast("string"),col("timestamp")).alias("csv").select("csv.*")
有人可以帮助我如何在使用来自 kafka 的数据时跳过 header 部分吗?我是 Spark 结构化流媒体的新手
value
列是 Kafka 记录的值,在您的情况下,它看起来只是一个 multi-line 字符串(带有换行符和第一行,您称之为 header). Spark 不知道,所以你必须在从 Kafka 中提取记录后自己解析记录。
顺便说一句,它类似于 Spark SQL 中的 Kafka 数据源。将 readStream
替换为 read
并查看您自己。