Does default sorting in MapReduce use the Comparator defined in the WritableComparable class or the compareTo() method?

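How does sorting happen in MapReduce before the output is passed from the mapper to the reducer? If my mapper output key is of type IntWritable, does it use the comparator defined in the IntWritable class or the class's compareTo() method? If so, how is the call made? If not, how is the sort performed and how is it invoked?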

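The MapReduce framework takes care of comparison for all the built-in key types (IntWritable, DoubleWritable, etc.). If you have a user-defined key type, however, it needs to implement the WritableComparable interface.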

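WritableComparables can be compared to each other, typically via Comparators. Any type that is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.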

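Note that hashCode() is frequently used in Hadoop to partition keys. It is important that your implementation of hashCode() returns the same result across different instances of the JVM. Note also that the default hashCode() implementation in Object does not satisfy this property.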

Example:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
   // Some data
   private int counter;
   private long timestamp;

   // Serialize the fields in a fixed order
   public void write(DataOutput out) throws IOException {
     out.writeInt(counter);
     out.writeLong(timestamp);
   }

   // Deserialize the fields in the same order they were written
   public void readFields(DataInput in) throws IOException {
     counter = in.readInt();
     timestamp = in.readLong();
   }

   // Order keys by counter
   public int compareTo(MyWritableComparable o) {
     return Integer.compare(this.counter, o.counter);
   }

   // Depends only on field values, so the result is the same in every JVM
   public int hashCode() {
     final int prime = 31;
     int result = 1;
     result = prime * result + counter;
     result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
     return result;
   }
 }

Source: https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/WritableComparable.html
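To make the hashCode() note above concrete, here is a minimal plain-Java sketch with no Hadoop dependency. The class names `HashStabilityDemo`, `IdentityKey`, and `ValueKey` are made up for illustration. `IdentityKey` inherits Object.hashCode(), which is identity-based and so carries no guarantee across instances or JVMs, while `ValueKey` derives its hash from field values only.

```java
final class HashStabilityDemo {
    // Hypothetical key that inherits Object.hashCode(). The hash is tied to
    // the object's identity, not to the data it holds.
    static final class IdentityKey {
        final int counter;
        final long timestamp;

        IdentityKey(int counter, long timestamp) {
            this.counter = counter;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical key whose hash is computed from its fields only, so equal
    // data gives the same hash in any JVM.
    static final class ValueKey {
        final int counter;
        final long timestamp;

        ValueKey(int counter, long timestamp) {
            this.counter = counter;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ValueKey)) {
                return false;
            }
            ValueKey other = (ValueKey) o;
            return counter == other.counter && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + counter;
            result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
            return result;
        }
    }

    public static void main(String[] args) {
        ValueKey a = new ValueKey(7, 1000L);
        ValueKey b = new ValueKey(7, 1000L);
        System.out.println("ValueKey hashes match: " + (a.hashCode() == b.hashCode()));

        IdentityKey c = new IdentityKey(7, 1000L);
        IdentityKey d = new IdentityKey(7, 1000L);
        // Identity hashes are not derived from the fields, so nothing ties these together.
        System.out.println("IdentityKey hashes: " + c.hashCode() + " vs " + d.hashCode());
    }
}
```

If two mappers running in different JVMs emit the same logical key, only a field-based hash like the one in `ValueKey` lets a hash-based partitioner route both records to the same reducer.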

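Map outputs are first collected and then sent to the Partitioner, which is responsible for determining which Reducer the data will be sent to (the data is not yet grouped per reduce() call at this point). The default Partitioner does this with the key's hashCode() method, taken modulo the number of Reducers.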
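As a rough illustration of that hash-modulo step, the sketch below reproduces the calculation in plain Java. The class and method names (`HashPartitionSketch`, `partitionFor`) are hypothetical. The sign-bit mask is what, to my knowledge, Hadoop's HashPartitioner applies so that a negative hashCode() cannot produce a negative partition index.

```java
final class HashPartitionSketch {
    // Mask off the sign bit so the value is never negative, then take the
    // remainder by the number of reducers.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3;
        for (int key : new int[] {0, 1, 2, 3, 4, -1}) {
            // An Integer's hashCode() is its own value, which keeps the demo easy to follow.
            System.out.println(key + " -> reducer " + partitionFor(key, reducers));
        }
    }
}
```

Because the result depends only on hashCode(), every record with the same key lands on the same reducer, no matter which mapper emitted it.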

After that, the Comparator is called to sort the Map outputs. The flow looks like this:

Collector --> Partitioner --> Spill --> Comparator --> Local Disk (HDFS) <-- MapOutputServlet

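Each Reducer then copies the data that the partitioner assigned to it from the mappers and passes it through a Grouper, which determines how records are grouped for a single reduce function call: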

MapOutputServlet --> Copy to Local Disk (HDFS) --> Group --> Reduce

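Before the reduce function is called, the records also go through a sort phase that determines the order in which they reach the reducer. The sorter (a WritableComparator) calls the key's compareTo() method (from the WritableComparable interface).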
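For the IntWritable case in the question, my understanding is that when no sort comparator is set on the job, the framework looks up the comparator registered for the key class. IntWritable registers a nested Comparator that compares the serialized bytes directly, whereas a key type with nothing registered gets a generic WritableComparator that deserializes both keys and calls compareTo(). The plain-Java sketch below imitates both paths without Hadoop; `RawCompareSketch` and its methods are hypothetical names, and `ByteBuffer` stands in for Hadoop's own byte-reading helpers.

```java
import java.nio.ByteBuffer;

final class RawCompareSketch {
    // Serialize an int as 4 big-endian bytes, the layout DataOutput.writeInt() produces.
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Generic path: rebuild both key objects, then delegate to compareTo().
    static int compareByDeserializing(byte[] a, byte[] b) {
        Integer left = ByteBuffer.wrap(a).getInt();
        Integer right = ByteBuffer.wrap(b).getInt();
        return left.compareTo(right);
    }

    // Raw path: read the ints straight out of the buffers at the given offsets
    // and compare them, without creating any key objects.
    static int compareRaw(byte[] a, int startA, byte[] b, int startB) {
        int left = ByteBuffer.wrap(a, startA, 4).getInt();
        int right = ByteBuffer.wrap(b, startB, 4).getInt();
        return Integer.compare(left, right);
    }

    public static void main(String[] args) {
        byte[] small = serialize(-5);
        byte[] large = serialize(3);
        System.out.println("deserializing path: " + compareByDeserializing(small, large));
        System.out.println("raw path:           " + compareRaw(small, 0, large, 0));
    }
}
```

Both paths must agree on the ordering; the raw path exists only to avoid creating objects for every comparison during the sort.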

To give you a better idea, here is how you would implement a basic compareTo(), grouper, and sorter for a custom composite key:

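import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {
    private final IntWritable primaryField = new IntWritable();
    private final IntWritable secondaryField = new IntWritable();

    // Hadoop instantiates keys reflectively, so a no-arg constructor is needed
    public CompositeKey() {
    }

    public CompositeKey(IntWritable p, IntWritable s) {
        // Copy the values rather than keeping references to the caller's objects
        this.primaryField.set(p.get());
        this.secondaryField.set(s.get());
    }

    public IntWritable getPrimaryField() {
        return primaryField;
    }

    public IntWritable getSecondaryField() {
        return secondaryField;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.primaryField.write(out);
        this.secondaryField.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.primaryField.readFields(in);
        this.secondaryField.readFields(in);
    }

    // Called by the partitioner to route map outputs to the same reducer instance.
    // If the hash source is simple (primitive type or so), a simple call to its hashCode() method is good enough.
    @Override
    public int hashCode() {
        return this.primaryField.hashCode();
    }

    // Order by the primary field first; fall back to the secondary field on ties
    @Override
    public int compareTo(CompositeKey other) {
        if (this.getPrimaryField().equals(other.getPrimaryField())) {
            return this.getSecondaryField().compareTo(other.getSecondaryField());
        } else {
            return this.getPrimaryField().compareTo(other.getPrimaryField());
        }
    }
}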


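import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Decides which keys share a single reduce() call: only the primary field counts
public class CompositeGroupingComparator extends WritableComparator {
    public CompositeGroupingComparator() {
        // true = let the comparator create key instances for deserialization
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey first = (CompositeKey) a;
        CompositeKey second = (CompositeKey) b;

        return first.getPrimaryField().compareTo(second.getPrimaryField());
    }
}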

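import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Decides the order in which keys reach the reducer: the full compareTo() ordering
public class CompositeSortingComparator extends WritableComparator {
    public CompositeSortingComparator() {
        // true = let the comparator create key instances for deserialization
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey first = (CompositeKey) a;
        CompositeKey second = (CompositeKey) b;

        return first.compareTo(second);
    }
}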
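
To see how the sorting and grouping comparators interact, here is a small plain-Java simulation with no Hadoop dependency. It is only a sketch: `SecondarySortSketch` is a hypothetical name, and each key is modelled as an `int[]` holding {primary, secondary} instead of a real CompositeKey. In an actual job the two comparators would be registered on the Job, which as far as I know is done with setSortComparatorClass() and setGroupingComparatorClass().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SecondarySortSketch {
    // Sort order: primary field first, then secondary (same idea as CompositeKey.compareTo()).
    static final Comparator<int[]> SORT =
            Comparator.<int[]>comparingInt(k -> k[0]).thenComparingInt(k -> k[1]);

    // Grouping: only the primary field decides whether two keys share a reduce() call.
    static final Comparator<int[]> GROUP = Comparator.<int[]>comparingInt(k -> k[0]);

    // Sorts the keys, then opens a new group whenever GROUP reports a change.
    static List<List<int[]>> sortAndGroup(List<int[]> keys) {
        List<int[]> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<int[]>> groups = new ArrayList<>();
        for (int[] key : sorted) {
            if (groups.isEmpty()
                    || GROUP.compare(groups.get(groups.size() - 1).get(0), key) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<int[]> keys = Arrays.asList(
                new int[] {2, 9}, new int[] {1, 5}, new int[] {2, 3}, new int[] {1, 1});
        for (List<int[]> group : sortAndGroup(keys)) {
            StringBuilder line = new StringBuilder("reduce call:");
            for (int[] key : group) {
                line.append(" (").append(key[0]).append(", ").append(key[1]).append(")");
            }
            System.out.println(line);
        }
    }
}
```

Each group corresponds to one reduce() call: all keys with the same primary field arrive together, already ordered by the secondary field.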