在更新共享变量时消除过度同步并改进错误处理
Eliminating excess synchronization and improving error handling when updating a shared variable
我有一个缓存数据库查询结果的共享对象,其接口是 "get cached results" 和 "invalidate cached results." return 稍微陈旧的数据是可以接受的。
我目前的解决方案粘贴在这个问题的底部。每个缓存的 get
和 clear
方法都可以通过 CacheService
中的 public 方法访问。在Cache
内,lastUpdated
包含最近的查询结果; isValid
表示结果是否应该更新; updateGuard
用于保证只有一个线程更新结果; updateWait
让线程等待另一个线程更新结果。为了确保进度,并且因为 return 稍微陈旧的数据是可以接受的,在 lastUpdated
更新后,我立即 return 从更新线程和等待更新的所有线程中得到结果 - 我没有检查 isValid
是否已再次设置为 false
。
主要问题:如果 lastUpdated = getUpdate()
抛出异常(可能是尝试与数据库通信时网络故障的结果),那么目前我只是 returning lastUpdated
- return 稍微陈旧的数据是可以接受的,但在 getUpdate()
期间重复出现瞬态故障可能会导致数据极其陈旧。我想包括一些逻辑
final int maxRetries = 5;
...
try {
updateWait.drainPermits();
int retryCount = 0;
while(true) {
try {
lastUpdated = getUpdate();
break;
} catch(Exception e) {
retryCount++;
if(retryCount == maxRetries) {
throw Exception e in all threads waiting on semaphore
}
}
}
isValid = true;
}
但是我不确定实施 "throw Exception e in all threads waiting on semaphore" 的好方法或者是否有更好的选择。我考虑过的一种选择是使用 Scala Try
,即 Try<ImmutableList<T>> lastUpdated
,但我尽可能不混合使用 Scala 和 Java 对象,以使代码维护更容易。
不太关心:现在我有三个同步变量(isValid、updateGuard、updateWait),这似乎过多 - 我正在寻找一种安全消除一个的方法或其中两个。
public class CacheService {
private final Cache<Foo> fooCache;
private final Cache<Bar> barCache;
// and so on
private abstract class Cache<T> {
private final AtomicBoolean updateGuard = new AtomicBoolean(false);
private final Semaphore updateWait = new Semaphore(Integer.MAX_VALUE);
private volatile boolean isValid = true;
private volatile ImmutableList<T> lastUpdated = getUpdate();
protected abstract ImmutableList<T> getUpdate();
public void clear() {
isValid = false;
}
public ImmutableList<T> get() {
if(isValid) {
return lastUpdated;
} else {
if(updateGuard.compareAndSet(false, true)) {
try {
updateWait.drainPermits();
lastUpdated = getUpdate();
isValid = true;
} finally {
updateGuard.set(false);
updateWait.release(Integer.MAX_VALUE);
}
} else {
while(updateGuard.get()) {
try {
updateWait.acquire();
} catch(InterruptedException e) {
break;
}
}
}
return lastUpdated;
}
}
}
public CacheService() {
fooCache = new Cache<Foo>() {
@Override
protected ImmutableList<Foo> getUpdate() {
return // database query
}
};
// Likewise when initializing barCache etc
}
}
一种方法是使用 CompletableFuture
and completeExceptionally
private abstract static class Cache<T> {
private final AtomicReference<CompletableFuture<ImmutableList<T>>> value =
new AtomicReference<>();
private static final int MAX_TRIES = 5;
protected abstract ImmutableList<T> getUpdate();
public void clear() {
value.getAndUpdate(f -> f != null && f.isDone() ? null : f);
// or value.set(null); if you want the cache to be invalidated while it is being updated.
}
public ImmutableList<T> get() {
CompletableFuture<ImmutableList<T>> f = value.get();
if (f != null) {
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
f = new CompletableFuture<>();
if (!value.compareAndSet(null, f)) {
return get();
}
for(int tries = 0; ; ){
try {
ImmutableList<T> update = getUpdate();
f.complete(update);
return update;
} catch (Exception e){
if(++tries == MAX_TRIES){
f.completeExceptionally(e);
throw new RuntimeException(e);
}
}
}
}
}
您可能希望以不同的方式处理异常,如果您想再次尝试获取更新,则需要在抛出异常后将其清除。
你的实现有问题。当 100 个线程在 updateGuard 锁上停止时,所有线程都将执行 getUpdate() 路径。所以,一旦你有了锁,你需要重新检查isValid。
我不是Semphore的专家class,但我认为结合updateGuard和updateWait应该是可行的。
这只是您的 get 方法主体的精简版本:
while (!isValid) {
if (updateWait.tryAcquire()) {
if (!isValid) {
lastUpdate = getUpdate();
isValid = true;
}
} else {
updateWait.acquire();
}
updateWait.release();
}
return lastUpdate;
这应该具有您代码中的所有语义,加上重新检查 isValid。
异常:在 Java 缓存库 cache2k we implemented Exception caching. I wrote a blog entry on this, see: About caching exception 中。这可能会解决您的一些问题。
最后,这是我对它的总结:
- 快速失败,如果你不能做任何有用的事情,总是传播异常。
- Fail-fast 意味着不重试以尽快摆脱阻塞的资源。用户将在任何情况下重试:失败或等待时间过长。
- 传播异常时不要将其另外记录为警告。
- 如果您从数据源向多个消费者重新抛出一个异常,请确保您明确表明这些异常是重复的
- 一旦你return过时的数据,因为最近的请求returns异常,确保有警告机制。在 cache2k 中,我们可能会实现两个指标:逾期秒数和受影响的条目数
我有一个缓存数据库查询结果的共享对象,其接口是 "get cached results" 和 "invalidate cached results." return 稍微陈旧的数据是可以接受的。
我目前的解决方案粘贴在这个问题的底部。每个缓存的 get
和 clear
方法都可以通过 CacheService
中的 public 方法访问。在Cache
内,lastUpdated
包含最近的查询结果; isValid
表示结果是否应该更新; updateGuard
用于保证只有一个线程更新结果; updateWait
让线程等待另一个线程更新结果。为了确保进度,并且因为 return 稍微陈旧的数据是可以接受的,在 lastUpdated
更新后,我立即 return 从更新线程和等待更新的所有线程中得到结果 - 我没有检查 isValid
是否已再次设置为 false
。
主要问题:如果 lastUpdated = getUpdate()
抛出异常(可能是尝试与数据库通信时网络故障的结果),那么目前我只是 returning lastUpdated
- return 稍微陈旧的数据是可以接受的,但在 getUpdate()
期间重复出现瞬态故障可能会导致数据极其陈旧。我想包括一些逻辑
final int maxRetries = 5;
...
try {
updateWait.drainPermits();
int retryCount = 0;
while(true) {
try {
lastUpdated = getUpdate();
break;
} catch(Exception e) {
retryCount++;
if(retryCount == maxRetries) {
throw Exception e in all threads waiting on semaphore
}
}
}
isValid = true;
}
但是我不确定实施 "throw Exception e in all threads waiting on semaphore" 的好方法或者是否有更好的选择。我考虑过的一种选择是使用 Scala Try
,即 Try<ImmutableList<T>> lastUpdated
,但我尽可能不混合使用 Scala 和 Java 对象,以使代码维护更容易。
不太关心:现在我有三个同步变量(isValid、updateGuard、updateWait),这似乎过多 - 我正在寻找一种安全消除一个的方法或其中两个。
public class CacheService {
private final Cache<Foo> fooCache;
private final Cache<Bar> barCache;
// and so on
private abstract class Cache<T> {
private final AtomicBoolean updateGuard = new AtomicBoolean(false);
private final Semaphore updateWait = new Semaphore(Integer.MAX_VALUE);
private volatile boolean isValid = true;
private volatile ImmutableList<T> lastUpdated = getUpdate();
protected abstract ImmutableList<T> getUpdate();
public void clear() {
isValid = false;
}
public ImmutableList<T> get() {
if(isValid) {
return lastUpdated;
} else {
if(updateGuard.compareAndSet(false, true)) {
try {
updateWait.drainPermits();
lastUpdated = getUpdate();
isValid = true;
} finally {
updateGuard.set(false);
updateWait.release(Integer.MAX_VALUE);
}
} else {
while(updateGuard.get()) {
try {
updateWait.acquire();
} catch(InterruptedException e) {
break;
}
}
}
return lastUpdated;
}
}
}
public CacheService() {
fooCache = new Cache<Foo>() {
@Override
protected ImmutableList<Foo> getUpdate() {
return // database query
}
};
// Likewise when initializing barCache etc
}
}
一种方法是使用 CompletableFuture
and completeExceptionally
private abstract static class Cache<T> {
private final AtomicReference<CompletableFuture<ImmutableList<T>>> value =
new AtomicReference<>();
private static final int MAX_TRIES = 5;
protected abstract ImmutableList<T> getUpdate();
public void clear() {
value.getAndUpdate(f -> f != null && f.isDone() ? null : f);
// or value.set(null); if you want the cache to be invalidated while it is being updated.
}
public ImmutableList<T> get() {
CompletableFuture<ImmutableList<T>> f = value.get();
if (f != null) {
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
f = new CompletableFuture<>();
if (!value.compareAndSet(null, f)) {
return get();
}
for(int tries = 0; ; ){
try {
ImmutableList<T> update = getUpdate();
f.complete(update);
return update;
} catch (Exception e){
if(++tries == MAX_TRIES){
f.completeExceptionally(e);
throw new RuntimeException(e);
}
}
}
}
}
您可能希望以不同的方式处理异常,如果您想再次尝试获取更新,则需要在抛出异常后将其清除。
你的实现有问题。当 100 个线程在 updateGuard 锁上停止时,所有线程都将执行 getUpdate() 路径。所以,一旦你有了锁,你需要重新检查isValid。
我不是Semphore的专家class,但我认为结合updateGuard和updateWait应该是可行的。
这只是您的 get 方法主体的精简版本:
while (!isValid) {
if (updateWait.tryAcquire()) {
if (!isValid) {
lastUpdate = getUpdate();
isValid = true;
}
} else {
updateWait.acquire();
}
updateWait.release();
}
return lastUpdate;
这应该具有您代码中的所有语义,加上重新检查 isValid。
异常:在 Java 缓存库 cache2k we implemented Exception caching. I wrote a blog entry on this, see: About caching exception 中。这可能会解决您的一些问题。
最后,这是我对它的总结:
- 快速失败,如果你不能做任何有用的事情,总是传播异常。
- Fail-fast 意味着不重试以尽快摆脱阻塞的资源。用户将在任何情况下重试:失败或等待时间过长。
- 传播异常时不要将其另外记录为警告。
- 如果您从数据源向多个消费者重新抛出一个异常,请确保您明确表明这些异常是重复的
- 一旦你return过时的数据,因为最近的请求returns异常,确保有警告机制。在 cache2k 中,我们可能会实现两个指标:逾期秒数和受影响的条目数