用于管理 api 每分钟最大请求数的数据结构

Data structure to manage a maximum number of requests per minute for an api

我需要将数据发送到外部 api,但是这个 API 对每个端点的请求有限制(即:每分钟 60 个请求)。

数据来自Kafka,然后每条消息都转到redis(因为我可以发送一个包含200个项目的请求)。所以,我用一个简单的缓存来帮助我,我可以保证如果我的服务器宕机,我不会丢失任何消息。

问题是,有时候 Kafka 开始发送很多消息,然后 Redis 开始增长(超过 100 万条消息发送到 api),我们可以不要在消息进来时请求太快。然后,我们有很大的延迟。

我的第一个代码很简单:ExecutorService executor = Executors.newFixedThreadPool(1);
这在消息很少且延迟最小的情况下非常有效。

所以,我做的第一件事就是将执行程序更改为:ExecutorService executor = Executors.newCachedThreadPool();
所以我可以要求新线程,因为我需要更快地向外部发出请求 api,但是,我遇到了每分钟请求限制的问题。

有些端点我每分钟可以发出300个请求,其他端点500个,其他端点30个等等。

我做的代码不是很好,这是我工作的公司,所以,我真的需要把它做得更好。

所以,每次我要请求外部 api 时,我都会调用 makeRequest 方法,这个方法是 synchronized,我知道我可以使用同步列表,但我认为同步方法在这种情况下更好。

// This is an inner class
private static class IntegrationType {

    final Queue<Long> requests; // This queue is used to store the timestamp of the requests
    final int maxRequestsPerMinute; // How many requests I can make per minute

    public IntegrationType(final int maxRequestsPerMinute) {
        this.maxRequestsPerMinute = maxRequestsPerMinute;
        this.requests = new LinkedList<>();
    }

    synchronized void makeRequest() {
        final long current = System.currentTimeMillis();
        requests.add(current);
        if (requests.size() >= maxRequestsPerMinute) {
            long first = requests.poll(); // gets the first request

            // The difference between the current request and the first request of the queue
            final int differenceInSeconds = (int) (current - first) / 1000;
           
            // if the difference is less than the maximum allowed
            if (differenceInSeconds <= 60) {
                // seconds to sleep.
                final int secondsToSleep = 60 - differenceInSeconds;
                sleep(secondsToSleep);
            }
        }
    }

     void sleep( int seconds){
        try {
            Thread.sleep(seconds * 1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
     }
}

那么,有我可以使用的数据结构吗? 我应该考虑什么?

提前致谢。

如果正确理解你的问题,你可以使用BlockingQueue with a ScheduledExecutorService如下。

BlockingQueues 有方法 put 如果有可用的 space,它只会将给定的元素添加到队列中,否则方法调用将等待(直到有空闲space)。他们还有方法 take ,如果有任何元素,它只会从队列中删除一个元素,否则方法调用将等待(直到至少有一个元素要取)。

具体来说,您可以使用 LinkedBlockingQueueArrayBlockingQueue,它们可以在任何给定时间提供固定大小的元素。这个固定大小意味着您可以提交 put 任意数量的请求,但您只会 take 请求并每秒处理一次或类似的(例如每分钟发出 60 个请求) ).

要用固定大小实例化一个LinkedBlockingQueue,只需使用相应的构造函数(它接受大小作为参数)。 LinkedBlockingQueue 将根据其文档以 FIFO 顺序 take 元素。

要实例化具有固定大小的 ArrayBlockingQueue,请使用接受大小的构造函数以及名为 fairboolean 标志。如果此标志为 true,则队列将 take 元素也按 FIFO 顺序排列。

然后你可以有一个 ScheduledExecutorService(而不是在循环中等待),你可以在其中提交一个 Runnable,它将 take 从队列中,与外部 API 然后等待通信之间所需的延迟。

下面是一个简单的演示示例:

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    
    public static class RequestSubmitter implements Runnable {
        private final BlockingQueue<Request> q;
        
        public RequestSubmitter(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }
        
        @Override
        public void run() {
            try {
                q.put(new Request()); //Will block until available capacity.
            }
            catch (final InterruptedException ix) {
                System.err.println("Interrupted!"); //Not expected to happen under normal use.
            }
        }
    }
    
    public static class Request {
        public void make() {
            try {
                //Let's simulate the communication with the external API:
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
            }
            catch (final InterruptedException ix) {
                //Let's say here we failed to communicate with the external API...
            }
        }
    }
    
