将 groupByKey 转换为 reduceByKey
Convert a groupByKey to reduceByKey
我读到 reducebyKey
是大型数据集的更好选择,可以减少数据混洗,从而提高性能。
我正在尝试转换我对 groupByKey
的使用。首先必须将其转换为 rdd 为:
val linksNew = links.map(convertToRelationship)
.flatMap(bidirRelationship)
links 是一个数据集,而数据集 api 没有 reduceByKey
。使用 reduceByKey
时 .groupByKey(_._1)
的等价物是什么?
val linksfinal = linksNew.rdd.reduceByKey(???)
实际代码:
val biLinks = links
.map(convertToRelationship)
.flatMap(bidirRelationship)
.groupByKey(_._1)
.reduceGroups((left, right) => combineBidirerRelationships(left,right))
.map(_._2._2)
数据集的架构,就在使用 groupByKey(_._1)
:
之前
数据集中的一些实际数据:
不确定它是否更有效,但是,应该可以转换为 reduceByKey
,因为您在 groupByKey
之后直接执行 reduceGroups
。使用部分提供的代码的简短示例:
val biLinks = links
.map(convertToRelationship)
.flatMap(bidirRelationship)
.rdd
.map{row => (row.getAs[String](0), row.getAs[Relationship](1))} // See explanation below
.reduceByKey((left, right) => combineBidirerRelationships(left, right))
.map(_._2._2)
根据使用 .rdd
后数据框的外观,可能需要进行额外的转换。从数据帧转换时,生成的 rdd 将是 RDD[Row]
。但是,要使 reduceByKey()
工作,需要类型为 RDD[(A,B)]
的元组 rdd,其中 A
和 B
是类型(它们本身也可以是元组)。
rdd.map(...)
转换如何与 structs
配合使用的简短示例:
case class Relationship(a: Long, b: Long)
val df = spark.createDataFrame(Seq((1, Relationship(3L, 2L)), (2, Relationship(20L, 7L)))).toDF()
val rdd = df.rdd.map{ row => (row.getAs[String](0), row.getAs[Relationship](1))}
这里给出了需要的元组rdd类型,这里RDD[(String, Relationship)]
.
I read that reducebyKey is a better option on large datasets to reduce shuffle and or shuffles on reduce side, and enhance performance.
不是。您混淆了 "old" RDD API 其中 groupByKey
具有不同的语义。
In Dataset
API groupByKey
+ reduceGroups
使用与旧 API 中的 reduceByKey
类似的执行模型。事实上,转换为 RDD 使用效率较低的洗牌机制并且非常昂贵,所以你只会让事情变得更糟。
我读到 reducebyKey
是大型数据集的更好选择,可以减少数据混洗,从而提高性能。
我正在尝试转换我对 groupByKey
的使用。首先必须将其转换为 rdd 为:
val linksNew = links.map(convertToRelationship)
.flatMap(bidirRelationship)
links 是一个数据集,而数据集 api 没有 reduceByKey
。使用 reduceByKey
时 .groupByKey(_._1)
的等价物是什么?
val linksfinal = linksNew.rdd.reduceByKey(???)
实际代码:
val biLinks = links
.map(convertToRelationship)
.flatMap(bidirRelationship)
.groupByKey(_._1)
.reduceGroups((left, right) => combineBidirerRelationships(left,right))
.map(_._2._2)
数据集的架构,就在使用 groupByKey(_._1)
:
数据集中的一些实际数据:
不确定它是否更有效,但是,应该可以转换为 reduceByKey
,因为您在 groupByKey
之后直接执行 reduceGroups
。使用部分提供的代码的简短示例:
val biLinks = links
.map(convertToRelationship)
.flatMap(bidirRelationship)
.rdd
.map{row => (row.getAs[String](0), row.getAs[Relationship](1))} // See explanation below
.reduceByKey((left, right) => combineBidirerRelationships(left, right))
.map(_._2._2)
根据使用 .rdd
后数据框的外观,可能需要进行额外的转换。从数据帧转换时,生成的 rdd 将是 RDD[Row]
。但是,要使 reduceByKey()
工作,需要类型为 RDD[(A,B)]
的元组 rdd,其中 A
和 B
是类型(它们本身也可以是元组)。
rdd.map(...)
转换如何与 structs
配合使用的简短示例:
case class Relationship(a: Long, b: Long)
val df = spark.createDataFrame(Seq((1, Relationship(3L, 2L)), (2, Relationship(20L, 7L)))).toDF()
val rdd = df.rdd.map{ row => (row.getAs[String](0), row.getAs[Relationship](1))}
这里给出了需要的元组rdd类型,这里RDD[(String, Relationship)]
.
I read that reducebyKey is a better option on large datasets to reduce shuffle and or shuffles on reduce side, and enhance performance.
不是。您混淆了 "old" RDD API 其中 groupByKey
具有不同的语义。
In Dataset
API groupByKey
+ reduceGroups
使用与旧 API 中的 reduceByKey
类似的执行模型。事实上,转换为 RDD 使用效率较低的洗牌机制并且非常昂贵,所以你只会让事情变得更糟。