ListenableFuture 链可以处理内部 ExecutionException 吗?

Can a ListenableFuture chain handle inner ExecutionException?

我获得了执行 gRPC 调用的 api (fnGrpc) 和 return 解析为某个值的 ListenableFuture v(其实现是固定且不可修改的)。

我想提供一个辅助函数 (fnHelper):

  1. 对 gRPC 结果进行一些转换处理,它本身 return 是一个 ListenableFuture 解析为转换后的值 t1.

  2. 处理 gRPC 调用失败,return 一些其他值 t2 而不是 fnHelper 的来电者看到 ExecutionException.

我可以使用 Futures.transform():

解决 (1)
package myHelper;

ListenableFuture<T> fnHelper() {
  return Futures.transform(fnGrpc(), new Function<V, T>() {
    @Override
    public T apply(V v) {
      T t1 = f(v);
      return t1;
    }
  });
} 

和来电者:

package myApp;

// ...

try {
  T t = fnHelper().get();
} catch (ExecutionException | InterruptedException ex) {
  // ...
}

我怎样才能实现 (2) 同时仍然有 fnHelper return a ListenableFuture 并保持非阻塞?

我可以让 fnHelper 本身创建一个额外的线程,我会在其中调用 fnGrpc 上的 .get(),但是是否存在避免这个额外线程的另一种方法?

我不是 Guava 专家,但您似乎可以使用相同的 Futures utility class, in particular the method catchingAsync 来做到这一点,您可以在其中传递 returns 一个 ListenableFuture 的函数回退值(t2):

ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(originalFuture,
                       Exception.class, x -> immediateFuture(t2), executor);

然后您应该能够使用 transform 方法链接它,该方法进行转换:

ListenableFuture<T> fnHelper() {
    return Futures.catching(Futures.transform(fnGrpc(), new Function<V, T>() {
        @Override
        public T apply(V v) {
            T t1 = f(v);
            return t1;
        }
    }),
    Exception.class, x -> immediateFuture(t2));
}

注意:在最后一个片段中,我使用 catching 而不是 catchingAsync 来与您问题中的代码保持一致,但我没有t 指定一个执行者。您可能需要使用带有 Async 后缀的方法进行 non-blocking 处理。