How to get each dataset's columns after an inner join when some non-join column names are the same?
Hi, I have data as below
+----------+------+-------+-------------+
|company_id| city | state | updated_date|
+----------+------+-------+-------------+
|       111| city1| state1|   1990-12-01|
|       222| city2| state2|   1991-12-01|
+----------+------+-------+-------------+
+---------+------+-------+--------+
|companyId| city | state | zipcode|
+---------+------+-------+--------+
|      111| city1| state1|  111111|
|      222| city2| state2|  222222|
+---------+------+-------+--------+
I am joining on companyId as below
Dataset<Row> joinDs = firstDs.join(secondDs, firstDs.col("company_id").equalTo(secondDs.col("companyId")), "inner");
joinDs has ambiguous columns "city" & "state".
Questions
How to handle the ambiguous columns? Is there any way to differentiate the ambiguous columns in joinDs?
How to select specific columns from joinDs, where a few come from "firstDs" and a few from "secondDs"?
Joining a static DF with streaming DF(s)?
When I join the static dataframe "secondDs" (from HDFS) with the streaming dataframe "firstDs" (from Kafka) as below
Dataset<Row> joinUpdatedRecordsDs = firstDs.join(secondDs, firstDs.col("companyId").equalTo(secondDs.col("company_id")), "inner");
it does not give any results,
whereas the same join works fine and gives results in Spark batch.
What am I doing wrong here? How can I resolve this?
Error:
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 385, Column 34: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 385, Column 34: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon.load(CodeGenerator.scala:1376)
There are two ways to implement this: one uses the DataFrame API and the other uses Spark SQL.
DataFrame:
var df1 = spark.createDataFrame(Seq((111,"city1","state1","1990-12-01"),(222,"city2","state2","1991-12-01"))).toDF("company_id","city","state","updated_date")
df1.show
+----------+-----+------+------------+
|company_id| city| state|updated_date|
+----------+-----+------+------------+
| 111|city1|state1| 1990-12-01|
| 222|city2|state2| 1991-12-01|
+----------+-----+------+------------+
var df2 = spark.createDataFrame(Seq((111,"city1","state1",111111),(222,"city2","state2",222222))).toDF("company_id","city","state","zipcode")
scala> df1.join(df2,Seq("company_id"),"inner").show
+----------+-----+------+------------+-----+------+-------+
|company_id| city| state|updated_date| city| state|zipcode|
+----------+-----+------+------------+-----+------+-------+
| 111|city1|state1| 1990-12-01|city1|state1| 111111|
| 222|city2|state2| 1991-12-01|city2|state2| 222222|
+----------+-----+------+------------+-----+------+-------+
Performing the inner join with select using Spark:
scala> df1.select("company_id","city","updated_date").join(df2.select("company_id","state","zipcode"),Seq("company_id"),"inner").show
+----------+-----+------------+------+-------+
|company_id| city|updated_date| state|zipcode|
+----------+-----+------------+------+-------+
| 111|city1| 1990-12-01|state1| 111111|
| 222|city2| 1991-12-01|state2| 222222|
+----------+-----+------------+------+-------+
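If you would rather keep the join exactly as in your Java snippet and disambiguate only at selection time, you can refer to each ambiguous column through the DataFrame it came from. A minimal sketch using the same df1/df2 as above (they correspond to firstDs/secondDs in your code):
// join first, then tell Spark explicitly which "city"/"state" you mean
val joined = df1.join(df2, df1("company_id") === df2("company_id"), "inner")
joined.select(df1("company_id"), df1("city"), df1("updated_date"), df2("state"), df2("zipcode")).show
The same pattern works with the Java API via firstDs.col("city"), secondDs.col("state"), and so on.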
Spark SQL:
Register both DataFrames as temp tables and then perform the join using Spark SQL:
scala> df1.registerTempTable("temp1")
scala> df2.registerTempTable("temp2")
scala> spark.sql("select a.company_id,a.city,a.updated_date,b.state,b.zipcode from temp1 a inner join temp2 as b on a.company_id = b.company_id ").show
+----------+-----+------------+------+-------+
|company_id| city|updated_date| state|zipcode|
+----------+-----+------------+------+-------+
| 111|city1| 1990-12-01|state1| 111111|
| 222|city2| 1991-12-01|state2| 222222|
+----------+-----+------------+------+-------+
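Another option, staying in the DataFrame API, is to alias each side and qualify the ambiguous columns by alias; this is a sketch using the same df1/df2:
import org.apache.spark.sql.functions.col
// alias both sides so columns can be addressed as "a.city", "b.state", etc.
val joined = df1.as("a").join(df2.as("b"), col("a.company_id") === col("b.company_id"), "inner")
joined.select(col("a.company_id"), col("a.city"), col("a.updated_date"), col("b.state"), col("b.zipcode")).show
Note that registerTempTable is deprecated in Spark 2.x; createOrReplaceTempView is the equivalent replacement.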
Let me know if you have any questions. Happy Hadoop!