如何刷新火花流中加载的数据框内容?

How to refresh loaded dataframe contents in spark streaming?

使用 spark-sql 2.4.1 和 kafka 进行实时流式传输。 我有以下用例

  1. Need to load a meta-data from hdfs for joining with streaming dataframe from kafka.
  2. streaming data record's particular columns should be looked up in meta-data dataframe particular colums(col-X) data. If found pick meta-data column(col-Y) data Else not found , insert streaming record/column data into meta-data dataframe i.e. into hdfs. I.e. it should be looked up if streaming dataframe contain same data again.

作为在 spark 作业开始时加载的元数据,如何在流式作业中再次刷新其内容以查找并加入另一个流式数据帧?

编辑 这个解决方案更精细并且可以工作(适用于所有用例)。
对于在不更改文件或从数据库读取数据的情况下将数据附加到现有文件的更简单的情况,可以使用更简单的解决方案
这是因为数据帧(和底层 RDD)分区只创建一次,每次使用数据帧时都会读取数据。 (除非被spark缓存)


如果负担得起,您可以尝试(重新)阅读每个微细菌中的 元数据数据框

更好的方法是将 元数据数据帧 放入缓存中(不要与 spark 缓存数据帧混淆)。缓存类似于映射,不同之处在于它不会给插入的条目超过配置的生存时间。

在您的代码中,您将尝试为每个微批次从缓存中获取此 元数据数据帧 一次。如果缓存 return null。您将再次读取数据框,放入缓存,然后使用数据框。

Cache class 会是

import scala.collection.mutable

// cache class to store the dataframe
class Cache[K, V](timeToLive: Long) extends mutable.Map[K, V] {
  private var keyValueStore = mutable.HashMap[K, (V, Long)]()

  override def get(key: K):Option[V] = {
    keyValueStore.get(key) match {
      case Some((value, insertedAt)) if insertedAt+timeToLive > System.currentTimeMillis => Some(value)
      case _ => None
    }
  }

  override def iterator: Iterator[(K, V)] = keyValueStore.iterator
    .filter({
      case (key, (value, insertedAt)) => insertedAt+timeToLive > System.currentTimeMillis
    }).map(x => (x._1, x._2._1))

  override def -=(key: K): this.type = {
    keyValueStore-=key
    this
  }

  override def +=(kv: (K, V)): this.type = {
    keyValueStore += ((kv._1, (kv._2, System.currentTimeMillis())))
    this
  }
}

通过缓存访问元数据数据帧的逻辑

import org.apache.spark.sql.DataFrame

object DataFrameCache {
  lazy val cache = new Cache[String, DataFrame](600000) // ten minutes timeToLive

  def readMetaData: DataFrame = ???

  def getMetaData: DataFrame = {
    cache.get("metadataDF") match {
      case Some(df) => df
      case None => {
        val metadataDF = readMetaData
        cache.put("metadataDF", metadataDF)
        metadataDF
      }
    }
  }
}

我可能误解了这个问题,但刷新元数据数据框应该是开箱即用的支持功能。

您根本不需要做任何事情。

我们来看例子:

// a batch dataframe
val metadata = spark.read.text("metadata.txt")
scala> metadata.show
+-----+
|value|
+-----+
|hello|
+-----+

// a streaming dataframe
val stream = spark.readStream.text("so")

// join on the only value column
stream.join(metadata, "value").writeStream.format("console").start

只要 so 目录中的文件内容与 metadata.txt 文件相匹配,您就应该将数据帧打印到控制台。

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|hello|
+-----+

metadata.txt 更改为 world,并且只有来自新文件的世界才会匹配。

下面是我在 spark 2.4.5 中针对与流的左外连接所遵循的场景 join.Below 进程正在推动 spark 读取最新的维度数据更改。

进程用于具有批次维度的 Stream Join(始终更新)

第 1 步:-

在启动 Spark 流作业之前:- 确保维度批处理数据文件夹只有一个文件,并且该文件应至少有一条记录(由于某些原因,放置空文件不起作用)。

第 2 步:- 开始你的流式作业并在kafka流中添加流记录

第 3 步:- 用值覆盖暗数据(文件应同名不要更改,维度文件夹应只有一个文件) 注意:- 不要使用 spark 写入此文件夹使用 Java 或 Scala filesystem.io 覆盖文件或 bash 删除文件并替换为具有相同名称的新数据文件。

第 4 步:- 在下一批中,spark 能够在加入 kafka 流时读取更新的维度数据...

示例代码:-

package com.broccoli.streaming.streamjoinupdate

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object BroadCastStreamJoin3 {

  def main(args: Array[String]): Unit = {
    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")


    val dimDf3 = spark.read
      .schema(schemaUntyped2)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/dimension")
      .withColumnRenamed("id", "id_2")
      .withColumnRenamed("countrycode", "countrycode_2")

    import spark.implicits._

    factDf1
      .join(
        dimDf3,
        $"countrycode_2" <=> $"countrycode",
        "inner"
      )
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination

  }
}

谢谢 斯里