flink 增加异步操作的并行度
flink increase parallelism of async operation
我们有 AsyncFunction,异步操作是使用 akka http client
完成的
class Foo[A,B] extends AsyncFunction[A, B] with {
val akkaConfig = ConfigFactory.load()
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
implicit lazy val materializer = ActorMaterializer()
def postReq(uriStr: String, str: String): Future[HttpResponse] = {
Http().singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = uriStr,
entity = HttpEntity(ContentTypes.`application/json`, str))
)
}
override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit = {
val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...
问题:
- 如果我想增加 http 请求的并行性 - 我应该使用 akka 配置来实现还是有办法通过 flink.yamel
进行配置
- 既然 Flink 也在使用 akka,那么创建
ActorSystem
和 ExecutionContext
的正确方法是什么?
关于第一个问题,您有三种不同的设置会影响性能和实际执行的请求数:
- 并行性,这将导致 Flink 创建 Your
AsyncFunction
的多个实例,包括 Your HttpClient
. 的多个实例
- 函数本身的并发请求数。当你调用
orderedWait
或unorderedWait
时你应该在函数中提供capacity
,这将限制并发请求数。
- 您的 Http 客户端的实际设置。
如您所见,2. 和 3. 点是相连的,因为 Flink 可以限制可能的并发请求数,所以有时您的 Http Client 设置中的更改可能不会产生影响,因为requests 受 Flink 自身限制。
增加 AsyncFunction
的吞吐量视情况而定。您需要记住 AsyncFunction
是单线程调用。这基本上意味着如果您调用的服务的响应时间很长,您将简单地阻止等待响应的请求数量,因此唯一的方法是增加 parallelism'
。但是,通常更改函数的 HttpClient
和 capacity
的设置应该可以让您获得更好的吞吐量。
至于第二个问题,我认为创建多个 ActorSystems
没有问题。您可以在[此处]看到类似问题的回答。
我们有 AsyncFunction,异步操作是使用 akka http client
完成的class Foo[A,B] extends AsyncFunction[A, B] with {
val akkaConfig = ConfigFactory.load()
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
implicit lazy val materializer = ActorMaterializer()
def postReq(uriStr: String, str: String): Future[HttpResponse] = {
Http().singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = uriStr,
entity = HttpEntity(ContentTypes.`application/json`, str))
)
}
override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit = {
val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...
问题:
- 如果我想增加 http 请求的并行性 - 我应该使用 akka 配置来实现还是有办法通过 flink.yamel 进行配置
- 既然 Flink 也在使用 akka,那么创建
ActorSystem
和ExecutionContext
的正确方法是什么?
关于第一个问题,您有三种不同的设置会影响性能和实际执行的请求数:
- 并行性,这将导致 Flink 创建 Your
AsyncFunction
的多个实例,包括 YourHttpClient
. 的多个实例
- 函数本身的并发请求数。当你调用
orderedWait
或unorderedWait
时你应该在函数中提供capacity
,这将限制并发请求数。 - 您的 Http 客户端的实际设置。
如您所见,2. 和 3. 点是相连的,因为 Flink 可以限制可能的并发请求数,所以有时您的 Http Client 设置中的更改可能不会产生影响,因为requests 受 Flink 自身限制。
增加 AsyncFunction
的吞吐量视情况而定。您需要记住 AsyncFunction
是单线程调用。这基本上意味着如果您调用的服务的响应时间很长,您将简单地阻止等待响应的请求数量,因此唯一的方法是增加 parallelism'
。但是,通常更改函数的 HttpClient
和 capacity
的设置应该可以让您获得更好的吞吐量。
至于第二个问题,我认为创建多个 ActorSystems
没有问题。您可以在[此处]看到类似问题的回答。