akka 流将 akka-htpp web 请求调用集成到流中
akka stream integrating akka-htpp web request call into stream
Akka Streams 入门我想执行一个简单的计算。通过调用 restful web api:
扩展基本的快速入门 https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html
val source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)
已经可以很好地打印数字了。但是当尝试创建一个 Actor 来执行 HTTP 请求时(这实际上是必要的吗?)根据 https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html
import akka.pattern.ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
// continue processing of the replies from the actor
.map(_.toLowerCase)
.runWith(Sink.ignore)
我无法编译它,因为未定义 ?
运算符。据我所知,这个只能在演员内部定义。
我也不明白 mapAsync
我的自定义 actor 到底在什么地方需要被调用。
编辑
https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ 至少包含示例的一部分。
看起来创建演员不是强制性的,即
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
.map(cellKey => {
val splits = cellKey.split("::")
val mcc = splits(0)
val mnc = splits(1)
val lac = splits(2)
val ci = splits(3)
CellKeySource(cellKey, mcc, mnc, lac, ci)
})
.limit(2)
.mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))
def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = {
RestartSource.withBackoff(
minBackoff = 10.milliseconds,
maxBackoff = 30.seconds,
randomFactor = 0.2,
maxRestarts = 2
) { () =>
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=${token}", entity = ByteString(
// TODO use proper JSON objects
s"""
|{
| "cellTowers": [
| "mobileCountryCode": $mobileCountryCode,
| "mobileNetworkCode": $mobileNetworkCode,
| "locationAreaCode": $locationArea,
| "cellId": $cellId,
| ]
|}
""".stripMargin)))
Source.fromFuture(responseFuture)
.mapAsync(parallelism = 1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[String]
case HttpResponse(statusCode, _, _, _) =>
throw WebRequestException(statusCode.toString() )
}
}
.runWith(Sink.head)
.recover {
case _ => throw StreamFailedAfterMaxRetriesException()
}
}
val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())
已经是问题的(部分)答案,即如何集成 Akka-streams + akka-http。但是,它不起作用,即只抛出错误 400 并且永远不会终止。
您必须从 akka 导入询问模式。
进口akka.pattern.ask
编辑:好的,抱歉,我看到您已经导入了。您的代码中的 ref 是什么? ActorRef?
我想你已经找到了 api 如何调用 akka-http 客户端
关于您的第一个无效代码片段。我认为这个例子本身发生了一些误解。您希望示例中的代码在复制后能够正常工作。但该文档的意图只是演示一个 example/concept,如何将一些长 运行 任务从流中委派出去,然后在准备就绪时使用结果。为此,使用了 ask
调用 akka actor,因为调用 ask
方法 returns 一个 Future
。可能文档的作者只是省略了演员的定义。你可以试试这个例子:
import java.lang.System.exit
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.higherKinds
object App extends scala.App {
implicit val sys: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
val ref: ActorRef = sys.actorOf(Props[Translator])
implicit val askTimeout: Timeout = Timeout(5.seconds)
val words: Source[String, NotUsed] = Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
.map(_.toLowerCase)
.runWith(Sink.foreach(println))
.onComplete(t => {
println(s"finished: $t")
exit(1)
})
}
class Translator extends Actor {
override def receive: Receive = {
case msg => sender() ! s"$msg!"
}
}
Akka Streams 入门我想执行一个简单的计算。通过调用 restful web api:
扩展基本的快速入门 https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.htmlval source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)
已经可以很好地打印数字了。但是当尝试创建一个 Actor 来执行 HTTP 请求时(这实际上是必要的吗?)根据 https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html
import akka.pattern.ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
// continue processing of the replies from the actor
.map(_.toLowerCase)
.runWith(Sink.ignore)
我无法编译它,因为未定义 ?
运算符。据我所知,这个只能在演员内部定义。
我也不明白 mapAsync
我的自定义 actor 到底在什么地方需要被调用。
编辑
https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ 至少包含示例的一部分。 看起来创建演员不是强制性的,即
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
.map(cellKey => {
val splits = cellKey.split("::")
val mcc = splits(0)
val mnc = splits(1)
val lac = splits(2)
val ci = splits(3)
CellKeySource(cellKey, mcc, mnc, lac, ci)
})
.limit(2)
.mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))
def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = {
RestartSource.withBackoff(
minBackoff = 10.milliseconds,
maxBackoff = 30.seconds,
randomFactor = 0.2,
maxRestarts = 2
) { () =>
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=${token}", entity = ByteString(
// TODO use proper JSON objects
s"""
|{
| "cellTowers": [
| "mobileCountryCode": $mobileCountryCode,
| "mobileNetworkCode": $mobileNetworkCode,
| "locationAreaCode": $locationArea,
| "cellId": $cellId,
| ]
|}
""".stripMargin)))
Source.fromFuture(responseFuture)
.mapAsync(parallelism = 1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[String]
case HttpResponse(statusCode, _, _, _) =>
throw WebRequestException(statusCode.toString() )
}
}
.runWith(Sink.head)
.recover {
case _ => throw StreamFailedAfterMaxRetriesException()
}
}
val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())
已经是问题的(部分)答案,即如何集成 Akka-streams + akka-http。但是,它不起作用,即只抛出错误 400 并且永远不会终止。
您必须从 akka 导入询问模式。
进口akka.pattern.ask
编辑:好的,抱歉,我看到您已经导入了。您的代码中的 ref 是什么? ActorRef?
我想你已经找到了 api 如何调用 akka-http 客户端
关于您的第一个无效代码片段。我认为这个例子本身发生了一些误解。您希望示例中的代码在复制后能够正常工作。但该文档的意图只是演示一个 example/concept,如何将一些长 运行 任务从流中委派出去,然后在准备就绪时使用结果。为此,使用了
ask
调用 akka actor,因为调用ask
方法 returns 一个Future
。可能文档的作者只是省略了演员的定义。你可以试试这个例子:import java.lang.System.exit import akka.NotUsed import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.pattern.ask import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} import akka.util.Timeout import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.language.higherKinds object App extends scala.App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() val ref: ActorRef = sys.actorOf(Props[Translator]) implicit val askTimeout: Timeout = Timeout(5.seconds) val words: Source[String, NotUsed] = Source(List("hello", "hi")) words .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String]) .map(_.toLowerCase) .runWith(Sink.foreach(println)) .onComplete(t => { println(s"finished: $t") exit(1) }) } class Translator extends Actor { override def receive: Receive = { case msg => sender() ! s"$msg!" } }