flatmap 内的 Spark map 以复制笛卡尔连接
Spark map inside flatmap to replicate cartesian join
我正在尝试模糊连接两个数据集,一个是报价,一个是销售。为了论证,加入属性是名字、姓氏、出生日期和电子邮件。
我有超过 2600 万的报价和超过 100 万的销售额。客户可能没有使用一个或多个属性的准确信息,所以我给他们每个匹配项 (1,1,1,1) 的分数,其中所有匹配项 (0,0,0,0) 其中 none匹配。
所以我最终得到了类似于
的结果
q1, s1, (0,0,1,0)
q1, s2, (0,1,0,1)
q1, s3, (1,1,1,1)
q2, s1, (1,0,0,1)
...
q26000000 s1 (0,1,0,0)
所以我认为这等同于我正在管理我为引号制作大量分区的笛卡尔积
val quotesRaw = sc.textfile(....)
val quotes = quotesRaw.repartition(quotesRaw.count().toInt() / 100000)
val sales = sc.textfile(...)
val sb = sc.broadcast(sales.collect())
quotes.mapPartitions(p=> (
p.flatMap(q => (
sb.value.map(s =>
q._1, s._1, ( if q._2 == s._2 1 else 0, etc)
)
)
如果我保持较低的数字,这一切都有效,比如 2600 万个报价,但只有 1000 次销售,但如果我 运行 它会在 运行ning[=13= 时停止响应所有销售]
我运行使用以下配置对其进行设置。
spark-submit --conf spark.akka.frameSize=1024 --conf spark.executor.memory=3g --num-executors=30 --driver-memory 6g --class SalesMatch --deploy-mode client --master yarn SalesMatching-0.0.1-SNAPSHOT.jar hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt hdfs://cluster:8020/tmp/_salesdata_matches_new
这里有什么明显不正确的地方吗?
假设每个分区有 100k 个报价,总大小为 40MB 的销售额为 1100 万,您的代码每个分区生成大约 4TB 的数据,因此您的工作人员不太可能处理这个,而且绝对不能在内存中完成。
我假设您只对势均力敌的比赛感兴趣,因此尽早过滤是有意义的。稍微简化您的代码(据我所知,没有理由使用 mapPartitions
):
// Check if match is close enough, where T is type of (q._1, s._1, (...))
def isCloseMatch(match: T): Boolean = ???
quotes.flatMap(q => sb.value
.map(s => (q._1, s._1, (....))) // Map as before
.filter(isCloseMatch) // yield only close matches
)
一般说明:
- 从 RDD 创建广播是一个昂贵的过程。首先,您必须将所有数据传输给驱动程序,然后在工作人员之间分发。这意味着重复serialization/deserialization、网络流量和存储数据的成本
对于像这样相对简单的操作,使用高级 Spark 可能是个好主意 SQL API:
import org.apache.spark.sql.DataFrame
val salesDF: DataFrame = ???
val salesDF: DataFrame = ???
val featureCols: Seq[String] = ???
val threshold: Int = ???
val inds = featureCols // Boolean columns
.map(col => (quotesDF(col) === salesDF(col)).alias(s"${col}_ind"))
val isSimilar = inds // sum(q == s) > threshold
.map(c => c.cast("integer").alias(c.toString))
.reduce(_ + _)
.geq(threshold)
val combined = quotesDF
.join(salesDF, isSimilar, "left")
我正在尝试模糊连接两个数据集,一个是报价,一个是销售。为了论证,加入属性是名字、姓氏、出生日期和电子邮件。
我有超过 2600 万的报价和超过 100 万的销售额。客户可能没有使用一个或多个属性的准确信息,所以我给他们每个匹配项 (1,1,1,1) 的分数,其中所有匹配项 (0,0,0,0) 其中 none匹配。
所以我最终得到了类似于
的结果q1, s1, (0,0,1,0)
q1, s2, (0,1,0,1)
q1, s3, (1,1,1,1)
q2, s1, (1,0,0,1)
...
q26000000 s1 (0,1,0,0)
所以我认为这等同于我正在管理我为引号制作大量分区的笛卡尔积
val quotesRaw = sc.textfile(....)
val quotes = quotesRaw.repartition(quotesRaw.count().toInt() / 100000)
val sales = sc.textfile(...)
val sb = sc.broadcast(sales.collect())
quotes.mapPartitions(p=> (
p.flatMap(q => (
sb.value.map(s =>
q._1, s._1, ( if q._2 == s._2 1 else 0, etc)
)
)
如果我保持较低的数字,这一切都有效,比如 2600 万个报价,但只有 1000 次销售,但如果我 运行 它会在 运行ning[=13= 时停止响应所有销售]
我运行使用以下配置对其进行设置。
spark-submit --conf spark.akka.frameSize=1024 --conf spark.executor.memory=3g --num-executors=30 --driver-memory 6g --class SalesMatch --deploy-mode client --master yarn SalesMatching-0.0.1-SNAPSHOT.jar hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt hdfs://cluster:8020/tmp/_salesdata_matches_new
这里有什么明显不正确的地方吗?
假设每个分区有 100k 个报价,总大小为 40MB 的销售额为 1100 万,您的代码每个分区生成大约 4TB 的数据,因此您的工作人员不太可能处理这个,而且绝对不能在内存中完成。
我假设您只对势均力敌的比赛感兴趣,因此尽早过滤是有意义的。稍微简化您的代码(据我所知,没有理由使用 mapPartitions
):
// Check if match is close enough, where T is type of (q._1, s._1, (...))
def isCloseMatch(match: T): Boolean = ???
quotes.flatMap(q => sb.value
.map(s => (q._1, s._1, (....))) // Map as before
.filter(isCloseMatch) // yield only close matches
)
一般说明:
- 从 RDD 创建广播是一个昂贵的过程。首先,您必须将所有数据传输给驱动程序,然后在工作人员之间分发。这意味着重复serialization/deserialization、网络流量和存储数据的成本
对于像这样相对简单的操作,使用高级 Spark 可能是个好主意 SQL API:
import org.apache.spark.sql.DataFrame val salesDF: DataFrame = ??? val salesDF: DataFrame = ??? val featureCols: Seq[String] = ??? val threshold: Int = ??? val inds = featureCols // Boolean columns .map(col => (quotesDF(col) === salesDF(col)).alias(s"${col}_ind")) val isSimilar = inds // sum(q == s) > threshold .map(c => c.cast("integer").alias(c.toString)) .reduce(_ + _) .geq(threshold) val combined = quotesDF .join(salesDF, isSimilar, "left")