How to get columns from the individual datasets after an inner join when some non-join column names are the same?

Hi, I have the following data:

+----------+-----+------+------------+
|company_id| city| state|updated_date|
+----------+-----+------+------------+
|       111|city1|state1|  1990-12-01|
|       222|city2|state2|  1991-12-01|
+----------+-----+------+------------+

+---------+-----+------+-------+
|companyId| city| state|zipcode|
+---------+-----+------+-------+
|      111|city1|state1| 111111|
|      222|city2|state2| 222222|
+---------+-----+------+-------+

I am joining on the company id as shown below:

Dataset<Row> joinDs = firstDs.join(secondDs,  firstDs.col("company_id").equalTo(secondDs.col("companyId")), "inner");

joinDs has the ambiguous columns "city" & "state".

Questions

  1. How to handle the ambiguous columns? Is there a way to tell the ambiguous columns apart in joinDs?

  2. How to select specific columns from joinDs, where some come from "firstDs" and some from "secondDs"?

Joining a static DF with a streaming DF?

When joining the static dataframe "secondDs" (from HDFS) with the streaming dataframe "firstDs" (from Kafka) as below

 Dataset<Row> joinUpdatedRecordsDs = firstDs.join(secondDs, firstDs.col("companyId").equalTo(secondDs.col("company_id")), "inner");

it does not give any results, whereas the same join works fine and gives results in Spark batch processing.

What am I doing wrong here? How can I resolve this issue?

Error:

Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 385, Column 34: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 385, Column 34: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon.load(CodeGenerator.scala:1376)

There are two ways to implement this: one is using the DataFrame API and the other is using Spark SQL.

DataFrame:

var df1 = spark.createDataFrame(Seq((111,"city1","state1","1990-12-01"),(222,"city2","state2","1991-12-01"))).toDF("company_id","city","state","updated_date")

df1.show
+----------+-----+------+------------+
|company_id| city| state|updated_date|
+----------+-----+------+------------+
|       111|city1|state1|  1990-12-01|
|       222|city2|state2|  1991-12-01|
+----------+-----+------+------------+

var df2 = spark.createDataFrame(Seq((111,"city1","state1",111111),(222,"city2","state2",222222))).toDF("company_id","city","state","zipcode")

scala> df1.join(df2,Seq("company_id"),"inner").show
+----------+-----+------+------------+-----+------+-------+
|company_id| city| state|updated_date| city| state|zipcode|
+----------+-----+------+------------+-----+------+-------+
|       111|city1|state1|  1990-12-01|city1|state1| 111111|
|       222|city2|state2|  1991-12-01|city2|state2| 222222|
+----------+-----+------+------------+-----+------+-------+
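
For question 1, a column that exists in both inputs can also be disambiguated by referring to it through the DataFrame it came from; a minimal sketch using the df1/df2 defined above (the same idea applies in the Java API via firstDs.col(...)/secondDs.col(...)):

// df1("city") and df2("city") refer to different columns even though the names clash,
// so the ambiguity is resolved by going through the parent DataFrame.
val joined = df1.join(df2, df1("company_id") === df2("company_id"), "inner")
joined.select(df1("company_id"), df1("city"), df1("updated_date"), df2("state"), df2("zipcode")).show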

Or select the required columns from each DataFrame and then perform the inner join:

scala> df1.select("company_id","city","updated_date").join(df2.select("company_id","state","zipcode"),Seq("company_id"),"inner").show
+----------+-----+------------+------+-------+
|company_id| city|updated_date| state|zipcode|
+----------+-----+------------+------+-------+
|       111|city1|  1990-12-01|state1| 111111|
|       222|city2|  1991-12-01|state2| 222222|
+----------+-----+------------+------+-------+
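
Another option (not part of the original answer) is to alias each DataFrame and qualify the ambiguous names with the alias, which keeps the select readable when many columns clash; a sketch against the same df1/df2:

import org.apache.spark.sql.functions.col

// Alias each side, then qualify the clashing column names with the alias.
val joined = df1.alias("a").join(df2.alias("b"), col("a.company_id") === col("b.company_id"), "inner")
joined.select(col("a.company_id"), col("a.city"), col("a.updated_date"), col("b.state"), col("b.zipcode")).show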

Spark SQL:

Register both DataFrames as temporary tables and then perform the join using Spark SQL:

scala> df1.registerTempTable("temp1")

scala> df2.registerTempTable("temp2")

scala> spark.sql("select a.company_id,a.city,a.updated_date,b.state,b.zipcode from temp1 a inner join temp2 as b on a.company_id = b.company_id  ").show

+----------+-----+------------+------+-------+
|company_id| city|updated_date| state|zipcode|
+----------+-----+------------+------+-------+
|       111|city1|  1990-12-01|state1| 111111|
|       222|city2|  1991-12-01|state2| 222222|
+----------+-----+------------+------+-------+
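
One note on the API: registerTempTable is deprecated from Spark 2.0 onward; createOrReplaceTempView is the drop-in replacement, e.g.:

// Same Spark SQL join using the non-deprecated temp-view API (Spark 2.0+)
df1.createOrReplaceTempView("temp1")
df2.createOrReplaceTempView("temp2")
spark.sql("select a.company_id, a.city, a.updated_date, b.state, b.zipcode from temp1 a inner join temp2 b on a.company_id = b.company_id").show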

Let me know if you have any questions about this. Happy Hadooping!