在更新共享变量时消除过度同步并改进错误处理

Eliminating excess synchronization and improving error handling when updating a shared variable

我有一个缓存数据库查询结果的共享对象,其接口是 "get cached results" 和 "invalidate cached results." return 稍微陈旧的数据是可以接受的。

我目前的解决方案粘贴在这个问题的底部。每个缓存的 getclear 方法都可以通过 CacheService 中的 public 方法访问。在Cache内,lastUpdated包含最近的查询结果; isValid 表示结果是否应该更新; updateGuard用于保证只有一个线程更新结果; updateWait 让线程等待另一个线程更新结果。为了确保进度,并且因为 return 稍微陈旧的数据是可以接受的,在 lastUpdated 更新后,我立即 return 从更新线程和等待更新的所有线程中得到结果 - 我没有检查 isValid 是否已再次设置为 false


主要问题:如果 lastUpdated = getUpdate() 抛出异常(可能是尝试与数据库通信时网络故障的结果),那么目前我只是 returning lastUpdated - return 稍微陈旧的数据是可以接受的,但在 getUpdate() 期间重复出现瞬态故障可能会导致数据极其陈旧。我想包括一些逻辑

final int maxRetries = 5;
...
try {
  updateWait.drainPermits();
  int retryCount = 0;
  while(true) {
    try {
      lastUpdated = getUpdate();
      break;
    } catch(Exception e) {
      retryCount++;
      if(retryCount == maxRetries) {
        throw Exception e in all threads waiting on semaphore
      }
    }
  }
  isValid = true;
}

但是我不确定实施 "throw Exception e in all threads waiting on semaphore" 的好方法或者是否有更好的选择。我考虑过的一种选择是使用 Scala Try,即 Try<ImmutableList<T>> lastUpdated,但我尽可能不混合使用 Scala 和 Java 对象,以使代码维护更容易。


不太关心:现在我有三个同步变量(isValid、updateGuard、updateWait),这似乎过多 - 我正在寻找一种安全消除一个的方法或其中两个。


public class CacheService {
  private final Cache<Foo> fooCache;
  private final Cache<Bar> barCache;
  // and so on

  private abstract class Cache<T> {
    private final AtomicBoolean updateGuard = new AtomicBoolean(false);
    private final Semaphore updateWait = new Semaphore(Integer.MAX_VALUE);

    private volatile boolean isValid = true;
    private volatile ImmutableList<T> lastUpdated = getUpdate();

    protected abstract ImmutableList<T> getUpdate();

    public void clear() {
      isValid = false;
    }

    public ImmutableList<T> get() {
      if(isValid) {
        return lastUpdated;
      } else {
        if(updateGuard.compareAndSet(false, true)) {
          try {
            updateWait.drainPermits();
            lastUpdated = getUpdate();
            isValid = true;
          } finally {
            updateGuard.set(false);
            updateWait.release(Integer.MAX_VALUE);
          }
        } else {
          while(updateGuard.get()) {
            try {
              updateWait.acquire();
            } catch(InterruptedException e) {
              break;
            }
          }
        }
        return lastUpdated;
      }
    }
  }

  public CacheService() {
    fooCache = new Cache<Foo>() {
      @Override
      protected ImmutableList<Foo> getUpdate() {
        return // database query
      }
    };
    // Likewise when initializing barCache etc
  }
}

一种方法是使用 CompletableFuture and completeExceptionally

private abstract static class Cache<T> {
    private final AtomicReference<CompletableFuture<ImmutableList<T>>> value = 
        new AtomicReference<>();
    private static final int MAX_TRIES = 5;

    protected abstract ImmutableList<T> getUpdate();

    public void clear() {
        value.getAndUpdate(f -> f != null && f.isDone() ? null : f);
        // or value.set(null); if you want the cache to be invalidated while it is being updated.
    }

    public ImmutableList<T> get() {
        CompletableFuture<ImmutableList<T>> f = value.get();
        if (f != null) {
            try {
                return f.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        f = new CompletableFuture<>();
        if (!value.compareAndSet(null, f)) {
            return get();
        }
        for(int tries = 0; ; ){
            try {
                ImmutableList<T> update = getUpdate();
                f.complete(update);
                return update;
            } catch (Exception e){
                if(++tries == MAX_TRIES){
                    f.completeExceptionally(e);
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

您可能希望以不同的方式处理异常,如果您想再次尝试获取更新,则需要在抛出异常后将其清除。

你的实现有问题。当 100 个线程在 updateGuard 锁上停止时,所有线程都将执行 getUpdate() 路径。所以,一旦你有了锁,你需要重新检查isValid。

我不是Semphore的专家class,但我认为结合updateGuard和updateWait应该是可行的。

这只是您的 get 方法主体的精简版本:

while (!isValid) {
  if (updateWait.tryAcquire()) {
    if (!isValid) {
      lastUpdate = getUpdate();
      isValid = true;
    }
  } else {
    updateWait.acquire();
  }
  updateWait.release();
}
return lastUpdate;

这应该具有您代码中的所有语义,加上重新检查 isValid。

异常:在 Java 缓存库 cache2k we implemented Exception caching. I wrote a blog entry on this, see: About caching exception 中。这可能会解决您的一些问题。

最后,这是我对它的总结:

  • 快速失败,如果你不能做任何有用的事情,总是传播异常。
  • Fail-fast 意味着不重试以尽快摆脱阻塞的资源。用户将在任何情况下重试:失败或等待时间过长。
  • 传播异常时不要将其另外记录为警告。
  • 如果您从数据源向多个消费者重新抛出一个异常,请确保您明确表明这些异常是重复的
  • 一旦你return过时的数据,因为最近的请求returns异常,确保有警告机制。在 cache2k 中,我们可能会实现两个指标:逾期秒数和受影响的条目数