在 Spark 1.6 中加入数据帧时没有发生广播

Broadcast not happening while joining dataframes in Spark 1.6

下面是我运行的示例代码。当此 spark 作业运行时,Dataframe 连接正在使用 sortmergejoin 而不是 broadcastjoin 进行。

def joinedDf (sqlContext: SQLContext,
              txnTable:   DataFrame,
              countriesDfBroadcast: Broadcast[DataFrame]): 
              DataFrame = {
                    txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
                    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
              }
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")  

即使我在连接语句中指定了 broadcast() 提示,broadcastjoin 也没有发生。

优化器正在对数据帧进行哈希分区,这导致了数据倾斜。

有人见过这种行为吗?

我 运行 在 yarn 上使用 Spark 1.6 和 HiveContext 作为 SQLContext。 spark 作业在 200 个执行程序上运行。 txnTable 的数据大小为 240GB,countriesDf 的数据大小为 5mb。

您广播 DataFrame 的方式和访问方式都不正确。

  • 标准广播不能用于处理分布式数据结构。如果你想在 DataFrame 上执行广播加入,你应该使用 broadcast 函数,它标记给定 DataFrame 用于广播:

    import org.apache.spark.sql.functions.broadcast
    
    val countriesDf: DataFrame = ???
    val tmp: DataFrame = broadcast(
      countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
    ) 
    
    txnTable.as("df1").join(
      broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
    

    在内部它将 collect tmp 不从内部转换然后广播。

  • 加入参数被急切地评估。甚至可以将 SparkContext.broadcast 与分布式数据结构一起使用,广播值在调用 join 之前在本地进行评估。这就是为什么您的函数可以正常工作但不执行广播连接的原因。