    public static class RequestImplementor implements Runnable {
        private final BlockingQueue<Request> q;
        
        public RequestImplementor(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }
        
        @Override
        public void run() {
            try {
                q.take().make(); //Will block until there is at least one element to take.
                System.out.println("Request made.");
            }
            catch (final InterruptedException ix) {
                //Here the 'taking' from the 'q' is interrupted.
            }
        }
    }
    
    public static void main(final String[] args) throws InterruptedException {
        
        /*The following initialization parameters specify that we
        can communicate with the external API 60 times per 1 minute.*/
        final int maxRequestsPerTime = 60;
        final TimeUnit timeUnit = TimeUnit.MINUTES;
        final long timeAmount = 1;
        
        final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
        //final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);
        
        //Submit some RequestSubmitters to the pool...
        final ExecutorService pool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 50_000; ++i)
            pool.submit(new RequestSubmitter(q));
        
        System.out.println("Serving...");
        
        //Find out the period between communications with the external API:
        final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
        //We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.
        
        //The most important line probably:
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
    }
}

请注意,我使用了 scheduleWithFixedDelay 而不是 scheduleAtFixedRate。您可以在他们的文档中看到第一个将等待提交的 Runnable 调用结束之间的延迟开始下一个,而第二个不会等待并重新提交 Runnableperiod 个时间单位。但是我们不知道与外部API通信需要多长时间,所以如果我们scheduleAtFixedRate每分钟一次period,但是请求需要更多比一分钟完成?...然后将在第一个请求尚未完成时提交一个新请求。所以这就是我使用 scheduleWithFixedDelay 而不是 scheduleAtFixedRate 的原因。但还有更多:我使用了 单线程 计划执行程序服务。这是否意味着如果第一次调用未完成,则无法启动第二个调用?......好吧,如果你看一下 Executors#newSingleThreadScheduledExecutor() 的实现,那么可能会发生第二次调用,因为单线程核心池大小,并不意味着该池具有 固定 大小。

我使用 scheduleWithFixedDelay 的另一个原因是请求下溢。例如队列为空怎么办?然后调度也应该等待,而不是再次提交Runnable

另一方面,如果我们使用scheduleWithFixedDelay,调度之间有1/60f秒的延迟,并且在一分钟内提交了超过60个请求,那么这肯定会使我们对外部 API 的吞吐量下降,因为使用 scheduleWithFixedDelay 我们可以保证 最多 向外部 API 发出 60 个请求。它可以比那个少,但我们不希望它是。我们希望每次都达到极限。如果这不是您关心的问题,那么您已经可以使用上面的实现了。

但假设您每次都希望尽可能接近限制,在这种情况下,据我所知,您可以使用自定义调度程序来完成此操作,这比首先,但时间更准确。

最重要的是,通过上述实施,您需要确保与外部 API 的通信尽可能快地为请求提供服务。

最后,我要提醒您考虑一下,如果我建议的 BlockingQueue 实现未按 FIFO 顺序 puting,我找不到会发生什么。我的意思是,如果队列已满时 2 个请求几乎同时到达怎么办?他们都会等待,但是第一个到达的人会等待并先 puted ,还是第二个先 puted ?我不知道。如果您不关心在外部 API 处发出的某些请求乱序,那么请不要担心并使用到目前为止的代码。但是,如果您确实关心,并且能够在每个请求中输入例如序列号,那么您可以在 BlockingQueue 之后使用 PriorityQueue ,或者甚至可以尝试 PriorityBlockingQueue(不幸的是它是无界的)。那会使事情变得更加复杂,所以我没有 post 与 PriorityQueue 相关的代码。至少我尽力了,我希望我能提出一些好的想法。我并不是说这个 post 是解决您所有问题的完整解决方案,但它是开始时的一些注意事项。

我实施了与@gthanop 建议不同的东西。

我发现,限制可能会改变。所以,我可能需要增加或缩小阻止列表。另一个原因,不会那么容易地使我们当前的代码适应这一点。还有一个,我们可能会用到多个实例,所以我们需要一个分布式锁。

所以,我实现起来更容易,但效率不如@ghtanop 的答案。

