Storm-hbase 螺栓故障转移 DRPC
Storm-hbase bolt failing over DRPC
我正在编写一个 Storm 拓扑以使用 DRPC 从 HBase 读取数据。本质上,这会执行扫描以获取数据、丰富数据并 returns 它。
我可以轻松获得一个基本的 DRPC 示例(基于 http://storm.apache.org/releases/current/Distributed-RPC.html)。但是,当我插入扫描代码时,这个过程需要很长时间。一分钟后,我收到以下错误:
backtype.storm.generated.DRPCExecutionException
在 backtype.storm.daemon.drpc$service_handler$reify__8688.failRequest(drpc.clj:136) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258]
在 backtype.storm.drpc.DRPCSpout.fail(DRPCSpout.java:241) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258
不久之后,我得到 org.apache.hadoop.hbase.client.RetriesExhaustedException。这并不总是发生,但很常见。我基于此的假设是两种可能性之一:
扫描超时。然而,通过 HBase Shell 或 REST return 执行扫描不到一秒
table不一致,导致某区域缺失。我有 运行 hbase hbck,它显示 0 个不一致。
我知道与 HBase 的连接很好:我添加了调试输出,bolt 得到了结果。但是,由于 DRPCExecutionException,这些结果永远不会 return 通过 DRPC 编辑。
我虽然问题是 DRPC 超时,但是我增加了 DRPC 超时很多,并且在相同的时间内得到了相同的结果。谷歌搜索后,我发现其他人也有同样的问题 ([Storm][DRPC] Request failed),但没有说明如何解决这个问题。
为了参考,我在下面添加了我的代码:
try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI")))
{
List<Filter> filters = new ArrayList<>();
String startRowString = "start";
String endRowString = "end";
RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes()));
filters.add(startRow);
RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes()));
filters.add(endRow);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
Scan scan = new Scan();
scan.addFamily("f1".getBytes());
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner)
{
hbaseValues.add(result);
}
}
}
在此先感谢您的帮助。
好的,所以我不知道为什么会出现最初的问题,但我确实通过大大简化我的扫描仪来修复它:
Scan scan = new Scan(startRowString.getBytes(), endRowString.getBytes());
scan.addFamily("f1".getBytes());
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner)
{...}
基于此,问题似乎出在HBase端而不是Storm。
我正在编写一个 Storm 拓扑以使用 DRPC 从 HBase 读取数据。本质上,这会执行扫描以获取数据、丰富数据并 returns 它。
我可以轻松获得一个基本的 DRPC 示例(基于 http://storm.apache.org/releases/current/Distributed-RPC.html)。但是,当我插入扫描代码时,这个过程需要很长时间。一分钟后,我收到以下错误:
backtype.storm.generated.DRPCExecutionException
在 backtype.storm.daemon.drpc$service_handler$reify__8688.failRequest(drpc.clj:136) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258]
在 backtype.storm.drpc.DRPCSpout.fail(DRPCSpout.java:241) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258
不久之后,我得到 org.apache.hadoop.hbase.client.RetriesExhaustedException。这并不总是发生,但很常见。我基于此的假设是两种可能性之一:
扫描超时。然而,通过 HBase Shell 或 REST return 执行扫描不到一秒 table不一致,导致某区域缺失。我有 运行 hbase hbck,它显示 0 个不一致。 我知道与 HBase 的连接很好:我添加了调试输出,bolt 得到了结果。但是,由于 DRPCExecutionException,这些结果永远不会 return 通过 DRPC 编辑。
我虽然问题是 DRPC 超时,但是我增加了 DRPC 超时很多,并且在相同的时间内得到了相同的结果。谷歌搜索后,我发现其他人也有同样的问题 ([Storm][DRPC] Request failed),但没有说明如何解决这个问题。
为了参考,我在下面添加了我的代码:
try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI")))
{
List<Filter> filters = new ArrayList<>();
String startRowString = "start";
String endRowString = "end";
RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes()));
filters.add(startRow);
RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes()));
filters.add(endRow);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
Scan scan = new Scan();
scan.addFamily("f1".getBytes());
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner)
{
hbaseValues.add(result);
}
}
}
在此先感谢您的帮助。
好的,所以我不知道为什么会出现最初的问题,但我确实通过大大简化我的扫描仪来修复它:
Scan scan = new Scan(startRowString.getBytes(), endRowString.getBytes());
scan.addFamily("f1".getBytes());
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner)
{...}
基于此,问题似乎出在HBase端而不是Storm。