How to define a join condition in a stream-batch (stream-static) streaming join?
I am using spark-sql-2.4.1 with Java 1.8, and the Kafka connector spark-sql-kafka-0-10_2.11 version 2.4.3.
I am trying to join a static DataFrame (i.e. metadata) with a streaming DataFrame, as shown below:
Dataset<Row> streamingDs = //read from kafka topic
Dataset<Row> staticDf= //read from oracle meta-data table.
Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
"c.code = i.industry_code"
);
Even though both DataFrames contain the respective columns, it gives the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `c.code = i.industry_code` cannot be resolved on the left side of the join. The left-side columns: [id, transactionDate, companyName, code];
I also tried the following:
Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
"c.code = i.industry_code",
"inner"
);
This gives the following compile error:
The method join(Dataset, String) in the type Dataset is not applicable for the arguments (Dataset, String, String)
tl;dr The string c.code = i.industry_code is interpreted as the name of a single column to join on (a USING column), not as a join expression.
Change the code as follows:
streamingDs.as("c").join(staticDf.as("i")) // INNER JOIN is the default
.where("c.code = i.industry_code")
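Since the question uses the Java API, here is a minimal sketch of the same fix in Java, reusing the streamingDs and staticDf variables from the question. The key point is to pass the condition as a Column (via functions.expr) instead of a String, or to join first and filter afterwards:

import static org.apache.spark.sql.functions.expr;

// Option 1: pass the join condition as a Column expression, with an explicit join type
Dataset<Row> joinDf = streamingDs.as("c")
        .join(staticDf.as("i"), expr("c.code = i.industry_code"), "inner");

// Option 2: join first, then filter -- equivalent to the snippet above
Dataset<Row> joinDf2 = streamingDs.as("c")
        .join(staticDf.as("i"))                 // INNER JOIN is the default
        .where("c.code = i.industry_code");

The join(Dataset, String) overload treats its String argument as a USING column name that must exist on both sides, which is why the whole expression failed to resolve.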
Here you go. The code below even re-reads the latest dimension data on every micro-batch; just keep in mind that updated dimension data (country information, in my case) has to arrive in a new file.
package com.capone.streaming.BraodcastJoin

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object BroadCastStreamJoin2 {

  def main(args: Array[String]): Unit = {

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    // Schema of the streaming fact data
    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    // Schema of the dimension (country) data
    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    // Streaming source: fact files dropped into this directory
    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      //.option("maxFilesPerTrigger", 1)
      .csv("src/main/resources/broadcasttest/fact")

    var countrDf: Option[DataFrame] = None: Option[DataFrame]

    // Re-reads the dimension directory and replaces the cached dimension DataFrame
    def readDim(): Unit = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")
      if (countrDf != None) {
        countrDf.get.unpersist()
      }
      countrDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))
      countrDf.get.show()
    }

    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)
        readDim() // refresh the dimension data before joining each micro-batch
        batchDF
          .join(
            countrDf.get,
            expr(
              """
              countrycode_2 = countrycode
              """
            ),
            "leftOuter"
          )
          .show
      }
      .start()
      .awaitTermination()
  }
}
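The same per-batch refresh pattern can also be written against the Java API the asker is on. This is only a sketch under assumptions: jdbcUrl and the "industry_metadata" table name are hypothetical placeholders for the Oracle metadata source, and it reuses the streamingDs variable and spark session from the question. The idea is simply to re-read the static side inside foreachBatch and join it with each micro-batch:

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.expr;

// ... inside a method that has access to `spark`, `streamingDs` and the JDBC settings;
// the enclosing method must handle StreamingQueryException thrown by awaitTermination() ...
streamingDs.writeStream()
        .outputMode("append")
        .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDF, batchId) -> {
            // Re-read the metadata table on every micro-batch so updates are picked up
            Dataset<Row> staticDf = spark.read()
                    .format("jdbc")
                    .option("url", jdbcUrl)                 // hypothetical connection settings
                    .option("dbtable", "industry_metadata") // hypothetical table name
                    .load();

            batchDF.as("c")
                    .join(staticDf.as("i"), expr("c.code = i.industry_code"), "left_outer")
                    .show();
        })
        .start()
        .awaitTermination();

Re-reading the static side on every batch trades some latency for freshness; if the metadata rarely changes, loading it once outside the stream (as in the original question) is cheaper.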