可变速率消费者的负载平衡队列

Load Balance Queues for variable rate consumers

我有一个生产者和消费者框架。每个生产者推送到一个队列,消费者从队列中消费。在任何时间点,都可以有一个或多个队列,每个消费者从单个队列中消费。但是生产者可以生产到任何队列。如果消费者速度慢,它会不断堆积消息。我正在尝试提供一个框架,在该框架中我可以对消费者进行负载平衡,以便无论消费者速度如何,所有消费者队列都具有几乎相同的消息。

示例:

此处队列 Q1-Q3 应该具有几乎相等的消息,而不管 C1-C3 消费者的速率如何。我现在使用的默认策略是生产者循环法,但如果任何消费者速度慢,它会继续向队列中添加消息。所有消息都属于同一类型,因此它会进入任何队列。

任何入手的建议都会有所帮助。

简单 - 添加到项目数最少的队列。

以下是我已经实施的解决方案。使用的算法如下。

  1. 每 30 秒计算所有队列的平均值。
  2. 如果a的滞后 consumer w.r.t 表示大于特定阈值时忽略 queue/consumer.

生产者代码:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable{

    private List<BlockingQueue<Integer>> blockingQueues = new ArrayList<>();
    private List<Integer> fullPartitions;
    private List<Integer> activePartitions;
    long timer = System.currentTimeMillis();
    int THRESHOLD = 10000;
    int currentQueue = 0;

    public Producer(List<BlockingQueue<Integer>> blockingQueues, List<Integer> fullPartitions, List<Integer> activePartitions) {
        this.blockingQueues = blockingQueues;
        this.fullPartitions = fullPartitions;
        this.activePartitions = activePartitions;
    }

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        while(true) {
            blockingQueues.get(getNextID()).offer(new Random().nextInt(100000));
            try {
                if(System.currentTimeMillis()-start<300000)
                    Thread.sleep(1);
                else
                    break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private int getNextID() {
        if(System.currentTimeMillis()-timer>30000) {
            activePartitions = new ArrayList<>();
            long mean = 0l; 
            for(int i=0;i<fullPartitions.size();i++) 
                mean += blockingQueues.get(i).size();

            mean  = mean/blockingQueues.size();
            for(int i=0;i<fullPartitions.size();i++) 
                if(blockingQueues.get(i).size()-mean<THRESHOLD)
                    activePartitions.add(i);

            timer = System.currentTimeMillis();
        }
        int partitionID = activePartitions.get(currentQueue%activePartitions.size());
        currentQueue++;
        return partitionID;
    }
}

消费者:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

    private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(100000000);
    private int delayFactor;
    public Consumer(BlockingQueue<Integer> blockingQueue, int delayFactor, int consumerNo) {
        this.blockingQueue = blockingQueue;
        this.delayFactor = delayFactor;
    }

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        while(true) {
            try {
                blockingQueue.take();
                if(blockingQueue.isEmpty())
                    System.out.println((System.currentTimeMillis()-start)/1000);
                Thread.sleep(delayFactor);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

主线程:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class KafkaLoadBalancer {

    private static int MAX_PARTITION = 4;

    public static void main(String args[]) throws InterruptedException {
        List<BlockingQueue<Integer>> blockingQueues = new ArrayList<>();
        List<Integer> fullPartitions = new ArrayList<Integer>();
        List<Integer> activePartitions = new ArrayList<Integer>();

        System.out.println("Creating Queues");
        for(int i=0;i<MAX_PARTITION;i++) {
            blockingQueues.add(new ArrayBlockingQueue<>(1000000));
            fullPartitions.add(i);
            activePartitions.add(i);
        }

        System.out.println("Starting Producers");
        for(int i=0;i<MAX_PARTITION;i++) {
            Producer producer = new Producer(blockingQueues,fullPartitions,activePartitions);
            new Thread(producer).start();
        }

        System.out.println("Starting Consumers");
        for(int i=0;i<MAX_PARTITION;i++) {
            Consumer consumer = new Consumer(blockingQueues.get(i),i+1,i);
            new Thread(consumer).start();
        }

        System.out.println("Starting Display Thread");
        DisplayQueue dq = new DisplayQueue(blockingQueues);
        new Thread(dq).start();
    }
}

DispayQueue:显示队列大小

import java.util.List;
import java.util.concurrent.BlockingQueue;

public class DisplayQueue implements Runnable {

    private List<BlockingQueue<Integer>> blockingQueues;

    public DisplayQueue(List<BlockingQueue<Integer>> blockingQueues) {
        this.blockingQueues = blockingQueues;
    }

    @Override
    public void run() {

        long start = System.currentTimeMillis();
        while(true) {
            if(System.currentTimeMillis()-start>30000) {
                for(int i=0;i<blockingQueues.size();i++)
                    System.out.println("Queue "+i+" size is=="+blockingQueues.get(i).size());
                start = System.currentTimeMillis();
            }
        }

    }

}