Parsing Event Hub messages using spark streaming

I'm trying to parse Azure Event Hub messages generated by Azure Blob file events, using Spark Streaming and Scala.

import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


object eventhub {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Event Hub")
      //.config("spark.some.config.option", "some-value")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Event hub configurations
    // Replace values below with yours
    val eventHubName = "xxx"
    val eventHubNSConnStr = "Endpoint=xxxxx"
    val connStr = ConnectionStringBuilder(eventHubNSConnStr).setEventHubName(eventHubName).build

    val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5)
    val incomingStream = spark.readStream.format("eventhubs")
      .options(customEventhubParameters.toMap).load()
    incomingStream.printSchema

    val testSchema = new StructType()
      //.add("offset", StringType)
      //.add("Time", StringType)
      //.add("Timestamp", LongType)
      .add("Body", new ArrayType(new StructType()
        .add("topic", StringType)
        .add("subject", StringType)
        .add("eventType", StringType)
        .add("eventTime", StringType)
        .add("id", StringType)
        .add("data", new StructType()
          .add("api", StringType)
          .add("clientRequestId", StringType)
          .add("requestId", StringType)
          .add("eTag", StringType)
          .add("contentType", StringType)
          .add("contentLength", LongType)
          .add("blobType", StringType)
          .add("url", StringType)
          .add("sequencer", StringType)
          .add("storageDiagnostics", new StructType()
            .add("batchId", StringType)))
        .add("dataVersion", StringType)
        .add("metadataVersion", StringType), false))

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so you cast it to string to see the actual content of the message
    val messages = incomingStream.select($"body".cast(StringType)).alias("body")
      //.select(explode($"body")).alias("newbody")
      .select(from_json($"body", testSchema)).alias("newbody")
      .select("newbody.*")

    /*
    Output of val messages = incomingStream.select($"body".cast(StringType)).alias("body")

    |body|
    |[{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}]|
    */

    messages.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}
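A side note on the code above: calling .alias(...) on the result of select(...) is Dataset.alias, which names the whole Dataset rather than the column, so the from_json column keeps its generated name. A minimal local sketch (no Event Hub needed) contrasting the two placements of alias; ColumnAliasSketch, the trimmed sample event and the reduced schema are hypothetical, chosen only to keep the example short.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object ColumnAliasSketch {
  // Hypothetical, trimmed-down event object (a subset of the fields in the question's output)
  val sampleEvent: String = """{"topic":"A1","id":"1","data":{"blobType":"BlockBlob"}}"""

  // Reduced schema covering only the fields present in sampleEvent
  val eventSchema: StructType = new StructType()
    .add("topic", StringType)
    .add("id", StringType)
    .add("data", new StructType().add("blobType", StringType))

  // Dataset.alias: names the Dataset; the parsed column keeps its auto-generated name
  def datasetAliased(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(sampleEvent).toDF("body")
      .select(from_json($"body", eventSchema))
      .alias("newbody")
  }

  // Column.alias inside select: the struct column itself is named "newbody",
  // so "newbody.*" expands to the struct's fields
  def columnAliased(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(sampleEvent).toDF("body")
      .select(from_json($"body", eventSchema).alias("newbody"))
      .select("newbody.*")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("alias sketch").master("local[*]").getOrCreate()
    println(datasetAliased(spark).columns.mkString(", "))
    println(columnAliased(spark).columns.mkString(", "))
    spark.stop()
  }
}
```

datasetAliased returns a single column whose name is not "newbody", while columnAliased exposes topic, id and data as top-level columns.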

Here is the schema of the raw incoming stream, followed by the schema of "body" after the cast:
root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

root
 |-- body: string (nullable = true)

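Looking at the output of "body", it looks like an array that needs to be exploded, but after the cast the data type of "body" is string, so Spark complains when I use the "explode" function.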
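That complaint is expected: after the cast, body is a plain string, and explode only accepts array or map columns. A minimal local sketch of the difference, assuming a hypothetical two-event body trimmed to the id field (ExplodeNeedsArraySketch and its method names are illustrative only): explode on the raw string is rejected when the plan is analyzed, while explode on the result of from_json with an array schema yields one row per event.

```scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.util.Try

object ExplodeNeedsArraySketch {
  // Hypothetical body holding two events, trimmed to the "id" field
  val sampleBody: String = """[{"id":"1"},{"id":"2"}]"""

  // The body is a JSON array of objects, so its schema is array<struct<...>>
  val bodySchema: ArrayType = ArrayType(new StructType().add("id", StringType))

  // Same shape as the question's stream after $"body".cast(StringType)
  def bodyFrame(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(sampleBody).toDF("body")
  }

  // explode on a string column: Spark rejects the plan during analysis
  def explodeOnString(spark: SparkSession): Try[DataFrame] = {
    import spark.implicits._
    Try(bodyFrame(spark).select(explode($"body")))
  }

  // Parsing first turns the string into an array, which explode accepts
  def explodeAfterParse(spark: SparkSession): DataFrame = {
    import spark.implicits._
    bodyFrame(spark)
      .select(explode(from_json($"body", bodySchema)).alias("event"))
      .select($"event.id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explode sketch").master("local[*]").getOrCreate()
    println(bodyFrame(spark).schema("body").dataType)
    println(explodeOnString(spark).failed.toOption.exists(_.isInstanceOf[AnalysisException]))
    explodeAfterParse(spark).show()
    spark.stop()
  }
}
```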

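When I pass the schema, the body is not parsed correctly, because at this point it is a string. I'm not sure what exactly the structure should be, or how to get the JSON structure parsed. Currently I get NULL output, because the JSON parsing evidently fails. Any input is appreciated. Thanks for your help.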
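When the exact structure is unclear, one option is to let Spark infer it from a captured payload. A minimal sketch, assuming the body shown in the console output in the code comment above is representative (InferSchemaSketch is a hypothetical name): spark.read.json treats each element of a top-level JSON array as its own row, so the inferred schema describes a single event, and wrapping it in ArrayType gives a schema for the whole body.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object InferSchemaSketch {
  // Body captured from the question's console output
  val sampleBody: String =
    """[{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}]"""

  // Each element of the top-level array becomes one row,
  // so the inferred schema describes ONE event object
  def eventSchema(spark: SparkSession): StructType = {
    import spark.implicits._
    spark.read.json(Seq(sampleBody).toDS()).schema
  }

  // from_json on the whole body needs the array wrapper around that struct
  def bodySchema(spark: SparkSession): DataType = ArrayType(eventSchema(spark))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("infer sketch").master("local[*]").getOrCreate()
    println(eventSchema(spark).treeString)
    println(bodySchema(spark).simpleString)
    spark.stop()
  }
}
```

The inferred schema has no Body field. Since from_json needs an explicit schema, printing treeString gives something to transcribe into a hand-written StructType for the streaming job.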

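Based on the body output printed above, there is no element named Body in the payload: the body is the event object itself (wrapped in [ ]), so the Body field in your schema never matches, which is why you get null. Use the modified schema definition below, which describes the event object directly; it should help.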

val testSchema = new StructType()
      .add("topic", StringType)
      .add("subject", StringType)
      .add("eventType", StringType)
      .add("eventTime", StringType)
      .add("id", StringType)
      .add("data", new StructType()
        .add("api", StringType)
        .add("clientRequestId", StringType)
        .add("requestId", StringType)
        .add("eTag", StringType)
        .add("contentType", StringType)
        .add("contentLength", LongType)
        .add("blobType", StringType)
        .add("url", StringType)
        .add("sequencer", StringType)
        .add("storageDiagnostics", new StructType()
          .add("batchId", StringType)))
      .add("dataVersion", StringType)
      .add("metadataVersion", StringType)
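A quick local check of this schema, as a sketch: it feeds the event object from the question's output, with the surrounding [ ] removed, through from_json and reads a few nested fields. The brackets are dropped on purpose, because whether a struct schema also accepts a one-element JSON array as-is may depend on the Spark version. StructSchemaSketch is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object StructSchemaSketch {
  // Schema from the answer: one event object, no wrapping "Body" field
  val testSchema: StructType = new StructType()
    .add("topic", StringType)
    .add("subject", StringType)
    .add("eventType", StringType)
    .add("eventTime", StringType)
    .add("id", StringType)
    .add("data", new StructType()
      .add("api", StringType)
      .add("clientRequestId", StringType)
      .add("requestId", StringType)
      .add("eTag", StringType)
      .add("contentType", StringType)
      .add("contentLength", LongType)
      .add("blobType", StringType)
      .add("url", StringType)
      .add("sequencer", StringType)
      .add("storageDiagnostics", new StructType()
        .add("batchId", StringType)))
    .add("dataVersion", StringType)
    .add("metadataVersion", StringType)

  // The event object from the question's output, WITHOUT the surrounding [ ]
  val oneEvent: String =
    """{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}"""

  // Parse into a struct column named "event", then pull out nested fields
  def parsed(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(oneEvent).toDF("body")
      .select(from_json($"body", testSchema).alias("event"))
      .select($"event.id", $"event.data.blobType", $"event.data.url", $"event.data.contentLength")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("struct sketch").master("local[*]").getOrCreate()
    parsed(spark).show(truncate = false)
    spark.stop()
  }
}
```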

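If your input payload contains more than one object in the array, from_json with the schema above returns null. If you expect multiple objects in the array, the schema below, which wraps the event struct in an ArrayType, should help.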

 val testSchema = new ArrayType(new StructType()
  .add("topic", StringType)
  .add("subject", StringType)
  .add("eventType", StringType)
  .add("eventTime", StringType)
  .add("id", StringType)
  .add("data", new StructType()
    .add("api", StringType)
    .add("clientRequestId", StringType)
    .add("requestId", StringType)
    .add("eTag", StringType)
    .add("contentType", StringType)
    .add("contentLength", LongType)
    .add("blobType", StringType)
    .add("url", StringType)
    .add("sequencer", StringType)
    .add("storageDiagnostics", new StructType()
      .add("batchId", StringType)))
  .add("dataVersion", StringType)
  .add("metadataVersion", StringType),false)
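With the array schema, from_json yields an array<struct<...>> column, which is what explode needs. A minimal sketch, assuming a hypothetical body with two events (ids 1 and 2, most fields omitted, so the omitted fields come back as null), that parses the body, explodes it to one row per event, and flattens the struct into columns. ArraySchemaSketch is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object ArraySchemaSketch {
  // Array schema from the answer: a JSON array of event objects
  val testSchema: ArrayType = new ArrayType(new StructType()
    .add("topic", StringType)
    .add("subject", StringType)
    .add("eventType", StringType)
    .add("eventTime", StringType)
    .add("id", StringType)
    .add("data", new StructType()
      .add("api", StringType)
      .add("clientRequestId", StringType)
      .add("requestId", StringType)
      .add("eTag", StringType)
      .add("contentType", StringType)
      .add("contentLength", LongType)
      .add("blobType", StringType)
      .add("url", StringType)
      .add("sequencer", StringType)
      .add("storageDiagnostics", new StructType()
        .add("batchId", StringType)))
    .add("dataVersion", StringType)
    .add("metadataVersion", StringType), false)

  // Hypothetical body with two events; only a few fields are filled in
  val twoEvents: String =
    """[{"id":"1","eventType":"A3","data":{"api":"PutBlob","blobType":"BlockBlob"}},""" +
      """{"id":"2","eventType":"A3","data":{"api":"PutBlob","blobType":"BlockBlob"}}]"""

  def flattened(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(twoEvents).toDF("body")
      .select(from_json($"body", testSchema).alias("events")) // array<struct<...>>
      .select(explode($"events").alias("event"))              // one row per array element
      .select("event.*")                                      // struct fields become columns
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("array sketch").master("local[*]").getOrCreate()
    flattened(spark).select("id", "eventType", "data.blobType").show()
    spark.stop()
  }
}
```

In the streaming job, the same three selects should be able to follow incomingStream.select($"body".cast(StringType).alias("body")) in place of the batch Seq(...).toDF("body") source; this is untested against a live Event Hub.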