在猪中分组时如何处理倾斜数据
how to handle skewed data while grouping in pig
我正在执行一组操作,其中一个 reduce 任务 运行 非常长。下面是示例代码片段和问题描述,
inp =load 'input' using PigStorage('|') AS(f1,f2,f3,f4,f5);
grp_inp = GROUP inp BY (f1,f2) parallel 300;
由于数据存在偏差,即一个键的值太多,一个 reducer 运行 4 小时。其余所有 reduce 任务在 1 分钟左右完成。
我能做些什么来解决这个问题,有什么替代方法吗?任何帮助将不胜感激。谢谢!
您可能需要检查几件事:-
1>过滤掉f1和f2均为NULL的记录(如果有的话)
2> 如果可能,尝试通过实现代数接口来使用 hadoop 组合器 :-
https://www.safaribooksonline.com/library/view/programming-pig/9781449317881/ch10s02.html
3> 使用自定义分区器使用另一个键在 reducer 中分发数据。
这是我用来在加入后对偏斜数据进行分区的示例代码(同样可以在分组后使用):-
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
public class KeyPartitioner extends Partitioner<PigNullableWritable, Writable> {
/**
* Here key contains value of current key used for partitioning and Writable
* value conatins all fields from your tuple. I used my 5th field from tuple to do partitioning as I knew it has evenly distributed value.
**/
@Override
public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
Tuple valueTuple = (Tuple) ((NullableTuple) value).getValueAsPigType();
try {
if (valueTuple.size() > 5) {
Object hashObj = valueTuple.get(5);
Integer keyHash = Integer.parseInt(hashObj.toString());
int partitionNo = Math.abs(keyHash) % numPartitions;
return partitionNo;
} else {
if (valueTuple.size() > 0) {
return (Math.abs(valueTuple.get(1).hashCode())) % numPartitions;
}
}
} catch (NumberFormatException | ExecException ex) {
Logger.getLogger(KeyPartitioner.class.getName()).log(Level.SEVERE, null, ex);
}
return (Math.abs(key.hashCode())) % numPartitions;
}
}
我正在执行一组操作,其中一个 reduce 任务 运行 非常长。下面是示例代码片段和问题描述,
inp =load 'input' using PigStorage('|') AS(f1,f2,f3,f4,f5);
grp_inp = GROUP inp BY (f1,f2) parallel 300;
由于数据存在偏差,即一个键的值太多,一个 reducer 运行 4 小时。其余所有 reduce 任务在 1 分钟左右完成。
我能做些什么来解决这个问题,有什么替代方法吗?任何帮助将不胜感激。谢谢!
您可能需要检查几件事:-
1>过滤掉f1和f2均为NULL的记录(如果有的话)
2> 如果可能,尝试通过实现代数接口来使用 hadoop 组合器 :-
https://www.safaribooksonline.com/library/view/programming-pig/9781449317881/ch10s02.html
3> 使用自定义分区器使用另一个键在 reducer 中分发数据。
这是我用来在加入后对偏斜数据进行分区的示例代码(同样可以在分组后使用):-
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
public class KeyPartitioner extends Partitioner<PigNullableWritable, Writable> {
/**
* Here key contains value of current key used for partitioning and Writable
* value conatins all fields from your tuple. I used my 5th field from tuple to do partitioning as I knew it has evenly distributed value.
**/
@Override
public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
Tuple valueTuple = (Tuple) ((NullableTuple) value).getValueAsPigType();
try {
if (valueTuple.size() > 5) {
Object hashObj = valueTuple.get(5);
Integer keyHash = Integer.parseInt(hashObj.toString());
int partitionNo = Math.abs(keyHash) % numPartitions;
return partitionNo;
} else {
if (valueTuple.size() > 0) {
return (Math.abs(valueTuple.get(1).hashCode())) % numPartitions;
}
}
} catch (NumberFormatException | ExecException ex) {
Logger.getLogger(KeyPartitioner.class.getName()).log(Level.SEVERE, null, ex);
}
return (Math.abs(key.hashCode())) % numPartitions;
}
}