慢火花应用程序 - java
Slow spark application - java
我正在尝试创建一个采用 lat、long、timestamp[= 数据集的 spark 应用程序24=] 点并增加单元格计数(如果它们在网格单元格内)。网格由以 lon、lat 和 time 作为 z 轴的 3d 单元格组成。
现在我已经完成了应用程序并且它做了它应该做的,但是扫描整个数据集需要几个小时(~9g)。我的集群由 3 个节点组成,每个节点有 4 个核心,8g 内存,我目前使用 6 个执行器,每个节点有 1 个核心和 2g。
我猜我可以对代码进行相当多的优化,但是我的代码中是否存在导致此延迟的大错误?
//Create a JavaPairRDD with tuple elements. For each String line of lines we split the string
//and assign latitude, longitude and timestamp of each line to sdx,sdy and sdt. Then we check if the data point of
//that line is contained in a cell of the centroids list. If it is then a new tuple is returned
//with key the latitude, Longitude and timestamp (split by ",") of that cell and value 1.
JavaPairRDD<String, Integer> pairs = lines.mapToPair(x -> {
String sdx = x.split(" ")[2];
String sdy = x.split(" ")[3];
String sdt = x.split(" ")[0];
double dx = Double.parseDouble(sdx);
double dy = Double.parseDouble(sdy);
int dt = Integer.parseInt(sdt);
List<Integer> t = brTime.getValue();
List<Point2D.Double> p = brCoo.getValue();
double dist = brDist.getValue();
int dur = brDuration.getValue();
for(int timeCounter=0; timeCounter<t.size(); timeCounter++) {
for ( int cooCounter=0; cooCounter < p.size(); cooCounter++) {
double cx = p.get(cooCounter).getX();
double cy = p.get(cooCounter).getY();
int ct = t.get(timeCounter);
String scx = Double.toString(cx);
String scy = Double.toString(cy);
String sct = Integer.toString(ct);
if (dx > (cx-dist) && dx <= (cx+dist)) {
if (dy > (cy-dist) && dy <= (cy+dist)) {
if (dt > (ct-dur) && dt <= (ct+dur)) {
return new Tuple2<String, Integer>(scx+","+scy+","+sct,1);
}
}
}
}
}
return new Tuple2<String, Integer>("Out Of Bounds",1);
});
可能导致 运行 像这样的 Spark 映射成本的最大因素之一与 RDD 上下文之外的数据访问有关,这意味着驱动程序交互。在您的情况下,至少有 4 个变量访问器会发生这种情况:brTime
、brCoo
、brDist
和 brDuration
。看起来您正在通过 String#split
而不是利用内置函数进行一些行解析。最后,scx
、scy
和 sct
都是为每个循环计算的,尽管只有当它们的数字对应物通过一系列检查时才会返回它们,这意味着浪费了 CPU 周期和额外的 GC。
在没有实际审查工作计划的情况下,很难说上述是否会使绩效达到可接受的水平。检查您的历史服务器应用程序日志,看看是否有任何阶段占用了您的时间 - 一旦您确定了其中的罪魁祸首,这就是真正需要优化的地方。
我尝试了 mappartitionstopair 并且还移动了 scx、scy 和 sct 的计算,以便仅当点通过条件时才计算它们。应用程序的速度仅 17 分钟就得到了显着提升!我相信 mappartitionsopair 是最大的因素。非常感谢 Mks 和 bsplosion!
尝试使用 mapPartitions 它会更快,请参阅此示例 link;另一件事是将这部分代码放在循环 timeCounter 之外
我正在尝试创建一个采用 lat、long、timestamp[= 数据集的 spark 应用程序24=] 点并增加单元格计数(如果它们在网格单元格内)。网格由以 lon、lat 和 time 作为 z 轴的 3d 单元格组成。
现在我已经完成了应用程序并且它做了它应该做的,但是扫描整个数据集需要几个小时(~9g)。我的集群由 3 个节点组成,每个节点有 4 个核心,8g 内存,我目前使用 6 个执行器,每个节点有 1 个核心和 2g。
我猜我可以对代码进行相当多的优化,但是我的代码中是否存在导致此延迟的大错误?
//Create a JavaPairRDD with tuple elements. For each String line of lines we split the string
//and assign latitude, longitude and timestamp of each line to sdx,sdy and sdt. Then we check if the data point of
//that line is contained in a cell of the centroids list. If it is then a new tuple is returned
//with key the latitude, Longitude and timestamp (split by ",") of that cell and value 1.
JavaPairRDD<String, Integer> pairs = lines.mapToPair(x -> {
String sdx = x.split(" ")[2];
String sdy = x.split(" ")[3];
String sdt = x.split(" ")[0];
double dx = Double.parseDouble(sdx);
double dy = Double.parseDouble(sdy);
int dt = Integer.parseInt(sdt);
List<Integer> t = brTime.getValue();
List<Point2D.Double> p = brCoo.getValue();
double dist = brDist.getValue();
int dur = brDuration.getValue();
for(int timeCounter=0; timeCounter<t.size(); timeCounter++) {
for ( int cooCounter=0; cooCounter < p.size(); cooCounter++) {
double cx = p.get(cooCounter).getX();
double cy = p.get(cooCounter).getY();
int ct = t.get(timeCounter);
String scx = Double.toString(cx);
String scy = Double.toString(cy);
String sct = Integer.toString(ct);
if (dx > (cx-dist) && dx <= (cx+dist)) {
if (dy > (cy-dist) && dy <= (cy+dist)) {
if (dt > (ct-dur) && dt <= (ct+dur)) {
return new Tuple2<String, Integer>(scx+","+scy+","+sct,1);
}
}
}
}
}
return new Tuple2<String, Integer>("Out Of Bounds",1);
});
可能导致 运行 像这样的 Spark 映射成本的最大因素之一与 RDD 上下文之外的数据访问有关,这意味着驱动程序交互。在您的情况下,至少有 4 个变量访问器会发生这种情况:brTime
、brCoo
、brDist
和 brDuration
。看起来您正在通过 String#split
而不是利用内置函数进行一些行解析。最后,scx
、scy
和 sct
都是为每个循环计算的,尽管只有当它们的数字对应物通过一系列检查时才会返回它们,这意味着浪费了 CPU 周期和额外的 GC。
在没有实际审查工作计划的情况下,很难说上述是否会使绩效达到可接受的水平。检查您的历史服务器应用程序日志,看看是否有任何阶段占用了您的时间 - 一旦您确定了其中的罪魁祸首,这就是真正需要优化的地方。
我尝试了 mappartitionstopair 并且还移动了 scx、scy 和 sct 的计算,以便仅当点通过条件时才计算它们。应用程序的速度仅 17 分钟就得到了显着提升!我相信 mappartitionsopair 是最大的因素。非常感谢 Mks 和 bsplosion!
尝试使用 mapPartitions 它会更快,请参阅此示例 link;另一件事是将这部分代码放在循环 timeCounter 之外