Scala 中的变量替换

Variable substitution in scala

我在 scala 中有两个数据帧,它们都具有来自两个不同表但结构相同的数据(srcdataframetgttable)。我必须根据复合主键和 select 几列加入这两列,并附加两列代码如下:

for(i <- 2 until numCols) {
  srcdataframe.as("A")
    .join(tgttable.as("B"), $"A.INSTANCE_ID" === $"B.INSTANCE_ID" && 
       $"A.CONTRACT_LINE_ID" === $"B.CONTRACT_LINE_ID", "inner")
    .filter($"A." + srcColnm(i) =!= $"B." + srcColnm(i))
    .select($"A.INSTANCE_ID",
            $"A.CONTRACT_LINE_ID",
           "$"+"\""+"A."+srcColnm(i)+"\""+","+"$"+"\""+"B."+srcColnm(i)+"\"")
    .withColumn("MisMatchedCol",lit("\""+srcColnm(i)+"\""))
    .withColumn("LastRunDate",current_timestamp.cast("long"))
    .createOrReplaceTempView("IPF_1M_Mismatch"); 

hiveSQLContext.sql("Insert into table xxxx.f2f_Mismatch1 select t.* from (select * from IPF_1M_Mismatch) t");}

以下是我正在尝试做的事情:

  1. srcdataframetgttable 的内部联接基于 instance_idcontract_line_id
  2. Select 仅 instance_idcontract_line_idmismatched_col_valuesmismatched_col_nmtimestamp
  3. 的硬编码
  4. srcColnm(i) 是一个 array 字符串,其中包含要比较的非主键。

但是,我无法在 for 循环中解析数据框中的变量。我尝试查找解决方案 here and 。我知道这可能是因为spark仅在编译时替换变量的方式,在这种情况下我不确定如何解决它。

您可以简单地使用字符串或 col() 函数来代替使用 $ 创建列。我还建议在 for 之外执行 join,因为这是一项昂贵的操作。稍微更改了代码,解决问题的主要区别在于 select:

val df =  srcdataframe.as("A")
  .join(tgttable.as("B"), Seq("INSTANCE_ID", "CONTRACT_LINE_ID"), "inner")

for(columnName <- srcColnm) {
  df.filter(col("A." + columnName) =!= col("B." + columnName))
    .select("INSTANCE_ID", "CONTRACT_LINE_ID", "A." + columnName, "B." + columnName)
    .withColumn("MisMatchedCol", lit(columnName))
    .withColumn("LastRunDate", current_timestamp().cast("long"))
    .createOrReplaceTempView("IPF_1M_Mismatch")

  // Hive command
}

关于select中的问题:

$col() 函数的缩写,它是按名称 selecting 数据框中的一列。 select 中的问题是前两个参数 col("A.INSTANCE_ID")col("A.CONTRACT_LINE_ID") are two columns ($replaced bycol()` 为了清楚起见)。

但是,接下来的两个参数是字符串。不可能混合这两者,要么所有参数都应该是列,要么都是字符串。由于您使用 "A."+srcColnm(i) 构建列名 $ 无法使用,但是,您可以使用 col("A."+srcColnm(i)).