Inserting multiple RDDs / DataFrames into a global view

I am using Spark Streaming to receive call records from a Kafka broker every 10 minutes. I want to insert these records into some temp table (a global one?) and keep inserting as soon as new messages arrive from Kafka.

Note that I do not want to store the records in Hive. After each insert I want to check whether the calls for a particular number exceed 20 (for example). Below is the code I wrote; it converts each RDD to a DataFrame and then creates a temporary view. However, I suspect that view will only ever contain the last RDD. How can I keep inserting records into the same view and run SQL against it afterwards?

val topics = Array("AIRDRMAIN", "")
val messages = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
val Lines = messages.map(line => line.value())
val AirDRStream = Lines.map(AirDRFilter.parseAirDR)
AirDRStream.foreachRDD(foreachFunc = rdd => {
  System.out.println("--- New RDD with " + rdd.count() + " records");
  if (rdd.count() == 0) {
    println("---WANG No logs received in this time interval=================")
  } else {
    val sqlContext = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .getOrCreate()
    import sqlContext.implicits._
    rdd.toDF().createOrReplaceTempView("AIR")
    val FilteredDR = sqlContext.sql("select refillProfileID, count(*) from AIR group by refillProfileID")
    FilteredDR.show()
  }
})
streamingContext.start()
streamingContext.awaitTermination()
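
For reference, the kind of check I want to run after each insert would be roughly the following (subscriberNumber comes from my schema shown further down, the 20 is just an example threshold, and heavyCallers is only an illustrative name; as written against the per-batch AIR view it would of course only see the latest batch, which is exactly the problem):

// Illustrative sketch of the intended check, run after every insert
val heavyCallers = sqlContext.sql(
  "SELECT subscriberNumber, COUNT(*) AS calls FROM AIR " +
  "GROUP BY subscriberNumber HAVING COUNT(*) > 20")
heavyCallers.show()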

Below is the updated code after adding the globalTempView logic.

val schema_string = "subscriberNumber, originNodeType, originHostName, originOperatorID, originTimeStamp, currentServiceClass, voucherBasedRefill, transactionAmount, refillProfileID, voucherGroupID, externalData1, externalData2"
val schema_rdd = StructType(schema_string.split(",")
                 .map(fieldName => StructField(fieldName, StringType, true)))
val init_df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema_rdd)

println("initial count of initial RDD is " + init_df.count())

init_df.createGlobalTempView("AIRGLOBAL")

AirDRStream.foreachRDD(foreachFunc = rdd => {
  System.out.println("--- New RDD with " + rdd.count() + " records");
  if (rdd.count() == 0) {
    println("--- No logs received in this time interval=================")
  } else {
    init_df.union(rdd.toDF())
    println("after union count of initial  RDD is " + init_df.count())
    rdd.toDF().createOrReplaceTempView("AIR")
    val FilteredDR = sqlContext.sql("select  count(*) from AIR ")
    val globalviewinsert = sqlContext.sql("Insert into global_temp.AIRGLOBAL select * from AIR ")
    val globalview = sqlContext.sql("SELECT COUNT(*) FROM  global_temp.AIRGLOBAL ")
    FilteredDR.show()
    globalviewinsert.show()
    globalview.show()
  }
})
streamingContext.start()
streamingContext.awaitTermination()

You can create a global temporary view. Quoting the documentation:

Temporary views in Spark SQL are session-scoped and will disappear if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer it, e.g. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
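
Applied to your streaming job, a minimal sketch of the idea could look like the following. It assumes AirDRStream and the Kafka setup from your question are already in scope; accumulated and heavyCallers are illustrative names only. Inserting into a view registered from an in-memory DataFrame is typically not allowed, so instead of INSERT INTO, the sketch unions each batch into one DataFrame and re-registers the global view (createOrReplaceGlobalTempView needs Spark 2.2+):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Grows with every micro-batch; starts empty so the first batch defines the schema
var accumulated: Option[DataFrame] = None

AirDRStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    val batchDF = rdd.toDF()
    // union returns a new DataFrame; the old one is never modified in place,
    // which is why init_df.union(...) on its own has no visible effect
    accumulated = Some(accumulated.map(_.union(batchDF)).getOrElse(batchDF))
    // re-register so global_temp.AIRGLOBAL always reflects all batches so far
    accumulated.get.createOrReplaceGlobalTempView("AIRGLOBAL")
    val heavyCallers = spark.sql(
      "SELECT subscriberNumber, COUNT(*) AS calls FROM global_temp.AIRGLOBAL " +
      "GROUP BY subscriberNumber HAVING COUNT(*) > 20")
    heavyCallers.show()
  }
}

Keep in mind that the unioned DataFrame and its lineage keep growing with every batch, so for anything long-running you would want to checkpoint or persist it (or write to an external store) rather than let the lineage grow unbounded.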