Akka http return 来自指令的异步响应 (java)

Akka http return async response from directive (java)

我想要一个可通过 http 访问的基于 actor 的服务。
我是 akka 的新手,但经过研究后我得出结论,使用 akka http 对我来说是最好的方法。

我正在实现一个使用 akka http 的服务器,如下所示:

class MyServer extends AllDirectives {
    private final Http http;
    private final Materializer materializer;
    private final Flow<HttpRequest, HttpResponse, NotUsed> routes;

    private ServerBinding binding;

    MyServer(final ActorSystem system) {
        this.http = Http.get(system);
        this.materializer = ActorMaterializer.create(system);
        this.routes = this.createRoutes().flow(system, materializer);
    }

    CompletionStage<ServerBinding> start() {
        return this.http
                .bindAndHandle(this.routes, ConnectHttp.toHost("localhost", DEFAULT_PORT), this.materializer)
                .thenApplyAsync(serverBinding -> {
                    this.binding = serverBinding;
                    return serverBinding;
                });
    }

    CompletionStage<BoxedUnit> stop() {
        return this.binding.unbind();
    }

    private Route createRoutes() {
        // create routes 
    }
}

createRoutes 方法中,我想使用指令创建分层路由树(或森林),例如:

private Route createRoutes() {
    return pathPrefix("typeOne", () ->
                path(PathMatchers.segment(), id ->
                    get(() -> handleTypeOneGetRequest(id))
                    .orElse(post(() -> handleTypeOnePostRequest(id, requestData)))
                )
            ).orElse(pathPrefix("typeTwo", () ->
                path(PathMatchers.segment(), id ->
                        get(() -> handleTypeTwoGetRequest(id))
                        .orElse(post(() -> handleTypeTwoPostRequest(id, requestData)))
                )
    ));
}

处理请求时(例如handleTypeOneGetRequest)我想将消息传递给适当的参与者(例如TypeOne)和return一个异步响应当来自参与者的响应消息 return 时完成。

我的问题是在处理指令时我不知道如何访问 RequestContext

我的问题是:

不太确定我是否答对了你的问题,但据我所知,你基本上想让演员处理你的请求,然后 return 得到结果。我通常使用 Scala,所以请不要介意我在您的 Java8 代码中可能出现的语法错误 ;)

我通常使用 akka ask 模式来检索参与者发送的响应对象的未来,然后使用 onSuccess 指令从未来提取实际值。 由于默认情况下参与者的响应是无类型的,因此您需要检查它是否实际是您所期望的,然后您可以使用该值完成您的请求。 (您可能还需要转换对象,以便您的响应编组器工作)。

Timeout timeout = new Timeout(Duration.create(5, "seconds"));    

private Route createRoutes() {
    return pathPrefix("typeOne", () ->
                path(PathMatchers.segment(), id ->
                    get(() -> 
                            onSuccess(() -> Patterns.ask(actor, id, timeout),
                                      extraction -> if (extraction instanceof WhatEverYouExpect) complete(extraction)
                                     )
                        )
    ));
}

询问模式:http://doc.akka.io/docs/akka/snapshot/java/futures.html

onSuccess 指令:http://doc.akka.io/docs/akka/snapshot/java/http/routing-dsl/directives/future-directives/onSuccess.html

编辑

您应该看看 extract* 指令: http://doc.akka.io/docs/akka/2.4.7/java/http/routing-dsl/directives/alphabetically.html

具体摘录: http://doc.akka.io/docs/akka/2.4.7/java/http/routing-dsl/directives/basic-directives/extract.html#extract-java

final Route route = extract(
  ctx -> ctx.getRequest().getUri().toString().length() // extract anything you need and pack into your object
  len -> //use your object
);

我想这就是你需要的。


编辑(由 OP)

这是该问题的完整解决方案,包括 HttpContext、异步响应的 Actor 用法以及如何使其在 java 中工作:

pathPrefix("typeOne", () ->
    path(PathMatchers.segment(), segment ->
        get(() ->
            extract(
                context -> context,
                context -> onSuccess(() -> 
                    FutureConverters.toJava(Patterns.ask(ACTOR_REF, MESSAGE, TIMEOUT)),
                    extraction -> complete(DO_SOMETHING_WITH(segment, context, extraction))
                ).orElse(...)
            )
        )
    )
)