从 Spark Streaming 中的 ConsumerRecord 值创建 RDD

Creating an RDD from ConsumerRecord Value in Spark Streaming

我正在尝试创建基于 ConsumerRecord 值的 XmlRelation。

 val value = record.value();

    logger.info(".processRecord() : Value ={}" , value)
    if(value !=null) {

      val rdd = spark.sparkContext.parallelize(List(new String(value)))

当我尝试根据我收到 NullPointerException 的值创建 RDD 时。

org.apache.spark.SparkException: Job aborted due to stage failure:

这是因为我无法创建 RDD,因为我无法在工作节点上获取 sparkContext。显然我无法将此信息发送回 Driver,因为这是一个无限流。

我有什么选择。

另一种方法是将此记录数据与 Header 信息一起写入另一个主题,然后将其写回到另一个主题,并让另一个流式处理作业处理该信息。

我得到的 ConsumerRecord 值是字符串 (XML),我想使用现有模式将其解析为 RDD 并进一步处理。

谢谢 萨提什

我可以使用下面的代码并让它工作

val xmlStringDF:DataFrame = batchDF.selectExpr("value").filter($"value".isNotNull)

          logger.info(".convert() : xmlStringDF Schema ={}",xmlStringDF.schema.treeString)

          val rdd: RDD[String] = xmlStringDF.as[String].rdd

          logger.info(".convert() : Before converting String DataFrame into XML DataFrame")

          val relation = XmlRelation(
            () => rdd,
            None,
            parameters.toMap,
            xmlSchema)(spark.sqlContext)
          val xmlDF = spark.baseRelationToDataFrame(relation)