Akka Streams - 如何生成列表的大小,然后是整个列表
Akka Streams - How to produce the size of a List followed by the entire List
我想消费一个元素列表,然后转发列表的大小,然后是完整的元素列表。
例如
Given List (1, 2, 3, 4, 5)
When the source is fully consumed
Then the next processing-stage receives the elements List(5, 1, 2, 3, 4, 5)
这是我试图解决的玩具问题;我明白在下一个处理阶段接收其第一个元素之前完全使用列表是一种不好的做法。
我有计算列表大小的代码。
val length: RunnableGraph[Future[Int]] = FileIO.fromPath(Paths.get("myList.txt")).toMat(Sink.fold(0) {
case (length, s) => length + s.length
})(Keep.right)
我不确定在将列表发送到下一个处理阶段(前面是列表的大小)之前如何完全使用列表。
您可以使用 fold
to accumulate both the size of the List
and the List
elements themselves, then use flatMapConcat
and concat
:
val data = List(1, 2, 3, 4, 5)
Source(data)
.fold((0, List.empty[Int]))((acc, curr) => (acc._1 + 1, acc._2 :+ curr))
.flatMapConcat {
case (size, elems) => Source.single(size).concat(Source(elems))
}
.runForeach(println)
以上代码打印:
5 // size of the list
1
2
3
4
5
请注意,虽然上面的代码在这个玩具示例中有效,但它不是 "streamlike",因为它将整个 List
复制到内存中(这否定了使用流的全部意义)。希望这个例子能够说明 Akka Stream 的一些功能,但不要在生产代码中遵循这种方法。
我想消费一个元素列表,然后转发列表的大小,然后是完整的元素列表。
例如
Given List (1, 2, 3, 4, 5)
When the source is fully consumed
Then the next processing-stage receives the elements List(5, 1, 2, 3, 4, 5)
这是我试图解决的玩具问题;我明白在下一个处理阶段接收其第一个元素之前完全使用列表是一种不好的做法。
我有计算列表大小的代码。
val length: RunnableGraph[Future[Int]] = FileIO.fromPath(Paths.get("myList.txt")).toMat(Sink.fold(0) {
case (length, s) => length + s.length
})(Keep.right)
我不确定在将列表发送到下一个处理阶段(前面是列表的大小)之前如何完全使用列表。
您可以使用 fold
to accumulate both the size of the List
and the List
elements themselves, then use flatMapConcat
and concat
:
val data = List(1, 2, 3, 4, 5)
Source(data)
.fold((0, List.empty[Int]))((acc, curr) => (acc._1 + 1, acc._2 :+ curr))
.flatMapConcat {
case (size, elems) => Source.single(size).concat(Source(elems))
}
.runForeach(println)
以上代码打印:
5 // size of the list
1
2
3
4
5
请注意,虽然上面的代码在这个玩具示例中有效,但它不是 "streamlike",因为它将整个 List
复制到内存中(这否定了使用流的全部意义)。希望这个例子能够说明 Akka Stream 的一些功能,但不要在生产代码中遵循这种方法。