使用自定义过滤器过滤 Accumulo 返回的结果时出错

Error when using a custom filter to filter results returned by Accumulo

我写了一个非常简单的自定义过滤器来过滤 Accumulo 返回的结果。这是我写的过滤器

public class MyFilter extends Filter {

    @Override
    public boolean accept(Key key, Value val) {
        Long page = 1L;
        Integer limit = 25;

        if(key.getColumnQualifier().getBytes().equals("Class".getBytes()) && val.get().equals("1".getBytes())) {
            if(page == 1) {
                return true;
            }
            limit--;
            if(limit == 1L) {
                page++;
                limit = 25;
            }
        }

        return false;
    }
}

我像这样将此过滤器添加到 Accumulo 的扫描仪中

Set<Range> ranges = new HashSet<>();
IteratorSetting iter = new IteratorSetting(15, "MyFilter", MyFilter.class);
myScanner.addScanIterator(iter);
Iterator<Entry<Key, Value>> kys = myScanner.iterator();
while(kys.hasNext()) { // This is line 335 in com.latize.ulysses3.service.AccumuloPivotTable.getRows
    Entry<Key, Value> e = kys.next();
    ranges.add(Range.exact(e.getKey().getRow()));
}

但是每当我尝试 运行 这段代码时,我都会得到这个堆栈跟踪

java.lang.RuntimeException: org.apache.accumulo.core.client.impl.AccumuloServerException: Error on server accumulo.tablet.2:9997
        at org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:161)
        at com.latize.ulysses3.service.AccumuloPivotTable.getRows(AccumuloPivotTable.java:335)
        at com.latize.ulysses3.webservice.DataVault.getFilteredRows(DataVault.java:950)
        at com.latize.ulysses3.webservice.DataVault$Proxy$_$$_WeldClientProxy.getFilteredRows(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:137)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:280)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:234)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:221)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:356)
        at org.jboss.resteasy.core.SynchronousDispatcher.invokePropagateNotFound(SynchronousDispatcher.java:217)
        at org.jboss.resteasy.plugins.server.servlet.ServletContainerDispatcher.service(ServletContainerDispatcher.java:224)
        at org.jboss.resteasy.plugins.server.servlet.FilterDispatcher.doFilter(FilterDispatcher.java:62)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:189)
        at org.apache.marmotta.platform.core.filters.ModuleResourceFilter.doFilter(ModuleResourceFilter.java:169)
        at org.apache.marmotta.platform.core.filters.ModuleResourceFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at com.latize.ulysses.platform.core.filters.ClickJackFilter.doFilter(ClickJackFilter.java:105)
        at com.latize.ulysses.platform.core.filters.ClickJackFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at com.latize.ulysses.platform.core.filters.SessionTimeoutCookieFilter.doFilter(SessionTimeoutCookieFilter.java:125)
        at com.latize.ulysses.platform.core.filters.SessionTimeoutCookieFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at org.apache.marmotta.platform.core.filters.MarmottaServerNameFilter.doFilter(MarmottaServerNameFilter.java:104)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at com.latize.ulysses.platform.core.filters.UlyssesTemplatingFilter.doFilter(UlyssesTemplatingFilter.java:178)
        at com.latize.ulysses.platform.core.filters.UlyssesTemplatingFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at org.apache.marmotta.platform.core.filters.TemplatingFilter.doFilter(TemplatingFilter.java:176)
        at org.apache.marmotta.platform.core.filters.TemplatingFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at org.apache.marmotta.platform.security.filters.MarmottaAccessControlFilter.doFilter(MarmottaAccessControlFilter.java:142)
        at org.apache.marmotta.platform.security.filters.MarmottaAccessControlFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at org.apache.marmotta.platform.user.filters.MarmottaAuthenticationFilter.doFilter(MarmottaAuthenticationFilter.java:228)
        at org.apache.marmotta.platform.user.filters.MarmottaAuthenticationFilter$Proxy$_$$_WeldClientProxy.doFilter(Unknown Source)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter$LMFFilterChain.doFilter(MarmottaResourceFilter.java:184)
        at org.apache.marmotta.platform.core.servlet.MarmottaResourceFilter.doFilter(MarmottaResourceFilter.java:135)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
        at org.apache.marmotta.platform.core.servlet.MarmottaPreStartupFilter.doFilter(MarmottaPreStartupFilter.java:106)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
        at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:169)
        at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:232)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
        at org.apache.marmotta.platform.core.servlet.MarmottaOptionsFilter.doFilter(MarmottaOptionsFilter.java:83)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
        at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:423)
        at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1079)
        at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
        at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.accumulo.core.client.impl.AccumuloServerException: Error on server accumulo.tablet.2:9997
        at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:293)
        at org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:80)
        at org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:151)
        ... 69 more
Caused by: org.apache.thrift.TApplicationException: Internal error processing startScan
        at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
        at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Client.recv_startScan(TabletClientService.java:232)
        at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Client.startScan(TabletClientService.java:208)
        at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:410)
        at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:285)
        ... 71 more

accumulo.tablet.2:9997 是 运行 我的平板服务器之一的机器的域名。配置没问题,因为其他功能正常。港口也起来了。有人可以告诉我为什么会出现该错误吗?任何帮助将不胜感激。

我知道了。当我检查 tserver 的日志时,我遇到了 ClassNotFound 异常。看来我需要将 class 添加到 Accumulo 的 class 路径。但是,如果我想将这些过滤后的结果作为应用程序的一部分而不是从终端 运行 获取,我该怎么做呢?我可能无法将过滤器添加到 Accumulo 的 classpath.

要正确使用过滤器,您必须将它们添加到 accumulo 类路径中,通常是通过部署 .jar 文件,其中在 $ACCUMULO_HOME/lib$ACCUMULO_HOME/lib/ext 中系统的每个节点上定义了迭代器。为了更深入地了解迭代器,我推荐非常好的 documentation of accumulo.

为了便于部署,我建议使用 Jenkins, Puppet or Chef. If this is to much of an effort for your project a simple shell script will do as well. You may find the cluster shell cssh 对管理问题也非常有帮助!请注意,在 ext 文件夹中部署后,您 不需要 需要重新启动 accumulo,但是 lib 文件夹是必需的。