How to extract information from a nested XML string in Spark Structured Streaming

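I have a Spark Structured Streaming application connected to ActiveMQ. The application receives messages from a topic, and each message arrives as an XML string. I want to extract information from this nested XML. How can I do that?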

I referred to an existing post, but could not implement something similar in Scala.

XML format:

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S </FilterClass>
  <InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
    <HeaderSegment xmlns="https://somelink.com">
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
    </HeaderSegment>
.
.
.

My code:

val df = spark.readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl", brokerUrl_)
            .option("topic", topicName_)
            .option("persistence", "memory")
            .option("cleanSession", "true")
            .option("username", username_)
            .option("password", password_)
            .load()

val payload_ = df.select('payload cast "string") // This payload IS the XMLString

Now I need to extract ExecutionTime, Version, and other fields from the XML above.

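You can use the built-in SQL function xpath and its typed variants (xpath_string, xpath_int, xpath_long, and so on) to extract data from a nested XML structure.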

Given a nested XML like the following (for simplicity, I have omitted all tag attributes)

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <ns2>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </ns2>
  </InputData>
</CofiResults>

you can use those SQL functions in a selectExpr statement (no createOrReplaceTempView needed), as shown here:

  .selectExpr("CAST(payload AS STRING) as payload")
  .selectExpr(
    "xpath(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') as VersionAsInt")
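To try the snippet end to end without a broker, here is a minimal sketch that swaps the MQTT source for an in-memory stream and writes to the console sink. The object name `XPathStreamSketch`, the helper `extractFields`, and the use of `MemoryStream` (a Spark class mostly used in tests) are assumptions for illustration, not part of the original application; the sketch targets Spark 3.0.x.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object XPathStreamSketch {
  // Simplified sample message, same content as the XML shown above.
  val sampleXml: String =
    "<CofiResults><ExecutionTime>20201103153839</ExecutionTime>" +
    "<FilterClass>S</FilterClass><InputData><ns2><HeaderSegment>" +
    "<Version>6</Version><SequenceNb>1</SequenceNb>" +
    "</HeaderSegment></ns2></InputData></CofiResults>"

  // Hypothetical helper: applies the selectExpr calls to any DataFrame
  // that has a `payload` column.
  def extractFields(df: DataFrame): DataFrame =
    df.selectExpr("CAST(payload AS STRING) as payload")
      .selectExpr(
        "xpath(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
        "xpath_long(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
        "xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
        "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') as VersionAsInt")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("xpath-stream-sketch")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // Stand-in for the MQTT source: an in-memory stream of XML strings.
    val input = MemoryStream[String]
    input.addData(sampleXml)

    val query = extractFields(input.toDS().toDF("payload"))
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .start()

    query.processAllAvailable() // wait until the queued message is processed
    query.stop()
    spark.stop()
  }
}
```

In the real application, `extractFields(df)` would be called on the DataFrame returned by `spark.readStream ... load()` from the question instead of the in-memory stream.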

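Keep in mind that the xpath function returns an array of strings, whereas you may find it more convenient to extract the value as a String or even a Long. Applying the code above in Spark 3.0.1 with a console sink results in: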

+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+
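If ExecutionTime is a timestamp in the form yyyyMMddHHmmss (an assumption based only on the sample value 20201103153839), a possible follow-up is to parse it with to_timestamp. The sketch below runs in batch mode on a static DataFrame, which is also a quick way to try xpath expressions before wiring them into the stream. The names `ExecutionTimeSketch`, `withExecutionTimestamp`, and `ExecutionTs` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExecutionTimeSketch {
  // Extracts ExecutionTime as a string and parses it with the ASSUMED
  // pattern yyyyMMddHHmmss; adjust the pattern if the real format differs.
  def withExecutionTimestamp(df: DataFrame): DataFrame =
    df.selectExpr(
      "to_timestamp(" +
        "xpath_string(payload, '/CofiResults/ExecutionTime/text()'), " +
        "'yyyyMMddHHmmss') as ExecutionTs")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("execution-time-sketch")
      .getOrCreate()
    import spark.implicits._

    // Static one-row DataFrame standing in for the streaming payload.
    val xml =
      "<CofiResults><ExecutionTime>20201103153839</ExecutionTime></CofiResults>"
    withExecutionTimestamp(Seq(xml).toDF("payload")).show(false)

    spark.stop()
  }
}
```

The same expression should also work inside the streaming selectExpr, since it only combines built-in SQL functions.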