这是我的代码(改编,因为我无法显示公司代码):

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class Teste {
    
    private static enum ExternalApi {    
        A, B, C;
    }

    private static class RequestManager {

        private long firstRequest; // First request in one minute
    
        // how many request have we made
        private int requestsCount = 0;
    
        // A timer thread, it will execute at every minute, it will refresh the request count and the first request time
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
        RequestManager() {
            final long initialDelay = 0L;
            final long fixedRate = 60;
    
            executor.scheduleAtFixedRate(() -> {
                System.out.println("Clearing the current count!");
                requestsCount = 0;
                firstRequest = System.currentTimeMillis();
            }, initialDelay, fixedRate, TimeUnit.SECONDS);
        }
    
        void incrementRequest() {
            requestsCount++;
        }
    
        long getFirstRequest() {
            return firstRequest;
        }
    
    
        boolean requestsExceeded(final int requestLimit) {
            return requestsCount >= requestLimit;
        }
    
    }

    public static class RequestHelper {

        private static final byte SECONDS_IN_MINUTE = 60;
        private static final short MILLISECONDS_IN_SECOND = 1000;
        private static final byte ZERO_SECONDS = 0;
    
        // Table to support the time, and count of the requests
        private final Map<Integer, RequestManager> requests;
    
        // Table that contains the limits of each type of request
        private final Map<Integer, Integer> requestLimits;
    
        /**
         * We need an array of semaphores, because, we might lock the requests for ONE, but not for TWO
         */
        private final Semaphore[] semaphores;
    
        private RequestHelper(){
    
            // one semaphore for type
            semaphores = new Semaphore[ExternalApi.values().length];
            requests = new ConcurrentHashMap<>();
            requestLimits = new HashMap<>();
    
            for (final ExternalApi type : ExternalApi.values()) {

                // Binary semaphore, must be fair, because we are updating things.
                semaphores[type.ordinal()] = new Semaphore(1, true);
            }
        }
    
        /**
         * When my token expire, I must update this, because the limits might change.
         * @param limits the new api limits
         */
        protected void updateLimits(final Map<ExternalApi, Integer> limits) {
            limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
        }
    
    
        /**
         * Increments the counter for the type of the request,
         * Using the mutual exclusion lock, we can handle and block other threads that are trying to
         * do a request to the api.
         * If the incoming requests are going to exceed the maximum, we will make the thread sleep for N seconds ( 60 - time since first request)
         * since we are using a Binary Semaphore, it will block incoming requests until the thread that is sleeping, wakeup and release the semaphore lock.
         *
         * @param type of the integration, Supp, List, PET etc ...
         */
        protected final void addRequest(final ExternalApi type) {
    
            // the index of this request
            final int requestIndex = type.ordinal();
    
            // we get the permit for the semaphore of the type
            final Semaphore semaphore = semaphores[requestIndex];
    
            // Try to acquire a permit, if no permit is available, it will block until one is available.
            semaphore.acquireUninterruptibly();
    
            ///gets the requestManager for the type
            final RequestManager requestManager = getRequest(requestIndex);
    
            // increments the number of requests
            requestManager.incrementRequest();
    
            if (requestManager.requestsExceeded(requestLimits.get(type.ordinal()))) {
    
                // the difference in seconds between a minute - the time that we needed to reach the maximum of requests
                final int secondsToSleep = SECONDS_IN_MINUTE - (int) (System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND;
    
                // We reached the maximum in less than a minute
                if (secondsToSleep > ZERO_SECONDS) {
                    System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d before make a new request!\n", requestLimits.get(type.ordinal()), type.name(), secondsToSleep);
                    sleep(secondsToSleep * MILLISECONDS_IN_SECOND);
                }
            }
            // releases the semaphore
            semaphore.release();
        }
    
    
        private final void sleep(final long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Gets the first Request Manager, if it is the first request, it will create the
         * RequestManager object
         * @param index
         * @return a RequestManager instance
         */
        private RequestManager getRequest(final int index) {
            RequestManager request = requests.get(index);
            if(request == null) {
                request = new RequestManager();
                requests.put(index, request);
            }
            return request;
        }
    }

    public static void main(String[] args) {
        
        final RequestHelper requestHelper = new RequestHelper();
        
        final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);
        
        // update the limits
        requestHelper.updateLimits(apiLimits);

        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
        executor.scheduleWithFixedDelay(() -> {
            System.out.println("A new request is going to happen");
            requestHelper.addRequest(ExternalApi.A);
            sleep(65);
        }, 0, 100, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("B new request is going to happen");
            requestHelper.addRequest(ExternalApi.B);
            sleep(50);
        }, 0, 200, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("C new request is going to happen");
            requestHelper.addRequest(ExternalApi.C);
            sleep(30);
        }, 0, 300, TimeUnit.MILLISECONDS);

    }
    
    
    private static final void sleep(final long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } 
}