使用 MapReduce 在 HBase 中插入多行

Multiple rows insertion in HBase using MapReduce

我想从每个映射器批量插入 N 行到 HBase table。我目前知道有两种方法可以做到这一点:

  1. 创建 Put objects and use put(List<Put> puts) method of HTable 个实例的列表,并确保禁用 autoFlush 参数。
  2. 使用TableOutputFormatclass并使用context.write(rowKey, put)方法。

哪个更好?

第一种方式不需要context.write(),因为hTable.put(putsList)方法直接把数据放在table中。我的映射器 class 正在扩展 Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,那么我应该为 KEYOUTVALUEOUT 使用什么 classes?

在第二种方式中,我必须调用 context.write(rowKey, put) N 次。有什么方法可以将 context.write() 用于 Put 操作列表?

有没有其他方法可以用 MapReduce 做到这一点?

提前致谢。

I prefer second option where batching is natural(no need for list of puts) for mapreduce.... to have deep insight please see my second point

1) 您的第一个选项 List<Put> 通常用于独立 Hbase Java 客户端。在内部,它由 hbase.client.write.buffer 控制,如下面的一个配置 xmls

<property>
         <name>hbase.client.write.buffer</name>
         <value>20971520</value> // around 2 mb i guess
 </property>

默认值为 2mb 大小。一旦你的缓冲区被填满,它就会刷新所有实际插入到你的 table 中的 puts。这与 #2

中解释的 BufferedMutator 的方式相同

2) 关于第二个选项,如果你看到 TableOutputFormat 文档

org.apache.hadoop.hbase.mapreduce
Class TableOutputFormat<KEY>

java.lang.Object
org.apache.hadoop.mapreduce.OutputFormat<KEY,Mutation>
org.apache.hadoop.hbase.mapreduce.TableOutputFormat<KEY>
All Implemented Interfaces:
org.apache.hadoop.conf.Configurable

@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableOutputFormat<KEY>
extends org.apache.hadoop.mapreduce.OutputFormat<KEY,Mutation>
implements org.apache.hadoop.conf.Configurable
Convert Map/Reduce output and write it to an HBase table. The KEY is ignored

while the output value must be either a Put or a Delete instance.

-- 通过 code 查看此内容的其他方式如下所示。

/**
     * Writes a key/value pair into the table.
     *
     * @param key  The key.
     * @param value  The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value)
    throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      mutator.mutate(value);
    }
  }

conclusion : context.write(rowkey,putlist) It is not possible with API.

然而,BufferedMutator(来自上面代码中的 mutator.mutate)表示

Map/reduce jobs benefit from batching, but have no natural flush point. {@code BufferedMutator} receives the puts from the M/R job and will batch puts based on some heuristic, such as the accumulated size of the puts, and submit batches of puts asynchronously so that the M/R logic can continue without interruption.

so your batching is natural(with BufferedMutator) as aforementioned