如何识别以kafka为源的spark结构化流中消息的来源?
How to identify the origin of messages in spark structured streaming with kafka as a source?
我有一个用例,我必须在 spark structured streaming 中订阅 kafka 中的多个主题。然后我必须解析每条消息并从中形成一个三角湖 table。我已经使解析器和消息(以 xml 的形式)正确解析并形成 delta-lake table。但是,到目前为止,我只订阅了一个主题。我想订阅多个主题,并且基于主题,它应该转到专门为该特定主题制作的解析器。所以基本上我想在所有消息处理时识别主题名称,以便我可以将它们发送到所需的解析器并进一步处理。
这就是我访问来自不同主题的消息的方式。但是,我不知道如何在处理传入消息时识别传入消息的来源。
val stream_dataframe = spark.readStream
.format(ConfigSetting.getString("source"))
.option("kafka.bootstrap.servers", ConfigSetting.getString("bootstrap_servers"))
.option("kafka.ssl.truststore.location", ConfigSetting.getString("trustfile_location"))
.option("kafka.ssl.truststore.password", ConfigSetting.getString("truststore_password"))
.option("kafka.sasl.mechanism", ConfigSetting.getString("sasl_mechanism"))
.option("kafka.security.protocol", ConfigSetting.getString("kafka_security_protocol"))
.option("kafka.sasl.jaas.config",ConfigSetting.getString("jass_config"))
.option("encoding",ConfigSetting.getString("encoding"))
.option("startingOffsets",ConfigSetting.getString("starting_offset_duration"))
.option("subscribe",ConfigSetting.getString("topics_name"))
.option("failOnDataLoss",ConfigSetting.getString("fail_on_dataloss"))
.load()
var cast_dataframe = stream_dataframe.select(col("value").cast(StringType))
cast_dataframe = cast_dataframe.withColumn("parsed_column",parser(col("value"))) // Parser is the udf, made to parse the xml from the topic.
如何在 spark 结构化流处理中识别消息的主题名称?
根据 official documentation(强调我的)
Each row in the source has the following schema:
Column Type
key binary
value binary
topic string
partition int
...
如您所见,输入主题是输出模式的一部分,无需任何特殊操作即可访问。
我有一个用例,我必须在 spark structured streaming 中订阅 kafka 中的多个主题。然后我必须解析每条消息并从中形成一个三角湖 table。我已经使解析器和消息(以 xml 的形式)正确解析并形成 delta-lake table。但是,到目前为止,我只订阅了一个主题。我想订阅多个主题,并且基于主题,它应该转到专门为该特定主题制作的解析器。所以基本上我想在所有消息处理时识别主题名称,以便我可以将它们发送到所需的解析器并进一步处理。
这就是我访问来自不同主题的消息的方式。但是,我不知道如何在处理传入消息时识别传入消息的来源。
val stream_dataframe = spark.readStream
.format(ConfigSetting.getString("source"))
.option("kafka.bootstrap.servers", ConfigSetting.getString("bootstrap_servers"))
.option("kafka.ssl.truststore.location", ConfigSetting.getString("trustfile_location"))
.option("kafka.ssl.truststore.password", ConfigSetting.getString("truststore_password"))
.option("kafka.sasl.mechanism", ConfigSetting.getString("sasl_mechanism"))
.option("kafka.security.protocol", ConfigSetting.getString("kafka_security_protocol"))
.option("kafka.sasl.jaas.config",ConfigSetting.getString("jass_config"))
.option("encoding",ConfigSetting.getString("encoding"))
.option("startingOffsets",ConfigSetting.getString("starting_offset_duration"))
.option("subscribe",ConfigSetting.getString("topics_name"))
.option("failOnDataLoss",ConfigSetting.getString("fail_on_dataloss"))
.load()
var cast_dataframe = stream_dataframe.select(col("value").cast(StringType))
cast_dataframe = cast_dataframe.withColumn("parsed_column",parser(col("value"))) // Parser is the udf, made to parse the xml from the topic.
如何在 spark 结构化流处理中识别消息的主题名称?
根据 official documentation(强调我的)
Each row in the source has the following schema:
Column Type
key binary
value binary
topic string
partition int...
如您所见,输入主题是输出模式的一部分,无需任何特殊操作即可访问。