Data structure to manage a maximum number of requests per minute for an API
I need to send data to an external API, but the API limits requests per endpoint (e.g. 60 requests per minute).
The data comes from Kafka, and every message goes to Redis first (because I can send a single request containing 200 items). So I use Redis as a simple cache to help me, and I can guarantee that if my server goes down I won't lose any messages.
The problem is that sometimes Kafka starts sending a lot of messages, Redis starts growing (more than a million messages waiting to go to the API), and we can't make requests as fast as the messages come in. So we end up with a big delay.
My first code was simple: ExecutorService executor = Executors.newFixedThreadPool(1);
This works very well when there are few messages and the delay is minimal.
So the first thing I did was change the executor to: ExecutorService executor = Executors.newCachedThreadPool();
That way I can ask for new threads as I need them in order to make requests to the external API faster, but then I run into the per-minute request limit.
There are endpoints where I can make 300 requests per minute, others 500, others 30, and so on.
The code I wrote isn't very good, and it's for the company I work for, so I really need to make it better.
So, every time I am about to call the external API I call the makeRequest method. The method is synchronized; I know I could use a synchronized list instead, but I think a synchronized method is better in this case.
// This is an inner class
private static class IntegrationType {

    final Queue<Long> requests; // This queue is used to store the timestamp of the requests
    final int maxRequestsPerMinute; // How many requests I can make per minute

    public IntegrationType(final int maxRequestsPerMinute) {
        this.maxRequestsPerMinute = maxRequestsPerMinute;
        this.requests = new LinkedList<>();
    }

    synchronized void makeRequest() {
        final long current = System.currentTimeMillis();
        requests.add(current);
        if (requests.size() >= maxRequestsPerMinute) {
            long first = requests.poll(); // gets the first request
            // The difference between the current request and the first request of the queue
            final int differenceInSeconds = (int) (current - first) / 1000;
            // if the difference is less than the maximum allowed
            if (differenceInSeconds <= 60) {
                // seconds to sleep.
                final int secondsToSleep = 60 - differenceInSeconds;
                sleep(secondsToSleep);
            }
        }
    }

    void sleep(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
So, is there a data structure I could use for this?
What should I take into account?
Thanks in advance.
If I understand your problem correctly, you can use a BlockingQueue together with a ScheduledExecutorService as follows.
BlockingQueues have a put method, which adds the given element to the queue only if there is available space, and otherwise the call waits (until space frees up). They also have a take method, which removes an element from the queue only if there is at least one element, and otherwise the call waits (until there is at least one element to take).
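For example, a minimal sketch of this blocking behavior (the capacity of 2 is just for illustration):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingDemo {
    public static void main(final String[] args) throws InterruptedException {
        final BlockingQueue<String> q = new ArrayBlockingQueue<>(2); //Bounded queue with capacity 2.
        q.put("a");
        q.put("b");
        //q.put("c") would block here until another thread takes an element.
        System.out.println(q.take()); //Prints "a" (FIFO); take() would block if the queue were empty.
    }
}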
Specifically, you can use a LinkedBlockingQueue or an ArrayBlockingQueue, which can be given a fixed capacity of elements. This fixed capacity means that you can put any number of requests, but you only take and process a request once per second or so (for example, so as to make 60 requests per minute).
To instantiate a LinkedBlockingQueue with a fixed capacity, simply use the corresponding constructor (which accepts the capacity as an argument). According to its documentation, a LinkedBlockingQueue takes elements in FIFO order.
To instantiate an ArrayBlockingQueue with a fixed capacity, use the constructor that accepts the capacity together with a boolean flag named fair. If this flag is true, the queue also takes elements in FIFO order.
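Concretely, the two constructions look like this (the capacity of 60 matches the 60-requests-per-minute example):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueConstruction {
    public static void main(final String[] args) {
        //Bounded LinkedBlockingQueue; elements come out in FIFO order:
        final BlockingQueue<Object> byLinks = new LinkedBlockingQueue<>(60);
        //Bounded ArrayBlockingQueue; 'true' asks for fairness, i.e. threads blocked on put/take are served in FIFO order:
        final BlockingQueue<Object> byArray = new ArrayBlockingQueue<>(60, true);
        System.out.println(byLinks.remainingCapacity() + " " + byArray.remainingCapacity()); //Prints "60 60".
    }
}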
Then you can have a ScheduledExecutorService (instead of waiting in a loop), to which you submit a Runnable that takes from the queue, communicates with the external API and then waits for the required delay between communications.
Below is a simple demonstration example:
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {

    public static class RequestSubmitter implements Runnable {

        private final BlockingQueue<Request> q;

        public RequestSubmitter(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }

        @Override
        public void run() {
            try {
                q.put(new Request()); //Will block until available capacity.
            }
            catch (final InterruptedException ix) {
                System.err.println("Interrupted!"); //Not expected to happen under normal use.
            }
        }
    }

    public static class Request {

        public void make() {
            try {
                //Let's simulate the communication with the external API:
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
            }
            catch (final InterruptedException ix) {
                //Let's say here we failed to communicate with the external API...
            }
        }
    }

    public static class RequestImplementor implements Runnable {

        private final BlockingQueue<Request> q;

        public RequestImplementor(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }

        @Override
        public void run() {
            try {
                q.take().make(); //Will block until there is at least one element to take.
                System.out.println("Request made.");
            }
            catch (final InterruptedException ix) {
                //Here the 'taking' from the 'q' is interrupted.
            }
        }
    }

    public static void main(final String[] args) throws InterruptedException {

        /*The following initialization parameters specify that we
        can communicate with the external API 60 times per 1 minute.*/
        final int maxRequestsPerTime = 60;
        final TimeUnit timeUnit = TimeUnit.MINUTES;
        final long timeAmount = 1;

        final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
        //final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);

        //Submit some RequestSubmitters to the pool...
        final ExecutorService pool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 50_000; ++i)
            pool.submit(new RequestSubmitter(q));

        System.out.println("Serving...");

        //Find out the period between communications with the external API:
        final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
        //We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.

        //The most important line probably:
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
    }
}
Note that I used scheduleWithFixedDelay and not scheduleAtFixedRate. As you can see in their documentation, the first one waits for the given delay between the end of one invocation of the submitted Runnable and the start of the next, while the second one does not wait and simply resubmits the Runnable every period time units. But we don't know how long the communication with the external API will take, so what if we scheduleAtFixedRate with a period of once per minute, but a request needs more than a minute to complete?... Then a new request would be submitted while the first one has not yet finished. That is why I used scheduleWithFixedDelay rather than scheduleAtFixedRate. But there is more: I used a single-thread scheduled executor service. Does that mean that a second invocation cannot start if the first one has not finished?... Well, if you take a look at the implementation of Executors#newSingleThreadScheduledExecutor(), a second invocation might still happen, because a single-thread core pool size does not mean the pool has a fixed size.
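As a side-by-side reminder of the two signatures (the one-second period below is just a placeholder):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulingComparison {
    public static void main(final String[] args) {
        final Runnable task = () -> System.out.println("Talking to the external API..."); //Placeholder task.
        final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        //scheduleAtFixedRate aims at a fixed start-to-start period; if a run overruns the period,
        //the following runs are scheduled as soon as possible afterwards:
        //ses.scheduleAtFixedRate(task, 0L, 1L, TimeUnit.SECONDS);
        //scheduleWithFixedDelay waits the given delay after the previous run has finished
        //before starting the next one (the variant used above):
        ses.scheduleWithFixedDelay(task, 0L, 1L, TimeUnit.SECONDS);
    }
}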
Another reason I used scheduleWithFixedDelay is request underflow. For example, what if the queue is empty? Then the scheduling should also wait instead of submitting the Runnable again.
On the other hand, if we use scheduleWithFixedDelay with a delay of 1/60 of a minute between schedulings, and more than 60 requests get submitted within a single minute, then this will surely lower our throughput towards the external API, because with scheduleWithFixedDelay we can only guarantee that at most 60 requests are made to the external API. It can be less than that, but we don't want it to be; we want to reach the limit every time. If that is not a concern for you, then you can use the above implementation as is.
But let's say you do want to stay as close to the limit as possible every time. In that case, as far as I know, you can do it with a custom scheduler, which would be more involved than the first approach but more accurate in its timing.
On top of that, with the above implementation you need to make sure that the communication with the external API serves the requests as fast as possible.
Finally, I should warn you to think about what happens if the BlockingQueue implementations I suggested do not put in FIFO order; I couldn't find out whether they do. I mean, what if two requests arrive almost at the same time while the queue is full? They will both wait, but will the one that arrived first also be put first, or will the second one be put first? I don't know. If you don't mind some requests reaching the external API out of order, then don't worry and use the code so far. If you do care, however, and you are able to put, say, a sequence number into each request, then you could use a PriorityQueue after the BlockingQueue, or maybe even experiment with a PriorityBlockingQueue (which unfortunately is unbounded). That would complicate things even more, so I haven't posted code related to PriorityQueue. At least I did my best, and I hope I have given you some good ideas. I'm not saying this post is a complete solution to all your problems, but it gives you some considerations to start with.
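If you can indeed tag each request with a sequence number, a rough sketch of that idea (purely illustrative; the SequencedRequest type below is hypothetical, and note again that PriorityBlockingQueue stays unbounded) could be:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class OrderedRequests {

    //A request tagged with a monotonically increasing sequence number:
    public static class SequencedRequest implements Comparable<SequencedRequest> {

        private static final AtomicLong SEQUENCER = new AtomicLong();

        private final long seq = SEQUENCER.getAndIncrement();

        @Override
        public int compareTo(final SequencedRequest other) {
            return Long.compare(seq, other.seq); //Smallest sequence number first.
        }

        public void make() {
            //Communicate with the external API here...
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        //Unbounded, so unlike ArrayBlockingQueue it cannot by itself throttle the submitters:
        final PriorityBlockingQueue<SequencedRequest> q = new PriorityBlockingQueue<>();
        q.put(new SequencedRequest());
        q.put(new SequencedRequest());
        q.take().make(); //Always takes the request with the smallest sequence number.
    }
}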
I implemented something different from what @gthanop suggested.
What I found out is that the limits may change, so I might need to grow or shrink the blocking list. Another reason is that it would not be that easy to adapt our current code to it. And one more: we might use more than one instance, so we would need a distributed lock.
So, what I implemented is easier, though not as efficient as @gthanop's answer.
Here is my code (adapted, since I can't show the company's code):
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Teste {

    private static enum ExternalApi {
        A, B, C;
    }

    private static class RequestManager {

        private long firstRequest; // First request in one minute

        // how many requests have we made
        private int requestsCount = 0;

        // A timer thread; it executes every minute and refreshes the request count and the first request time
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        RequestManager() {
            final long initialDelay = 0L;
            final long fixedRate = 60;
            executor.scheduleAtFixedRate(() -> {
                System.out.println("Clearing the current count!");
                requestsCount = 0;
                firstRequest = System.currentTimeMillis();
            }, initialDelay, fixedRate, TimeUnit.SECONDS);
        }

        void incrementRequest() {
            requestsCount++;
        }

        long getFirstRequest() {
            return firstRequest;
        }

        boolean requestsExceeded(final int requestLimit) {
            return requestsCount >= requestLimit;
        }
    }

    public static class RequestHelper {

        private static final byte SECONDS_IN_MINUTE = 60;
        private static final short MILLISECONDS_IN_SECOND = 1000;
        private static final byte ZERO_SECONDS = 0;

        // Table holding the time and count of the requests
        private final Map<Integer, RequestManager> requests;

        // Table that contains the limits of each type of request
        private final Map<Integer, Integer> requestLimits;

        /**
         * We need an array of semaphores, because we might lock the requests for ONE, but not for TWO.
         */
        private final Semaphore[] semaphores;

        private RequestHelper() {
            // one semaphore per type
            semaphores = new Semaphore[ExternalApi.values().length];
            requests = new ConcurrentHashMap<>();
            requestLimits = new HashMap<>();
            for (final ExternalApi type : ExternalApi.values()) {
                // Binary semaphore, must be fair, because we are updating things.
                semaphores[type.ordinal()] = new Semaphore(1, true);
            }
        }

        /**
         * When my token expires, I must update this, because the limits might change.
         * @param limits the new api limits
         */
        protected void updateLimits(final Map<ExternalApi, Integer> limits) {
            limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
        }

        /**
         * Increments the counter for the type of the request.
         * Using the mutual exclusion lock, we can handle and block other threads that are trying to
         * make a request to the api.
         * If the incoming requests are going to exceed the maximum, we make the thread sleep for N seconds (60 - time since first request).
         * Since we are using a binary semaphore, it blocks incoming requests until the sleeping thread wakes up and releases the semaphore lock.
         *
         * @param type of the integration, Supp, List, PET etc ...
         */
        protected final void addRequest(final ExternalApi type) {
            // the index of this request
            final int requestIndex = type.ordinal();
            // we get the permit for the semaphore of the type
            final Semaphore semaphore = semaphores[requestIndex];
            // Try to acquire a permit; if no permit is available, it will block until one is available.
            semaphore.acquireUninterruptibly();
            // gets the requestManager for the type
            final RequestManager requestManager = getRequest(requestIndex);
            // increments the number of requests
            requestManager.incrementRequest();
            if (requestManager.requestsExceeded(requestLimits.get(type.ordinal()))) {
                // the difference in seconds between a minute and the time we needed to reach the maximum of requests
                final int secondsToSleep = SECONDS_IN_MINUTE - (int) (System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND;
                // We reached the maximum in less than a minute
                if (secondsToSleep > ZERO_SECONDS) {
                    System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d before making a new request!\n", requestLimits.get(type.ordinal()), type.name(), secondsToSleep);
                    sleep(secondsToSleep * MILLISECONDS_IN_SECOND);
                }
            }
            // releases the semaphore
            semaphore.release();
        }

        private final void sleep(final long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Gets the RequestManager for the given index; if it is the first request, it creates the
         * RequestManager object.
         * @param index
         * @return a RequestManager instance
         */
        private RequestManager getRequest(final int index) {
            RequestManager request = requests.get(index);
            if (request == null) {
                request = new RequestManager();
                requests.put(index, request);
            }
            return request;
        }
    }

    public static void main(String[] args) {
        final RequestHelper requestHelper = new RequestHelper();
        final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);
        // update the limits
        requestHelper.updateLimits(apiLimits);

        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
        executor.scheduleWithFixedDelay(() -> {
            System.out.println("A new request is going to happen");
            requestHelper.addRequest(ExternalApi.A);
            sleep(65);
        }, 0, 100, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("B new request is going to happen");
            requestHelper.addRequest(ExternalApi.B);
            sleep(50);
        }, 0, 200, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("C new request is going to happen");
            requestHelper.addRequest(ExternalApi.C);
            sleep(30);
        }, 0, 300, TimeUnit.MILLISECONDS);
    }

    private static final void sleep(final long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}