How to extract information from a nested XML string in Spark Structured Streaming
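I have a Spark Structured Streaming application connected to ActiveMQ. The application receives messages from a topic. These messages arrive as XML strings. I want to extract information from this nested XML. How can I do that?
I looked at a related post, but could not implement something similar in Scala.
XML format: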
<CofiResults>
<ExecutionTime>20201103153839</ExecutionTime>
<FilterClass>S </FilterClass>
<InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
<HeaderSegment xmlns="https://somelink.com">
<Version>6</Version>
<SequenceNb>1</SequenceNb>
</HeaderSegment>
.
.
.
My code:
val df = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("brokerUrl", brokerUrl_)
  .option("topic", topicName_)
  .option("persistence", "memory")
  .option("cleanSession", "true")
  .option("username", username_)
  .option("password", password_)
  .load()
val payload_ = df.select('payload cast "string") // This payload IS the XMLString
Now I need to extract ExecutionTime, Version, and other fields from the XML above.
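You can use Spark's built-in SQL functions, such as xpath, to extract data from a nested XML structure.
Given a nested XML document like the following (tag attributes omitted for simplicity):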
<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <ns2>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </ns2>
  </InputData>
</CofiResults>
you can use those SQL functions in a selectExpr statement (no createOrReplaceTempView required), as shown below:
// df is the streaming DataFrame from the question
val extracted = df
  .selectExpr("CAST(payload AS STRING) as payload")
  .selectExpr(
    "xpath(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') as VersionAsInt")
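Keep in mind that the xpath function returns an array of strings, whereas you may find it more convenient to extract the value as a String or even a Long with xpath_string or xpath_long. Applying the code above in Spark 3.0.1 with a console sink stream yields: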
+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+
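The simplified XML above drops the namespaces that appear in the question's message (the ns2:FrdReq element and the default namespace on HeaderSegment). With a namespace-aware parser, an unprefixed XPath name step only matches elements in no namespace, so a path that works on the simplified document may return nothing on the real one. One common workaround is to match on local-name(). The sketch below shows this with the plain JDK XPath API rather than Spark, so it can be tried in isolation. The object name NamespacedXPathSketch, the helper evalString, and the closing tags added to the truncated sample are assumptions made for illustration only.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.XPathFactory
import org.xml.sax.InputSource

object NamespacedXPathSketch {
  // Sample modeled on the question's message. The closing tags are added here
  // for illustration because the original snippet is truncated.
  val sampleXml: String =
    """<CofiResults>
      |  <ExecutionTime>20201103153839</ExecutionTime>
      |  <FilterClass>S </FilterClass>
      |  <InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
      |    <HeaderSegment xmlns="https://somelink.com">
      |      <Version>6</Version>
      |      <SequenceNb>1</SequenceNb>
      |    </HeaderSegment>
      |  </ns2:FrdReq></InputData>
      |</CofiResults>""".stripMargin

  // Hypothetical helper: parses the XML into a namespace-aware DOM and
  // returns the string value of an XPath 1.0 expression ("" if nothing matches).
  def evalString(xml: String, path: String): String = {
    val factory = DocumentBuilderFactory.newInstance()
    factory.setNamespaceAware(true)
    val doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)))
    XPathFactory.newInstance().newXPath().evaluate(path, doc)
  }

  // Plain name steps: FrdReq, HeaderSegment and Version live in namespaces,
  // so these steps match nothing.
  val plainPath: String =
    "/CofiResults/InputData/FrdReq/HeaderSegment/Version/text()"

  // Same path, but each namespaced step is matched by its local name only.
  val localNamePath: String =
    "/CofiResults/InputData/*[local-name()='FrdReq']" +
      "/*[local-name()='HeaderSegment']/*[local-name()='Version']/text()"

  def main(args: Array[String]): Unit = {
    println("plain path:      '" + evalString(sampleXml, plainPath) + "'")
    println("local-name path: '" + evalString(sampleXml, localNamePath) + "'")
  }
}
```

If Spark's xpath functions parse the payload in namespace-aware mode (worth verifying on your Spark and JDK version), the same local-name() expression could be passed to xpath_int in the selectExpr call in place of the simplified path.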