Java 泽西岛中的异步

Async in Java Jersey

我有一些代码:

1- 通过 REST 调用接收到一些数据 (POST);

2- 根据该数据执行了一些逻辑;

3-返回结果。

为了这个问题,我们假设它是一个简单的计算器 webapi,允许其客户端执行加法和减法。它看起来像这样:

@Path("/calculator")
public class Calculator {

    @POST
    @Path("addition")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response add(String request){
        //Getting A and B from request
        ...
        //Calculating result
        int res = a + b;
        //Creating response payload
        JSONObject res = new JSONObject.put("result",res);

        return Response.status(Response.Status.OK).entity(res.toString()).build();
    }

    @POST
    @Path("subtract")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response sub(String request){
        //Getting A and B from request
        ...
        //Calculating result
        int res = a - b;
        //Creating response payload
        JSONObject res = new JSONObject.put("result",res);

        return Response.status(Response.Status.OK).entity(res.toString()).build();
    }
}

一切都很好,直到我意识到我无法并行执行更多的计算,因为所有请求都访问一个唯一的资源,一次只能由其中一个使用。

因此,为了这个示例,我们假设我们有一个计算器,并且所有请求的计算都必须由同一个计算器处理器执行。

在我看来,我认为我需要类似 "CalculatorProcessor" 的东西,它接收来自所有计算器 webapi 客户端的请求:

1- 收到请求;

2- 队列请求;

3- 出列请求;

4- 执行计算;

5- Returns 使用回调的结果。

在 Java 中,这对我来说有点微不足道,但我不知道在 Java 泽西岛的背景下我应该怎么做. 例如... 我怎样才能回到 Calculator.add() 和 Calculator.sub() 方法,以便我可以发送 http 请求响应? 有人可以请教我吗?

这是我对此类组件的 java 实现:

import java.util.concurrent.ConcurrentLinkedQueue;

//IMPLEMENTS SINGLETON PATTERN
public class Calculator {

private static Calculator instance = null;
private ConcurrentLinkedQueue<Request> queue = null;
private Runnable processor = null;

//PRIVATE CONSTRUCTOR
private Calculator() {
    queue = new ConcurrentLinkedQueue<>();
}

//GET CALCULATOR INSTANCE
static public Calculator getInstance() {
    if (instance == null) {
        instance = new Calculator();
    }
    return instance;
}

//REQUEST COMPUTATION
public synchronized void requestComputation(CalculatorCallback c, SupportedOperations o, int a, int b) {
    //Adds request to queue
    queue.add(new Request(c, o, a, b));

    //Checks if there's an active processor
    if (processor == null) {
        //Launches a new processor if there isn't
        Runnable p = new CalculatorProcessor(queue);
        new Thread(p).start();
    }
}

//CALLBACK INTERFACE
public interface CalculatorCallback {
    void computationReady(int result);
}


//SUPPORTED OPERATIONS ENUMERATION
protected enum SupportedOperations {
    ADDITION, SUBTRACTION;
}

//CLASS THAT REPRESENTS A REQUEST
private class Request {

    final SupportedOperations operation;
    final CalculatorCallback callback;
    final int a;
    final int b;

    public Request(CalculatorCallback c, SupportedOperations operation, int a, int b) {
        this.callback = c;
        this.operation = operation;
        this.a = a;
        this.b = b;
    }
}

//CALCULATOR PROCESSOR THREAD
class CalculatorProcessor implements Runnable {

    final private ConcurrentLinkedQueue<Calculator.Request> queue;

    public CalculatorProcessor(ConcurrentLinkedQueue<Calculator.Request> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Calculator.Request current;
        int result;
        while (!queue.isEmpty()) {
            //Gets head
            current = queue.poll();

            if (current.operation == Calculator.SupportedOperations.ADDITION) {
                result = current.a + current.b;
            } else if (current.operation == Calculator.SupportedOperations.SUBTRACTION) {
                result = current.a - current.b;
            } else {
                throw new UnsupportedOperationException();
            }
            //Calls back the requester
            current.callback.computationReady(result);
        }
    }
}
}

这是 CalculatorClient 代码:

public class CalculatorClient implements Calculator.CalculatorCallback {

public static void main(String[] args) {

    CalculatorClient client = new CalculatorClient();
    Random random = new Random();
    int a, b;

    for (int i = 0; i < 1000; i++) {
        a = random.nextInt(Integer.MAX_VALUE/2);
        b = random.nextInt(Integer.MAX_VALUE/2);
        System.out.println("Requesting "+a+" + "+b+"...");
        Calculator.getInstance().requestComputation(client, Calculator.SupportedOperations.ADDITION,a,b);
    }
}

@Override
public void computationReady(int result) {
    System.out.println("Result is: "+result);
}

}

如果您使用的是 Jersey 2,则可以使用其 Asynchronous processing feature。您可以只将 AsyncResponse 传递给计算任务,该任务将 resume 完成处理后的响应。

@POST
public void add(@Suspended AysncResponse response, String body) {
    Calculator.getInstance().requestComputation(
            client, 
            Calculator.SupportedOperations.ADDITION,
            a,b,
            response);

    // you don't need to return anything from the resource method
    // calling `response.resume(someResponse)` (from inside the task)
    // is enough. That is why this method just returns `void`
}

使用异步功能的好处是,如果处理需要很长时间,您就不会像尝试使用某种块机制(如 CountDownLatch 或阻塞)那样阻塞服务器线程队列,或类似的东西。服务器线程会立即返回给服务器,以便它可以处理更多请求。