如何使用 akka 流限制图中的请求?
How to throttle request in a graph using akka stream?
背景
我有一个项目,我们在 Java 中使用 akka-streams。
在这个项目中,我有一个字符串流和一个对它们进行一些操作的图形。
Objective
在我的图表中,我想将该流广播给 2 个工作人员。将用 'A'
替换所有字符 'a'
并在实时接收数据时发送数据。
另一个将接收数据,每 3 个字符串,它将连接这 3 个字符串并将它们映射到数字。
它看起来像下面这样:
显然Sink 2
接收信息的速度不如Sink 1
。但这是预期的行为。这里有趣的部分是工人 2。
问题
做工人 1 很容易,也不难。这里的问题是工作人员 2。我知道 akka 有最多可以保存 X 条消息的缓冲区,但看起来我被迫选择现有的 Overflow strategies 之一,这通常会导致选择我想要的消息删除或者我是否想让流保持活动状态。
我想要的是,当我在 worke2 中的缓冲区达到缓冲区的最大大小时,对它拥有的所有消息执行 concat 和 map 操作,然后将它们一起发送(之后重置缓冲区)。
但即使在阅读了 akka 的 stream-rate 文档之后,我也找不到这样做的方法,至少使用 Java 是这样。
研究
我也检查了一个类似的 SO 问题, 然而已经一年多了,没有人回应。
问题
使用图形 DSL,我将如何创建路径:
Source -> bcast -> worker2 -> Sink 2
??
在您 bcast
应用 groupedWithin
运算符后,持续时间不受限制,元素数量设置为 3。
https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
您也可以自己做,添加一个阶段,将元素存储在 List
中,并在每次达到 3 个元素时发出列表。
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
public class RecordGrouper<T> extends GraphStage<FlowShape<T, List<T>>> {
private final Inlet<T> inlet = Inlet.create("in");
private final Outlet<List<T>> outlet = Outlet.create("out");
private final FlowShape<T, List<T>> shape = new FlowShape<>(inlet, outlet);
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
List<T> batch = new ArrayList<>(3);
{
setHandler(
inlet,
new AbstractInHandler() {
@Override
public void onPush() {
T record = grab(inlet);
batch.add(record);
if (batch.size() == 3) {
emit(outlet, ImmutableList.copyOf(batch));
batch.clear();
}
pull(inlet);
}
});
}
@Override
public void preStart() {
pull(inlet);
}
};
}
@Override
public FlowShape<T, List<T>> shape() {
return shape;
}
}
作为侧节点,我认为 buffer
运算符不会起作用,因为它仅在存在背压时才会启动。所以如果一切都很安静,元素仍然会一个接一个地发射,而不是 3 乘 3。https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/buffer.html
背景
我有一个项目,我们在 Java 中使用 akka-streams。
在这个项目中,我有一个字符串流和一个对它们进行一些操作的图形。
Objective
在我的图表中,我想将该流广播给 2 个工作人员。将用 'A'
替换所有字符 'a'
并在实时接收数据时发送数据。
另一个将接收数据,每 3 个字符串,它将连接这 3 个字符串并将它们映射到数字。
它看起来像下面这样:
显然Sink 2
接收信息的速度不如Sink 1
。但这是预期的行为。这里有趣的部分是工人 2。
问题
做工人 1 很容易,也不难。这里的问题是工作人员 2。我知道 akka 有最多可以保存 X 条消息的缓冲区,但看起来我被迫选择现有的 Overflow strategies 之一,这通常会导致选择我想要的消息删除或者我是否想让流保持活动状态。
我想要的是,当我在 worke2 中的缓冲区达到缓冲区的最大大小时,对它拥有的所有消息执行 concat 和 map 操作,然后将它们一起发送(之后重置缓冲区)。
但即使在阅读了 akka 的 stream-rate 文档之后,我也找不到这样做的方法,至少使用 Java 是这样。
研究
我也检查了一个类似的 SO 问题, 然而已经一年多了,没有人回应。
问题
使用图形 DSL,我将如何创建路径:
Source -> bcast -> worker2 -> Sink 2
??
在您 bcast
应用 groupedWithin
运算符后,持续时间不受限制,元素数量设置为 3。
https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
您也可以自己做,添加一个阶段,将元素存储在 List
中,并在每次达到 3 个元素时发出列表。
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
public class RecordGrouper<T> extends GraphStage<FlowShape<T, List<T>>> {
private final Inlet<T> inlet = Inlet.create("in");
private final Outlet<List<T>> outlet = Outlet.create("out");
private final FlowShape<T, List<T>> shape = new FlowShape<>(inlet, outlet);
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
List<T> batch = new ArrayList<>(3);
{
setHandler(
inlet,
new AbstractInHandler() {
@Override
public void onPush() {
T record = grab(inlet);
batch.add(record);
if (batch.size() == 3) {
emit(outlet, ImmutableList.copyOf(batch));
batch.clear();
}
pull(inlet);
}
});
}
@Override
public void preStart() {
pull(inlet);
}
};
}
@Override
public FlowShape<T, List<T>> shape() {
return shape;
}
}
作为侧节点,我认为 buffer
运算符不会起作用,因为它仅在存在背压时才会启动。所以如果一切都很安静,元素仍然会一个接一个地发射,而不是 3 乘 3。https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/buffer.html