从 Spark Streaming 中的 ConsumerRecord 值创建 RDD
Creating an RDD from ConsumerRecord Value in Spark Streaming
我正在尝试创建基于 ConsumerRecord 值的 XmlRelation。
val value = record.value();
logger.info(".processRecord() : Value ={}" , value)
if(value !=null) {
val rdd = spark.sparkContext.parallelize(List(new String(value)))
当我尝试根据我收到 NullPointerException 的值创建 RDD 时。
org.apache.spark.SparkException: Job aborted due to stage failure:
这是因为我无法创建 RDD,因为我无法在工作节点上获取 sparkContext。显然我无法将此信息发送回 Driver,因为这是一个无限流。
我有什么选择。
另一种方法是将此记录数据与 Header 信息一起写入另一个主题,然后将其写回到另一个主题,并让另一个流式处理作业处理该信息。
我得到的 ConsumerRecord 值是字符串 (XML),我想使用现有模式将其解析为 RDD 并进一步处理。
谢谢
萨提什
我可以使用下面的代码并让它工作
val xmlStringDF:DataFrame = batchDF.selectExpr("value").filter($"value".isNotNull)
logger.info(".convert() : xmlStringDF Schema ={}",xmlStringDF.schema.treeString)
val rdd: RDD[String] = xmlStringDF.as[String].rdd
logger.info(".convert() : Before converting String DataFrame into XML DataFrame")
val relation = XmlRelation(
() => rdd,
None,
parameters.toMap,
xmlSchema)(spark.sqlContext)
val xmlDF = spark.baseRelationToDataFrame(relation)
我正在尝试创建基于 ConsumerRecord 值的 XmlRelation。
val value = record.value();
logger.info(".processRecord() : Value ={}" , value)
if(value !=null) {
val rdd = spark.sparkContext.parallelize(List(new String(value)))
当我尝试根据我收到 NullPointerException 的值创建 RDD 时。
org.apache.spark.SparkException: Job aborted due to stage failure:
这是因为我无法创建 RDD,因为我无法在工作节点上获取 sparkContext。显然我无法将此信息发送回 Driver,因为这是一个无限流。
我有什么选择。
另一种方法是将此记录数据与 Header 信息一起写入另一个主题,然后将其写回到另一个主题,并让另一个流式处理作业处理该信息。
我得到的 ConsumerRecord 值是字符串 (XML),我想使用现有模式将其解析为 RDD 并进一步处理。
谢谢 萨提什
我可以使用下面的代码并让它工作
val xmlStringDF:DataFrame = batchDF.selectExpr("value").filter($"value".isNotNull)
logger.info(".convert() : xmlStringDF Schema ={}",xmlStringDF.schema.treeString)
val rdd: RDD[String] = xmlStringDF.as[String].rdd
logger.info(".convert() : Before converting String DataFrame into XML DataFrame")
val relation = XmlRelation(
() => rdd,
None,
parameters.toMap,
xmlSchema)(spark.sqlContext)
val xmlDF = spark.baseRelationToDataFrame(relation)