使用 MapReduce 并行查询 HBase 的行键列表

Parallel querying HBase for List of row keys using MapReduce

我想在 HBase 中执行查询操作以使用提供的行键列表获取记录。由于 MapReduce 中的 Mappers 是并行工作的,所以我想使用它。

行键的输入列表将在 ~100000 的范围内,我已经为映射器创建了一个 customInputFormat,它将向每个映射器提供 1000 个行键的列表以用于查询 HBase table .这些查询的记录可能存在也可能不存在于 HBase table,我只想 return 那些存在的记录。

我看过各种examples,我发现执行hbase table scan操作来获取行键的范围,范围由startingRowKey指定和 endingRowKey,但我只想查询提供的行键列表。

如何使用 MapReduce 执行此操作? 欢迎任何帮助!

当您将行键列表传递给映射器时,您应该向 HBase 发出 get 请求。所请求密钥的每个 get returns 数据,如果密钥不存在,则没有任何数据。

首先,您应该在映射器的 setup() 方法中创建 Table 实例:

private Table table;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration hbaseConfig = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(hbaseConfig);
    this.table = conn.getTable(TableName.valueOf("hbaseTable"));
}

然后你可以在 Get and Result 个实例的帮助下从 map() 方法对每个键向 HBase table 发出 get 请求:

String key = "keyString";
Get getValue = new Get(key.getBytes());

//add column family and column qualifier if you desire
getValue.addColumn("columnFamily".getBytes(), "columnQual".getBytes());

try {
    Result result = table.get(getValue);
    if (!table.exists(getValue)) {

        //requested key doesn't exist
        return;
    }

    // do what you want with result instance 
}

完成映射器的工作后,您需要在 cleanup() 方法中关闭与 table 的连接;

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    table.close();
}

此外,您可以自由地将 get 请求的结果传递给 reducer 或使用 cleanup() 方法将它们组合起来。这仅取决于您的目的。

你可以在你的映射器中使用这种方法,对我来说效果很好 它将 return 结果数组。

/**
     * Method getDetailRecords.
     * 
     * @param listOfRowKeys List<String>
     * @return Result[]
     * @throws IOException
     */
    private Result[] getDetailRecords(final List<String> listOfRowKeys) throws IOException {
        final HTableInterface table = HBaseConnection.getHTable(TBL_DETAIL);
        final List<Get> listOFGets = new ArrayList<Get>();
        Result[] results = null;
        try {
            for (final String rowkey : listOfRowKeys) {// prepare batch of get with row keys
   // System.err.println("get 'yourtablename', '" + saltIndexPrefix + rowkey + "'");
                final Get get = new Get(Bytes.toBytes(saltedRowKey(rowkey)));
                get.addColumn(COLUMN_FAMILY, Bytes.toBytes(yourcolumnname));
                listOFGets.add(get);
            }
            results = table.get(listOFGets);

        } finally {
            table.close();
        }
        return results;
    }