How to ingest data from two producers in Kafka and join them using Spark Structured Streaming?

I am trying to read data from two Kafka topics, but I am unable to join them and get the final DataFrame. My Kafka topics are CSVStreamRetail and OrderItems.

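    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

    val spark = SparkSession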
      .builder
      .appName("Spark-Stream-Example")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)

    val orderItemsSchema = new StructType()
      .add("order_item_id",IntegerType)
      .add("order_item_order_id",IntegerType)
      .add("order_item_product_id",IntegerType)
      .add("order_item_quantity",IntegerType)
      .add("order_item_subtotal",DoubleType)
      .add("order_item_product_price", DoubleType)

    import spark.implicits._

    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "CSVStreamRetail")
      .load()

    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "OrderItems")
      .load()

    val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
      .select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
      .select("orders_data.*","timestamp")

    val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
      .select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
      .select("order_items_data.*","timestamp")

    val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))

    finalDF
      .writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()

The output I get is an empty DataFrame.

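First, check that you are actually receiving data in the Kafka topics. For a stream-stream join you should also define a watermark on the streams: it is optional for an inner join, where it lets Spark bound the state it keeps, and it is required for outer joins. I see you want to perform an inner join, so I added a 200-second watermark to both streams, and the output DataFrame now shows data.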

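import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val spark = SparkSession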
  .builder
  .appName("Spark-Stream-Example")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp")
  .getOrCreate()

val ordersSchema = new StructType()
  .add("order_id", IntegerType)
  .add("order_date", StringType)
  .add("order_customer_id", IntegerType)
  .add("order_status", StringType)

val orderItemsSchema = new StructType()
  .add("order_item_id",IntegerType)
  .add("order_item_order_id",IntegerType)
  .add("order_item_product_id",IntegerType)
  .add("order_item_quantity",IntegerType)
  .add("order_item_subtotal",DoubleType)
  .add("order_item_product_price", DoubleType)

import spark.implicits._

val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .load()

val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "OrderItems")
  .load()

// Parse the JSON payload of each order record and keep the Kafka record timestamp.
val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
  .select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
  .select("orders_data.*","timestamp")
  // Tolerate records that arrive up to 200 seconds late.
  .withWatermark("timestamp","200 seconds")

// Same treatment for the order items.
val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
  .select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
  .select("order_items_data.*","timestamp")
  .withWatermark("timestamp","200 seconds")

// Inner join: match each order item with its order.
val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))

finalDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()
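
To check whether data is actually in a topic, a one-shot batch read can help. The sketch below is only an illustration: the object name `TopicCheck` is made up, and it assumes the same broker address and `ordersSchema` as above. It reads everything currently retained in CSVStreamRetail and prints each raw value next to the result of `from_json`. Keep in mind that a streaming query on the Kafka source starts from the `latest` offsets by default, so records produced before the query starts are not read unless `startingOffsets` is set to `earliest`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical helper object, not part of the original code.
object TopicCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Topic-Check")
      .master("local[*]")
      .getOrCreate()

    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)

    // One-shot batch read of the records currently retained in the topic.
    val raw = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "CSVStreamRetail")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "timestamp")

    // Print the raw payload next to the parsed struct for comparison.
    raw.select(col("value"), from_json(col("value"), ordersSchema).as("parsed"))
      .show(20, truncate = false)

    spark.stop()
  }
}
```

If `parsed` comes back null, or as a struct whose fields are all null, the payload is probably not JSON matching the schema (for example, plain CSV lines). In that case `order_id` is null for every row and the join condition can never match, which would also give an empty result.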

Join using the event timestamp. Let me know if this helps.
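
Joining on the event timestamp could look like the sketch below. It is written under assumptions, not as a definitive fix: the names `TimeBoundedJoin`, `joinWithTimeBound`, `order_ts` and `item_ts` are made up for illustration, and the rule that an order item is produced at most 200 seconds after its order is an assumed business constraint. The inputs are meant to be the parsed streams from above before `withWatermark` is applied. The timestamps are renamed so the two sides can be told apart in the join condition.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Hypothetical helper, not part of the original code.
object TimeBoundedJoin {
  // Both inputs are assumed to carry a "timestamp" column plus the key columns used above.
  def joinWithTimeBound(orders: DataFrame, orderItems: DataFrame): DataFrame = {
    val o = orders
      .withColumnRenamed("timestamp", "order_ts")
      .withWatermark("order_ts", "200 seconds")
    val i = orderItems
      .withColumnRenamed("timestamp", "item_ts")
      .withWatermark("item_ts", "200 seconds")

    // Equality on the key plus a time-range condition on the two event times.
    // The 200-second window is an assumption made for this sketch.
    i.join(
      o,
      expr("""
        order_item_order_id = order_id AND
        item_ts >= order_ts AND
        item_ts <= order_ts + interval 200 seconds
      """)
    )
  }
}
```

With both a watermark and a time-range condition, Spark can work out when buffered rows on either side can no longer match and drop them from the join state. The same shape of condition is what outer joins need in order to emit unmatched rows.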