Storm-hbase 螺栓故障转移 DRPC

Storm-hbase bolt failing over DRPC

我正在编写一个 Storm 拓扑以使用 DRPC 从 HBase 读取数据。本质上,这会执行扫描以获取数据、丰富数据并 returns 它。

我可以轻松获得一个基本的 DRPC 示例(基于 http://storm.apache.org/releases/current/Distributed-RPC.html)。但是,当我插入扫描代码时,这个过程需要很长时间。一分钟后,我收到以下错误:

backtype.storm.generated.DRPCExecutionException

在 backtype.storm.daemon.drpc$service_handler$reify__8688.failRequest(drpc.clj:136) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258]

在 backtype.storm.drpc.DRPCSpout.fail(DRPCSpout.java:241) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258

不久之后,我得到 org.apache.hadoop.hbase.client.RetriesExhaustedException。这并不总是发生,但很常见。我基于此的假设是两种可能性之一:

扫描超时。然而,通过 HBase Shell 或 REST return 执行扫描不到一秒 table不一致,导致某区域缺失。我有 运行 hbase hbck,它显示 0 个不一致。 我知道与 HBase 的连接很好:我添加了调试输出,bolt 得到了结果。但是,由于 DRPCExecutionException,这些结果永远不会 return 通过 DRPC 编辑。

我虽然问题是 DRPC 超时,但是我增加了 DRPC 超时很多,并且在相同的时间内得到了相同的结果。谷歌搜索后,我发现其他人也有同样的问题 ([Storm][DRPC] Request failed),但没有说明如何解决这个问题。

为了参考,我在下面添加了我的代码:

try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI"))) 
    {
        List<Filter> filters = new ArrayList<>();
        String startRowString = "start";
        String endRowString = "end";
        RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes()));
        filters.add(startRow);
        RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes()));
        filters.add(endRow);
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);

        Scan scan = new Scan();
        scan.addFamily("f1".getBytes());
        scan.setFilter(filterList);

        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) 
        {
            hbaseValues.add(result);
        }
    }
}

在此先感谢您的帮助。

好的,所以我不知道为什么会出现最初的问题,但我确实通过大大简化我的扫描仪来修复它:

Scan scan = new Scan(startRowString.getBytes(), endRowString.getBytes());
scan.addFamily("f1".getBytes());
ResultScanner scanner = table.getScanner(scan);            
for (Result r : scanner)
{...}

基于此,问题似乎出在HBase端而不是Storm。