How to call Akka HTTP POST in a loop (1000-10000 times)?

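I am learning Akka with Java. I have written a simple program with two actors.

My first actor, ActorA, is called with a list of 1000 strings. ActorA iterates over the list and sends a message to ActorB for each element.

ActorB makes an HTTP POST call to an external service using the string parameter received from ActorA.

I expected ActorB to make all 1000 HTTP POST calls successfully and receive the same number of responses. Instead, ActorB makes somewhere between 80 and 120 POST requests (the number varies from run to run) and then stops making POST calls.

I tried providing a custom dispatcher, since the HTTP POST call is a blocking operation, but still no luck!

Please refer to the code and configuration given below.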

public class ActorA extends AbstractActor {
static public Props props() {
    return Props.create(ActorA.class);
}


static public class IdWrapper {
    List<String> ids;

    public IdWrapper(List<String> ids) {
        this.ids = ids;
    }
}


@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(IdWrapper.class, this::process)
            .build();
}

private void process(IdWrapper msg) {
    msg.ids.forEach(id -> {
        context().actorSelection("actorB").tell(new MessageForB(id), ActorRef.noSender());
        }
    );
}

}

public class ActorB extends AbstractActor {   

final Http http = Http.get(getContext().system());
final Materializer materializer = ActorMaterializer.create(context());    

public static Props props() {
    return Props.create(ActorB.class);
}

static public class MessageForB implements Serializable {
    String id;

    public MessageForB(String id) {
        this.id = id;
    }
}


@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(MessageForB.class, this::process)
            .build();
}

private void process(MessageForB messageForB) {

    ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher");
    /**
     * Get id from request
     */
    String reqId = messageForB.id;

    /**
     * Prepare request
     */
    XmlRequest requestEntity = getRequest(Stream.of(reqId).collect(Collectors.toList()));
    String requestAsString = null;


    try {
        /**
         * Create and configure JAXBMarshaller.
         */
        JAXBContext jaxbContext = JAXBContext.newInstance(XmlRequest.class);
        Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
        jaxbMarshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);

        /**
         * Convert request entity to string before making POST request.
         */
        StringWriter sw = new StringWriter();
        jaxbMarshaller.marshal(requestEntity, sw);
        requestAsString = sw.toString();

    } catch (JAXBException e) {
        e.printStackTrace();
    }


    /**
     * Create RequestEntity from request string.
     */
    RequestEntity entity = HttpEntities.create(
            MediaTypes.APPLICATION_XML.toContentType(HttpCharsets.ISO_8859_1),
            requestAsString);

    /**
     * Create Http POST with necessary headers and call
     */
    final CompletionStage<HttpResponse> responseFuture =
            http.singleRequest(HttpRequest.POST("http://{hostname}:{port}/path")
                    .withEntity(entity));

    responseFuture
            .thenCompose(httpResponse -> {
                /**
                 * Convert response into String
                 **/
                final CompletionStage<String> res = Unmarshaller.entityToString().unmarshal
                        (httpResponse.entity(), ec, materializer);
                /**
                 * Consume response bytes
                 **/
                httpResponse.entity().getDataBytes().runWith(Sink.ignore(), materializer);
                return res;

            })
            .thenAccept(s -> {

                try {
                    /**
                     * Deserialize string to DTO.
                     */
                    MyResponse MyResponse = getMyResponse(s);

                    // further processing..

                } catch (JAXBException e) {
                    e.printStackTrace();
                }
            });
}

private XmlRequest getRequest(List<String> identifiers){
    XmlRequest request = new XmlRequest();
    // Business logic to create req entity
    return request;
}

private MyResponse getMyResponse(String s) throws JAXBException {
    JAXBContext jaxbContext = JAXBContext.newInstance
            (MyResponse.class);
    javax.xml.bind.Unmarshaller jaxbUnmarshaller = jaxbContext
            .createUnmarshaller();
    StringReader reader = new StringReader(s);
    return (MyResponse)
            jaxbUnmarshaller.unmarshal(reader);
}

}

my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 5
core-pool-size-max = 20
}
throughput = 1
}

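Where can I improve or correct my code so that ActorB successfully makes HTTP POST calls for all the items sent by ActorA?

As far as I can see, you are using http.singleRequest.

According to the akka-http docs: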

For these cases Akka HTTP offers the Http().singleRequest(...) method, which simply turns an HttpRequest instance into Future[HttpResponse]. Internally the request is dispatched across the (cached) host connection pool for the request’s effective URI.
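Because the result is a future, a request that the pool rejects shows up only as a failed future. The code in the question attaches just thenCompose and thenAccept, neither of which runs on failure, so such errors would go unnoticed; that may be why ActorB appears to simply stop. As far as I know, once more requests are pending than the pool's max-open-requests allows, further singleRequest calls fail with a BufferOverflowException. Below is a minimal sketch of surfacing such failures, using only JDK classes. ResponseTracker and track are hypothetical names, and in ActorB the stage passed in would be the one returned by http.singleRequest(...).

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts and logs the outcome of each asynchronous call.
final class ResponseTracker {
    final AtomicInteger succeeded = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    // Attaches a handler so a failed stage is counted and logged instead of vanishing.
    <T> CompletionStage<T> track(String id, CompletionStage<T> response) {
        return response.whenComplete((value, error) -> {
            if (error != null) {
                failed.incrementAndGet();
                System.err.println("request " + id + " failed: " + error);
            } else {
                succeeded.incrementAndGet();
            }
        });
    }
}
```

Wrapping each call this way should make it visible whether the missing POSTs were never sent or were rejected by the pool.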

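Solution 1:

http.singleRequest uses a connection pool to handle requests, so you need to increase the number of connections in the pool in the Akka HTTP config. The relevant settings are in the host-connection-pool section (under akka.http), which has these defaults: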

host-connection-pool {
  max-connections = 4
  min-connections = 0
  max-retries = 5
  max-open-requests = 32
  pipelining-limit = 1
  idle-timeout = 30 s
}
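Raising these limits is one option. Another is to keep the number of outstanding requests below the pool's limit on the caller's side. The following is a minimal sketch of that idea using only JDK classes, not an Akka API: BoundedSender, sendAll and maxInFlight are hypothetical names, and the send function stands in for whatever produces the CompletionStage (for example a call to http.singleRequest).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper: caps the number of asynchronous calls in flight.
final class BoundedSender {
    // Calls send once per id with at most maxInFlight calls outstanding,
    // then waits for all results and returns them in input order.
    static <T> List<T> sendAll(List<String> ids, int maxInFlight,
                               Function<String, CompletionStage<T>> send) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<T>> pending = new ArrayList<>();
        for (String id : ids) {
            permits.acquireUninterruptibly();      // wait for a free slot
            CompletableFuture<T> f;
            try {
                f = send.apply(id).toCompletableFuture();
            } catch (RuntimeException e) {
                permits.release();                 // nothing was started
                throw e;
            }
            f.whenComplete((value, error) -> permits.release()); // free the slot
            pending.add(f);
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> f : pending) {
            results.add(f.join());                 // a failure surfaces as CompletionException
        }
        return results;
    }
}
```

Note that sendAll blocks the calling thread while it waits for a slot, so inside an actor it would belong on a dedicated dispatcher. A non-blocking alternative in Akka itself is to feed requests through a bounded stream queue into a host connection pool flow.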

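Solution 2:

Use http.outgoingConnection.

According to the akka-http docs, it creates a dedicated connection for each request, so you can process 1000 connections in parallel without a connection pool.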

With the connection-level API you open a new HTTP connection to a target endpoint by materializing a Flow returned by the Http().outgoingConnection(...) method. Here is an example:

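import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.{Failure, Success}

// Sends one POST over a dedicated connection (no pool) and prints the outcome.
def run(req: String)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher // execution context for onComplete
  val apiBaseUrl = "example.com" // host only, without protocol
  val path = "/api/update"
  val body = HttpEntity(ContentTypes.`application/json`, req.getBytes)
  val request = HttpRequest(HttpMethods.POST, path, entity = body)
  val connectionFlow = Http().outgoingConnection(apiBaseUrl)
  val result = Source.single(request).via(connectionFlow).runWith(Sink.head)
  result.onComplete {
    case Success(value) =>
      println(value)
    case Failure(e) =>
      e.printStackTrace()
  }
}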