使用 Future.map 组合器时的 Future(<not completed>)
Future(<not completed>) when Future.map Combinator Used
在使用Future.map
处理第一个Future
之后,一旦Future.onComplete
被回调,第二个依赖Future
总是Future(<not completed>)
。其他使用两个 Future
的结构从不表现出这种行为。
谁能解释为什么 Future.onComplete
被调用,尽管未来显然还不完整?
使用 Scala 2.12.3。如果需要,剪切并粘贴以下内容以评估此问题。
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
object FutureNotCompleted {
def threadNumber: String = f"${Thread.currentThread().getId.toInt}%2d"
/////////////////////////////////////////////////////
// Future completion control
/////////////////////////////////////////////////////
var futures: ListBuffer[Future[Any]] = ListBuffer()
def add(future: Future[Any]): Unit = synchronized(futures += future)
def remove(future: Future[Any]): Unit = synchronized(futures = futures.filter(_ != future))
def loopTillCompleted: Unit = {
var futuresOnList = true;
while (futuresOnList) {
Thread.sleep(100)
for (future <- futures) {
if (future.isCompleted) {
future.value.get match {
case Success(v) => println(s"${threadNumber} Success: ${v}")
case Failure(e) => println(s"${threadNumber} Error: ${e}")
}
remove(future)
}
}
if (futures.size == 0) futuresOnList = false
}
}
/////////////////////////////////////////////////////
// Future factory
/////////////////////////////////////////////////////
def createRegisteredFuture: Future[Int] = {
val future = createFuture
add(future)
future
}
def createFuture: Future[Int] = Future {
val i = (Math.random() * 1000).toInt
println(s"${threadNumber} Future work start: ${i}")
Thread.sleep((Math.random() * 1000).toLong)
println(s"${threadNumber} Future work stop: ${i}")
if (Math.random > 0.7) throw new RuntimeException(s"${threadNumber} Error for ${i}")
i
}
/////////////////////////////////////////////////////
// Functions exhibiting Future use conditions
/////////////////////////////////////////////////////
def futureDoesNotComplete: Unit = {
val f1 = createRegisteredFuture
val f2 = f1.map {
i => createRegisteredFuture
}
// This is never completed at the time the 'onComplete' callback is called
f2.onComplete({
case Success(j) => println(s"${threadNumber} j: ${j} ")
case Failure(e) => println(s"${threadNumber} f2 Failure: ${e}")
})
loopTillCompleted
println(s"${threadNumber} All done.")
}
def futureCompletes: Unit = {
val f1 = createRegisteredFuture
f1.onComplete({
case Success(i) => {
val f2 = createRegisteredFuture
f2.onComplete({
case Success(j) => println(s"${threadNumber} i: ${i} j: ${j} ${i}+${j}=${i + j}")
case Failure(e) => println(s"${threadNumber} f2 Failure: ${e}")
})
}
case Failure(e) => println(s"${threadNumber} f1 Failure: ${e}")
})
loopTillCompleted
println(s"${threadNumber} All done.")
}
def futureCompletesFor: Unit = {
for {
f1 <- createRegisteredFuture
f2 <- createRegisteredFuture
} yield {
println(s"f1: ${f1} f2: ${f2}: f1+f2=${f1+f2}")
}
loopTillCompleted
println(s"${threadNumber} All done.")
}
def main(a: Array[String]): Unit = {
// futureCompletes
futureDoesNotComplete
// futureCompletesFor
}
}
我不是 100% 确定这里所需的行为到底是什么,但我会在这里尝试 flatMap,我认为你真的想要 onComplete 整个未来链。如果这是问题所在,那么你只是在完成外部未来,这就是我所怀疑的,因为当你使用 for (正在工作)时,它与 flatMap 的作用相同。
10 Future work start: 791
10 Future work stop: 791
11 Future work start: 819
11 Future work stop: 819
11 j: 819
1 Success: 791
1 Success: 819
1 All done.
我会在这里将 map 更改为 flatMap,因为 createRegisteredFuture 会创建一个未来。
val f2 = f1.flatMap {
i => createRegisteredFuture
}
在使用Future.map
处理第一个Future
之后,一旦Future.onComplete
被回调,第二个依赖Future
总是Future(<not completed>)
。其他使用两个 Future
的结构从不表现出这种行为。
谁能解释为什么 Future.onComplete
被调用,尽管未来显然还不完整?
使用 Scala 2.12.3。如果需要,剪切并粘贴以下内容以评估此问题。
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
object FutureNotCompleted {
def threadNumber: String = f"${Thread.currentThread().getId.toInt}%2d"
/////////////////////////////////////////////////////
// Future completion control
/////////////////////////////////////////////////////
var futures: ListBuffer[Future[Any]] = ListBuffer()
def add(future: Future[Any]): Unit = synchronized(futures += future)
def remove(future: Future[Any]): Unit = synchronized(futures = futures.filter(_ != future))
def loopTillCompleted: Unit = {
var futuresOnList = true;
while (futuresOnList) {
Thread.sleep(100)
for (future <- futures) {
if (future.isCompleted) {
future.value.get match {
case Success(v) => println(s"${threadNumber} Success: ${v}")
case Failure(e) => println(s"${threadNumber} Error: ${e}")
}
remove(future)
}
}
if (futures.size == 0) futuresOnList = false
}
}
/////////////////////////////////////////////////////
// Future factory
/////////////////////////////////////////////////////
def createRegisteredFuture: Future[Int] = {
val future = createFuture
add(future)
future
}
def createFuture: Future[Int] = Future {
val i = (Math.random() * 1000).toInt
println(s"${threadNumber} Future work start: ${i}")
Thread.sleep((Math.random() * 1000).toLong)
println(s"${threadNumber} Future work stop: ${i}")
if (Math.random > 0.7) throw new RuntimeException(s"${threadNumber} Error for ${i}")
i
}
/////////////////////////////////////////////////////
// Functions exhibiting Future use conditions
/////////////////////////////////////////////////////
def futureDoesNotComplete: Unit = {
val f1 = createRegisteredFuture
val f2 = f1.map {
i => createRegisteredFuture
}
// This is never completed at the time the 'onComplete' callback is called
f2.onComplete({
case Success(j) => println(s"${threadNumber} j: ${j} ")
case Failure(e) => println(s"${threadNumber} f2 Failure: ${e}")
})
loopTillCompleted
println(s"${threadNumber} All done.")
}
def futureCompletes: Unit = {
val f1 = createRegisteredFuture
f1.onComplete({
case Success(i) => {
val f2 = createRegisteredFuture
f2.onComplete({
case Success(j) => println(s"${threadNumber} i: ${i} j: ${j} ${i}+${j}=${i + j}")
case Failure(e) => println(s"${threadNumber} f2 Failure: ${e}")
})
}
case Failure(e) => println(s"${threadNumber} f1 Failure: ${e}")
})
loopTillCompleted
println(s"${threadNumber} All done.")
}
def futureCompletesFor: Unit = {
for {
f1 <- createRegisteredFuture
f2 <- createRegisteredFuture
} yield {
println(s"f1: ${f1} f2: ${f2}: f1+f2=${f1+f2}")
}
loopTillCompleted
println(s"${threadNumber} All done.")
}
def main(a: Array[String]): Unit = {
// futureCompletes
futureDoesNotComplete
// futureCompletesFor
}
}
我不是 100% 确定这里所需的行为到底是什么,但我会在这里尝试 flatMap,我认为你真的想要 onComplete 整个未来链。如果这是问题所在,那么你只是在完成外部未来,这就是我所怀疑的,因为当你使用 for (正在工作)时,它与 flatMap 的作用相同。
10 Future work start: 791
10 Future work stop: 791
11 Future work start: 819
11 Future work stop: 819
11 j: 819
1 Success: 791
1 Success: 819
1 All done.
我会在这里将 map 更改为 flatMap,因为 createRegisteredFuture 会创建一个未来。
val f2 = f1.flatMap {
i => createRegisteredFuture
}