Java 泽西岛中的异步
Async in Java Jersey
我有一些代码:
1- 通过 REST 调用接收到一些数据 (POST);
2- 根据该数据执行了一些逻辑;
3-返回结果。
为了这个问题,我们假设它是一个简单的计算器 webapi,允许其客户端执行加法和减法。它看起来像这样:
@Path("/calculator")
public class Calculator {
@POST
@Path("addition")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response add(String request){
//Getting A and B from request
...
//Calculating result
int res = a + b;
//Creating response payload
JSONObject res = new JSONObject.put("result",res);
return Response.status(Response.Status.OK).entity(res.toString()).build();
}
@POST
@Path("subtract")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response sub(String request){
//Getting A and B from request
...
//Calculating result
int res = a - b;
//Creating response payload
JSONObject res = new JSONObject.put("result",res);
return Response.status(Response.Status.OK).entity(res.toString()).build();
}
}
一切都很好,直到我意识到我无法并行执行更多的计算,因为所有请求都访问一个唯一的资源,一次只能由其中一个使用。
因此,为了这个示例,我们假设我们有一个计算器,并且所有请求的计算都必须由同一个计算器处理器执行。
在我看来,我认为我需要类似 "CalculatorProcessor" 的东西,它接收来自所有计算器 webapi 客户端的请求:
1- 收到请求;
2- 队列请求;
3- 出列请求;
4- 执行计算;
5- Returns 使用回调的结果。
在 Java 中,这对我来说有点微不足道,但我不知道在 Java 泽西岛的背景下我应该怎么做.
例如...
我怎样才能回到 Calculator.add() 和 Calculator.sub() 方法,以便我可以发送 http 请求响应?
有人可以请教我吗?
这是我对此类组件的 java 实现:
import java.util.concurrent.ConcurrentLinkedQueue;
//IMPLEMENTS SINGLETON PATTERN
public class Calculator {
private static Calculator instance = null;
private ConcurrentLinkedQueue<Request> queue = null;
private Runnable processor = null;
//PRIVATE CONSTRUCTOR
private Calculator() {
queue = new ConcurrentLinkedQueue<>();
}
//GET CALCULATOR INSTANCE
static public Calculator getInstance() {
if (instance == null) {
instance = new Calculator();
}
return instance;
}
//REQUEST COMPUTATION
public synchronized void requestComputation(CalculatorCallback c, SupportedOperations o, int a, int b) {
//Adds request to queue
queue.add(new Request(c, o, a, b));
//Checks if there's an active processor
if (processor == null) {
//Launches a new processor if there isn't
Runnable p = new CalculatorProcessor(queue);
new Thread(p).start();
}
}
//CALLBACK INTERFACE
public interface CalculatorCallback {
void computationReady(int result);
}
//SUPPORTED OPERATIONS ENUMERATION
protected enum SupportedOperations {
ADDITION, SUBTRACTION;
}
//CLASS THAT REPRESENTS A REQUEST
private class Request {
final SupportedOperations operation;
final CalculatorCallback callback;
final int a;
final int b;
public Request(CalculatorCallback c, SupportedOperations operation, int a, int b) {
this.callback = c;
this.operation = operation;
this.a = a;
this.b = b;
}
}
//CALCULATOR PROCESSOR THREAD
class CalculatorProcessor implements Runnable {
final private ConcurrentLinkedQueue<Calculator.Request> queue;
public CalculatorProcessor(ConcurrentLinkedQueue<Calculator.Request> queue) {
this.queue = queue;
}
@Override
public void run() {
Calculator.Request current;
int result;
while (!queue.isEmpty()) {
//Gets head
current = queue.poll();
if (current.operation == Calculator.SupportedOperations.ADDITION) {
result = current.a + current.b;
} else if (current.operation == Calculator.SupportedOperations.SUBTRACTION) {
result = current.a - current.b;
} else {
throw new UnsupportedOperationException();
}
//Calls back the requester
current.callback.computationReady(result);
}
}
}
}
这是 CalculatorClient 代码:
public class CalculatorClient implements Calculator.CalculatorCallback {
public static void main(String[] args) {
CalculatorClient client = new CalculatorClient();
Random random = new Random();
int a, b;
for (int i = 0; i < 1000; i++) {
a = random.nextInt(Integer.MAX_VALUE/2);
b = random.nextInt(Integer.MAX_VALUE/2);
System.out.println("Requesting "+a+" + "+b+"...");
Calculator.getInstance().requestComputation(client, Calculator.SupportedOperations.ADDITION,a,b);
}
}
@Override
public void computationReady(int result) {
System.out.println("Result is: "+result);
}
}
如果您使用的是 Jersey 2,则可以使用其 Asynchronous processing feature。您可以只将 AsyncResponse
传递给计算任务,该任务将 resume
完成处理后的响应。
@POST
public void add(@Suspended AysncResponse response, String body) {
Calculator.getInstance().requestComputation(
client,
Calculator.SupportedOperations.ADDITION,
a,b,
response);
// you don't need to return anything from the resource method
// calling `response.resume(someResponse)` (from inside the task)
// is enough. That is why this method just returns `void`
}
使用异步功能的好处是,如果处理需要很长时间,您就不会像尝试使用某种块机制(如 CountDownLatch 或阻塞)那样阻塞服务器线程队列,或类似的东西。服务器线程会立即返回给服务器,以便它可以处理更多请求。
我有一些代码:
1- 通过 REST 调用接收到一些数据 (POST);
2- 根据该数据执行了一些逻辑;
3-返回结果。
为了这个问题,我们假设它是一个简单的计算器 webapi,允许其客户端执行加法和减法。它看起来像这样:
@Path("/calculator")
public class Calculator {
@POST
@Path("addition")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response add(String request){
//Getting A and B from request
...
//Calculating result
int res = a + b;
//Creating response payload
JSONObject res = new JSONObject.put("result",res);
return Response.status(Response.Status.OK).entity(res.toString()).build();
}
@POST
@Path("subtract")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response sub(String request){
//Getting A and B from request
...
//Calculating result
int res = a - b;
//Creating response payload
JSONObject res = new JSONObject.put("result",res);
return Response.status(Response.Status.OK).entity(res.toString()).build();
}
}
一切都很好,直到我意识到我无法并行执行更多的计算,因为所有请求都访问一个唯一的资源,一次只能由其中一个使用。
因此,为了这个示例,我们假设我们有一个计算器,并且所有请求的计算都必须由同一个计算器处理器执行。
在我看来,我认为我需要类似 "CalculatorProcessor" 的东西,它接收来自所有计算器 webapi 客户端的请求:
1- 收到请求;
2- 队列请求;
3- 出列请求;
4- 执行计算;
5- Returns 使用回调的结果。
在 Java 中,这对我来说有点微不足道,但我不知道在 Java 泽西岛的背景下我应该怎么做. 例如... 我怎样才能回到 Calculator.add() 和 Calculator.sub() 方法,以便我可以发送 http 请求响应? 有人可以请教我吗?
这是我对此类组件的 java 实现:
import java.util.concurrent.ConcurrentLinkedQueue;
//IMPLEMENTS SINGLETON PATTERN
public class Calculator {
private static Calculator instance = null;
private ConcurrentLinkedQueue<Request> queue = null;
private Runnable processor = null;
//PRIVATE CONSTRUCTOR
private Calculator() {
queue = new ConcurrentLinkedQueue<>();
}
//GET CALCULATOR INSTANCE
static public Calculator getInstance() {
if (instance == null) {
instance = new Calculator();
}
return instance;
}
//REQUEST COMPUTATION
public synchronized void requestComputation(CalculatorCallback c, SupportedOperations o, int a, int b) {
//Adds request to queue
queue.add(new Request(c, o, a, b));
//Checks if there's an active processor
if (processor == null) {
//Launches a new processor if there isn't
Runnable p = new CalculatorProcessor(queue);
new Thread(p).start();
}
}
//CALLBACK INTERFACE
public interface CalculatorCallback {
void computationReady(int result);
}
//SUPPORTED OPERATIONS ENUMERATION
protected enum SupportedOperations {
ADDITION, SUBTRACTION;
}
//CLASS THAT REPRESENTS A REQUEST
private class Request {
final SupportedOperations operation;
final CalculatorCallback callback;
final int a;
final int b;
public Request(CalculatorCallback c, SupportedOperations operation, int a, int b) {
this.callback = c;
this.operation = operation;
this.a = a;
this.b = b;
}
}
//CALCULATOR PROCESSOR THREAD
class CalculatorProcessor implements Runnable {
final private ConcurrentLinkedQueue<Calculator.Request> queue;
public CalculatorProcessor(ConcurrentLinkedQueue<Calculator.Request> queue) {
this.queue = queue;
}
@Override
public void run() {
Calculator.Request current;
int result;
while (!queue.isEmpty()) {
//Gets head
current = queue.poll();
if (current.operation == Calculator.SupportedOperations.ADDITION) {
result = current.a + current.b;
} else if (current.operation == Calculator.SupportedOperations.SUBTRACTION) {
result = current.a - current.b;
} else {
throw new UnsupportedOperationException();
}
//Calls back the requester
current.callback.computationReady(result);
}
}
}
}
这是 CalculatorClient 代码:
public class CalculatorClient implements Calculator.CalculatorCallback {
public static void main(String[] args) {
CalculatorClient client = new CalculatorClient();
Random random = new Random();
int a, b;
for (int i = 0; i < 1000; i++) {
a = random.nextInt(Integer.MAX_VALUE/2);
b = random.nextInt(Integer.MAX_VALUE/2);
System.out.println("Requesting "+a+" + "+b+"...");
Calculator.getInstance().requestComputation(client, Calculator.SupportedOperations.ADDITION,a,b);
}
}
@Override
public void computationReady(int result) {
System.out.println("Result is: "+result);
}
}
如果您使用的是 Jersey 2,则可以使用其 Asynchronous processing feature。您可以只将 AsyncResponse
传递给计算任务,该任务将 resume
完成处理后的响应。
@POST
public void add(@Suspended AysncResponse response, String body) {
Calculator.getInstance().requestComputation(
client,
Calculator.SupportedOperations.ADDITION,
a,b,
response);
// you don't need to return anything from the resource method
// calling `response.resume(someResponse)` (from inside the task)
// is enough. That is why this method just returns `void`
}
使用异步功能的好处是,如果处理需要很长时间,您就不会像尝试使用某种块机制(如 CountDownLatch 或阻塞)那样阻塞服务器线程队列,或类似的东西。服务器线程会立即返回给服务器,以便它可以处理更多请求。