无限期地等待可能永远不会到达的消息
Waiting indefinitely for a message that may never arrive
我有一个 Java 类型的 actor,它负责可能暂时不可用的外部资源的 filter/retry 逻辑。 actor的字段和常用方法为:
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
演员有几个操作都或多或少具有相同的retry/buffer逻辑;每个操作都有自己的 [op]Wait
和 [op]Buffer
字段。
- 父演员调用
void operation(OpInput opInput)
- 前面的方法使用
DEFAULTWAIT
作为第二个参数调用 void operation(OpInput opInput, long currentWait)
- 如果
currentWait
参数不等于 opWait
则输入存储在 opBuffer
中,否则将输入发送到外部资源。
- 如果外部资源returns成功,则
opWait
设置为DEFAULTWAIT
,opBuffer
的内容通过[=22=传回] 方法。如果外部资源(或更可能是网络)returns 出现错误,那么我会使用 opWait
毫秒的延迟更新 opWait = updateWait(opWait)
并在 actor 系统调度程序上安排 operation(opInput, opWait)
。
即我正在使用 actor 系统调度程序来实现指数退避;我正在使用 currentWait
参数来识别我正在重试的消息,并且正在缓冲其他消息,直到主要消息被外部资源成功处理。
问题是,如果计划的 operation(opInput, currentWait)
消息丢失,那么我将永远缓冲消息,因为 currentWait == opWait
守卫将无法处理所有其他消息。我可以使用 spring-retry 之类的东西来实现指数退避,但我没有看到合并操作的重试循环的方法,这意味着我可以在每个重试循环中使用一个线程(而使用 actor 系统的调度程序不会'不要给系统带来更多压力)。
我正在寻找一种容错能力更强的方法来在参与者和外部资源之间的接口上实现缓冲和指数退避,而不必为任务分配太多资源。
如果我对你的理解是正确的,如果唯一的问题是丢失预定的消息,你为什么不对特定消息使用 Reliable Proxy Pattern 之类的东西,然后如果它失败 opWait = DEFAULTWAIT;
我了解到您的代码有些问题,当您说 public void operation(OpInput opInput)
被外部调用时,我不明白您的意思。你的意思是这个方法正在与网络交互,它使用了有时不可用的资源?
如果可以的话,我可以推荐一个替代方案。据我了解,你的主要问题是你有一个资源有时不可用,所以你有某种 que/buffer 你用某种等待逻辑实现,以便消息再次可用时将被处理,不幸的是,其中涉及一些可能丢失并导致无限等待的消息。我认为你可以使用带有超时的 Futures 来实现你想要的。如果未来在一定时间内未完成,则重试,最多说 3 次重试。您甚至可以根据服务器负载和完成消息所需的时间来调整此时间。希望对您有所帮助。
我有一个 Java 类型的 actor,它负责可能暂时不可用的外部资源的 filter/retry 逻辑。 actor的字段和常用方法为:
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
演员有几个操作都或多或少具有相同的retry/buffer逻辑;每个操作都有自己的 [op]Wait
和 [op]Buffer
字段。
- 父演员调用
void operation(OpInput opInput)
- 前面的方法使用
DEFAULTWAIT
作为第二个参数调用void operation(OpInput opInput, long currentWait)
- 如果
currentWait
参数不等于opWait
则输入存储在opBuffer
中,否则将输入发送到外部资源。 - 如果外部资源returns成功,则
opWait
设置为DEFAULTWAIT
,opBuffer
的内容通过[=22=传回] 方法。如果外部资源(或更可能是网络)returns 出现错误,那么我会使用opWait
毫秒的延迟更新opWait = updateWait(opWait)
并在 actor 系统调度程序上安排operation(opInput, opWait)
。
即我正在使用 actor 系统调度程序来实现指数退避;我正在使用 currentWait
参数来识别我正在重试的消息,并且正在缓冲其他消息,直到主要消息被外部资源成功处理。
问题是,如果计划的 operation(opInput, currentWait)
消息丢失,那么我将永远缓冲消息,因为 currentWait == opWait
守卫将无法处理所有其他消息。我可以使用 spring-retry 之类的东西来实现指数退避,但我没有看到合并操作的重试循环的方法,这意味着我可以在每个重试循环中使用一个线程(而使用 actor 系统的调度程序不会'不要给系统带来更多压力)。
我正在寻找一种容错能力更强的方法来在参与者和外部资源之间的接口上实现缓冲和指数退避,而不必为任务分配太多资源。
如果我对你的理解是正确的,如果唯一的问题是丢失预定的消息,你为什么不对特定消息使用 Reliable Proxy Pattern 之类的东西,然后如果它失败 opWait = DEFAULTWAIT;
我了解到您的代码有些问题,当您说 public void operation(OpInput opInput)
被外部调用时,我不明白您的意思。你的意思是这个方法正在与网络交互,它使用了有时不可用的资源?
如果可以的话,我可以推荐一个替代方案。据我了解,你的主要问题是你有一个资源有时不可用,所以你有某种 que/buffer 你用某种等待逻辑实现,以便消息再次可用时将被处理,不幸的是,其中涉及一些可能丢失并导致无限等待的消息。我认为你可以使用带有超时的 Futures 来实现你想要的。如果未来在一定时间内未完成,则重试,最多说 3 次重试。您甚至可以根据服务器负载和完成消息所需的时间来调整此时间。希望对您有所帮助。