How to use foreachRDD in legacy Spark Streaming

I am getting an exception when processing CSV data with foreachRDD. Here is my code:

  case class Person(name: String, age: Long)
  val conf = new SparkConf()
  conf.setMaster("local[*]")
  conf.setAppName("CassandraExample").set("spark.driver.allowMultipleContexts", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  val smDstream = ssc.textFileStream("file:///home/sa/testFiles")

  smDstream.foreachRDD((rdd, time) => {
    val peopleDF = rdd.map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql(
      "insert into table devDB.stam SELECT name, age FROM people WHERE age BETWEEN 13 AND 29")
    //teenagersDF.show
  })
  ssc.checkpoint("hdfs://go/hive/warehouse/devDB.db")
  ssc.start()

I am getting the following error:

  java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
  org.apache.spark.streaming.StreamingContext
  Serialization stack:
      - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@1263422a)
      - field (class: $iw, name: ssc, type: class org.apache.spark.streaming.StreamingContext)

Please help.

This question no longer really makes sense, as DStreams are being deprecated / abandoned.

There are a few things to consider in the code, so it is hard to pinpoint the exact issue. That said, I had to think about it a bit myself, as I am no serialization expert.

You can find posts on trying to write directly to a Hive table rather than to a path; in my answer I use one approach, but you could also use your Spark SQL approach writing to a TempView - it is all possible.

I simulated input from a QueueStream, so I do not need to apply a split. You can adapt this to your own situation if you follow the same "global" approach. I chose to write to a parquet file that gets created when needed. You can create your tempView and then use spark.sql as per your initial approach.
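
If you prefer to keep that tempView + spark.sql flow, here is a minimal sketch of what it could look like (untested against a real Hive setup; the table devDB.stam and the input path are taken from your question, and checkpointing is left out, which is what triggered the serialization check in the first place):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// as discussed below, keep the case class in a separate class / cell from the streaming logic
case class Person(name: String, age: Long)

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val lines = ssc.textFileStream("file:///home/sa/testFiles")

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // parse the CSV lines into Person rows
    val peopleDF = rdd
      .map(_.split(","))
      .map(a => Person(a(0), a(1).trim.toLong))
      .toDF()
    peopleDF.createOrReplaceTempView("people")
    // insert the selected rows into the Hive table (assumed to exist already)
    spark.sql("INSERT INTO TABLE devDB.stam SELECT name, age FROM people WHERE age BETWEEN 13 AND 29")
  }
}

ssc.start()
ssc.awaitTermination()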

The Output Operations on DStreams are:

  • print()
  • saveAsTextFiles(prefix, [suffix])
  • saveAsObjectFiles(prefix, [suffix])
  • saveAsHadoopFiles(prefix, [suffix])
  • foreachRDD(func)

foreachRDD

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

It states saving to files, but foreachRDD can do what you want, although I assume the idea was mainly to push to external systems. In my view saving to files is quicker than going through the steps of writing to a table directly. With Streaming you want to offload data as soon as possible, since volumes are typically high.
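
The pattern the quoted docs allude to for pushing to an external system is foreachRDD combined with foreachPartition, so that expensive resources are created once per partition rather than once per record. A minimal sketch, with a local file standing in for a real external sink (the output path and the DStream parameter are just placeholders):

import org.apache.spark.streaming.dstream.DStream
import java.io.{File, PrintWriter}

def pushToExternalSystem(lines: DStream[String]): Unit = {
  lines.foreachRDD { rdd =>
    // this closure runs on the driver for every batch...
    rdd.foreachPartition { records =>
      // ...while this one runs on the executors, once per partition,
      // so a connection / writer is opened per partition, not per record
      val writer = new PrintWriter(new File(s"/tmp/out-${java.util.UUID.randomUUID()}.txt"))
      try records.foreach(r => writer.println(r))
      finally writer.close()
    }
  }
}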

Two steps:

In a class separate from the Streaming class - run under Spark 2.4:

case class Person(name: String, age: Int)

Then the Streaming logic you need to apply - you may need some extra imports that I already had in my notebook, since I ran this under Databricks:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import org.apache.spark.sql.SaveMode

val spark = SparkSession
           .builder
           .master("local[4]")
           .config("spark.driver.cores", 2)
           .appName("forEachRDD")
           .getOrCreate()

val sc = spark.sparkContext
import spark.implicits._   // needed for .toDF() when running outside a notebook
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue) 

QS.foreachRDD(q => {
   if (!q.isEmpty) {
      // each RDD element is a List[(String, Int)], so flatten it into individual (name, age) tuples
      val q_flatMap = q.flatMap(x => x)
      val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
      val df = q_withPerson.toDF()

      // append to a managed table backed by parquet files, created on demand
      df.write
        .format("parquet")
        .mode(SaveMode.Append)
        .saveAsTable("SO_Quest_BigD")
   }
 }
)

ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
   rddQueue += ssc.sparkContext.parallelize(List(c))
} 
ssc.awaitTermination()
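
To check what was written, you can read the table back, e.g. in another notebook cell or after stopping the streaming context:

// read back the table written by foreachRDD and inspect it
val written = spark.table("SO_Quest_BigD")
written.show(false)
println(s"rows written: ${written.count}")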