如何只读取期货序列中的成功值
How to read only Successful values from a Seq of Futures
我正在学习 akka/scala 并且正在尝试只读取那些从 Seq[Future[Int]]
成功但无法正常工作的 Future
。
- 我模拟了一个包含 10 个
Future[Int]
的数组,其中一些失败取决于 FailThreshold
取的值(全部失败为 10,none 失败为 0)。
- 然后我尝试将它们读入 ArrayBuffer(无法找到 return 具有值的不可变结构的方法)。
- 此外,Success/Failure 上没有过滤器,因此必须 运行 每个未来的
onComplete
和更新缓冲区作为副作用。
- 即使
FailThreshold=0
和 Seq 将所有 Future 设置为 Success,数组缓冲区有时也是空的并且 运行s return 不同大小的数组。
我尝试了一些来自网络的其他建议,例如在列表中使用 Future.sequence
,但如果任何未来变量失败,这将引发异常。
import akka.actor._
import akka.pattern.ask
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Timeout, Failure, Success}
import concurrent.ExecutionContext.Implicits.global
case object AskNameMessage
implicit val timeout = Timeout(5, SECONDS)
val FailThreshold = 0
class HeyActor(num: Int) extends Actor {
def receive = {
case AskNameMessage => if (num<FailThreshold) {Thread.sleep(1000);sender ! num} else sender ! num
}
}
class FLPActor extends Actor {
def receive = {
case t: IndexedSeq[Future[Int]] => {
println(t)
val b = scala.collection.mutable.ArrayBuffer.empty[Int]
t.foldLeft( b ){ case (bf,ft) =>
ft.onComplete { case Success(v) => bf += ft.value.get.get }
bf
}
println(b)
}
}
}
val system = ActorSystem("AskTest")
val flm = (0 to 10).map( (n) => system.actorOf(Props(new HeyActor(n)), name="futureListMake"+(n)) )
val flp = system.actorOf(Props(new FLPActor), name="futureListProcessor")
// val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.failed( throw new IllegalArgumentException("DONE!") ))
val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.successful(0))
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
flp ! seqOfFtrs
FLPActor
中的receive
大多得到
Vector(Future(Success(0)), Future(Success(1)), Future(Success(2)), Future(Success(3)), Future(Success(4)), Future(Success(5)), Future(Success(6)), Future(Success(7)), Future(Success(8)), Future(Success(9)), Future(Success(10)))
但是数组缓冲区 b
有不同数量的值并且有时是空的。
有人可以指出这里的空白吗,
- 为什么即使所有 Future 都解析为 Success,数组缓冲区的大小也会变化,
- 当我们想要
ask
具有 TimeOut 的不同参与者并且仅使用那些已成功 returned 的请求进行进一步处理时,使用的正确模式是什么。
与其直接发送 IndexedSeq[Future[Int]],不如转换为 Future[IndexedSeq[Int]],然后将其通过管道传递给下一个 actor。您不会将期货直接发送给演员。你必须管它。
HeyActor 可以保持不变。
之后
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
做一个恢复,然后用Future.sequence把它变成一个Future:
val oneFut = Future.sequence(seqOfFtrs.map(f=>f.map(Some(_)).recover{ case (ex: Throwable) => None})).map(_.flatten)
如果您不了解 Some、None 和 flatten 的业务,请确保您了解 Option 类型。从序列中删除值的一种方法是将序列中的值映射到 Option(Some 或 None),然后展平序列。 None 值被删除,一些值被展开。
将数据转换为单个 Future 后,将其通过管道传递给 FLPActor:
oneFut pipeTo flp
应使用以下接收函数重写 FLPActor:
def receive = {
case printme: IndexedSeq[Int] => println(printme)
}
在 Akka 中,从 Future 或 Future 的 onComplete 修改 actor 主线程中的某些状态是一个很大的禁忌。在最坏的情况下,它会导致竞争条件。请记住,每个 Future 都在自己的线程上运行,因此 运行 actor 中的 Future 意味着您可以在不同的线程中完成并发工作。让 Future 直接修改 actor 中的某些状态,而 actor 也在处理某些状态,这是灾难的根源。在 Akka 中,您直接在主要执行者的主要执行线程中处理对状态的所有更改。如果您在 Future 中完成了一些工作并且需要从 actor 的主线程访问该工作,您可以将其通过管道传递给该 actor。 pipeTo 模式对于访问 Future 的已完成计算是功能性的、正确的和安全的。
回答关于为什么 FLPActor 没有正确打印出 IndexedSeq 的问题:您在 Futures 完成之前打印出 ArrayBuffer。 onComplete 不是在这种情况下使用的正确习惯用法,您通常应该避免使用它,因为它不是好的功能样式。
不要忘记 pipeTo 语法的导入 akka.pattern.pipe。
我正在学习 akka/scala 并且正在尝试只读取那些从 Seq[Future[Int]]
成功但无法正常工作的 Future
。
- 我模拟了一个包含 10 个
Future[Int]
的数组,其中一些失败取决于FailThreshold
取的值(全部失败为 10,none 失败为 0)。 - 然后我尝试将它们读入 ArrayBuffer(无法找到 return 具有值的不可变结构的方法)。
- 此外,Success/Failure 上没有过滤器,因此必须 运行 每个未来的
onComplete
和更新缓冲区作为副作用。 - 即使
FailThreshold=0
和 Seq 将所有 Future 设置为 Success,数组缓冲区有时也是空的并且 运行s return 不同大小的数组。
我尝试了一些来自网络的其他建议,例如在列表中使用 Future.sequence
,但如果任何未来变量失败,这将引发异常。
import akka.actor._
import akka.pattern.ask
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Timeout, Failure, Success}
import concurrent.ExecutionContext.Implicits.global
case object AskNameMessage
implicit val timeout = Timeout(5, SECONDS)
val FailThreshold = 0
class HeyActor(num: Int) extends Actor {
def receive = {
case AskNameMessage => if (num<FailThreshold) {Thread.sleep(1000);sender ! num} else sender ! num
}
}
class FLPActor extends Actor {
def receive = {
case t: IndexedSeq[Future[Int]] => {
println(t)
val b = scala.collection.mutable.ArrayBuffer.empty[Int]
t.foldLeft( b ){ case (bf,ft) =>
ft.onComplete { case Success(v) => bf += ft.value.get.get }
bf
}
println(b)
}
}
}
val system = ActorSystem("AskTest")
val flm = (0 to 10).map( (n) => system.actorOf(Props(new HeyActor(n)), name="futureListMake"+(n)) )
val flp = system.actorOf(Props(new FLPActor), name="futureListProcessor")
// val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.failed( throw new IllegalArgumentException("DONE!") ))
val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.successful(0))
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
flp ! seqOfFtrs
FLPActor
中的receive
大多得到
Vector(Future(Success(0)), Future(Success(1)), Future(Success(2)), Future(Success(3)), Future(Success(4)), Future(Success(5)), Future(Success(6)), Future(Success(7)), Future(Success(8)), Future(Success(9)), Future(Success(10)))
但是数组缓冲区 b
有不同数量的值并且有时是空的。
有人可以指出这里的空白吗,
- 为什么即使所有 Future 都解析为 Success,数组缓冲区的大小也会变化,
- 当我们想要
ask
具有 TimeOut 的不同参与者并且仅使用那些已成功 returned 的请求进行进一步处理时,使用的正确模式是什么。
与其直接发送 IndexedSeq[Future[Int]],不如转换为 Future[IndexedSeq[Int]],然后将其通过管道传递给下一个 actor。您不会将期货直接发送给演员。你必须管它。
HeyActor 可以保持不变。
之后val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
做一个恢复,然后用Future.sequence把它变成一个Future:
val oneFut = Future.sequence(seqOfFtrs.map(f=>f.map(Some(_)).recover{ case (ex: Throwable) => None})).map(_.flatten)
如果您不了解 Some、None 和 flatten 的业务,请确保您了解 Option 类型。从序列中删除值的一种方法是将序列中的值映射到 Option(Some 或 None),然后展平序列。 None 值被删除,一些值被展开。
将数据转换为单个 Future 后,将其通过管道传递给 FLPActor:
oneFut pipeTo flp
应使用以下接收函数重写 FLPActor:
def receive = {
case printme: IndexedSeq[Int] => println(printme)
}
在 Akka 中,从 Future 或 Future 的 onComplete 修改 actor 主线程中的某些状态是一个很大的禁忌。在最坏的情况下,它会导致竞争条件。请记住,每个 Future 都在自己的线程上运行,因此 运行 actor 中的 Future 意味着您可以在不同的线程中完成并发工作。让 Future 直接修改 actor 中的某些状态,而 actor 也在处理某些状态,这是灾难的根源。在 Akka 中,您直接在主要执行者的主要执行线程中处理对状态的所有更改。如果您在 Future 中完成了一些工作并且需要从 actor 的主线程访问该工作,您可以将其通过管道传递给该 actor。 pipeTo 模式对于访问 Future 的已完成计算是功能性的、正确的和安全的。
回答关于为什么 FLPActor 没有正确打印出 IndexedSeq 的问题:您在 Futures 完成之前打印出 ArrayBuffer。 onComplete 不是在这种情况下使用的正确习惯用法,您通常应该避免使用它,因为它不是好的功能样式。
不要忘记 pipeTo 语法的导入 akka.pattern.pipe。