多个生产者和多个消费者 Java 信号量阻塞队列

Multiple Producers and Multiple Consumers Java Semaphore Blocking Queue

我们正在尝试围绕一个信号量锁队列实现多个生产者和多个消费者。我们 运行 遇到了生产的物品比我告诉它的要多的问题。有人可以帮我弄清楚我们哪里出错了吗?谢谢

package prog2;

import java.util.Scanner;

public class prog2 {

public static void main(String[] args) throws Exception {

    Scanner scanner = new Scanner(System.in);
    System.out.println("Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.");
    System.out.println("Format Ex: 4 5 10 1000");
    String input = scanner.nextLine();
    String[] numbers = input.split(" ");

    int num_producers, num_consumers, size_buffer, num_items, num_consumed, num_produced;
    num_producers = Integer.parseInt(numbers[0]);
    num_consumers = Integer.parseInt(numbers[1]);
    size_buffer = Integer.parseInt(numbers[2]);
    num_items = Integer.parseInt(numbers[3]);

    ProducerConsumer implementation = new ProducerConsumer(size_buffer, num_items);

    for(int producer_count = 0; producer_count < num_producers; producer_count++){ //creating multiple producers
                Producer p = new Producer(implementation);
                p.start();
    }

    for(int consumer_count = 0; consumer_count < num_consumers; consumer_count++){ //creating multiple consumers
                Consumer c = new Consumer (implementation);
                c.start();
    } 

    System.out.println("Number of Produced items: " + implementation.num_produced + " Number of Consumed items: " + implementation.num_consumed);

    }
}


package prog2;

public class Producer extends Thread{

protected ProducerConsumer implementation;

public Producer (ProducerConsumer implementation){
    this.implementation = implementation;
}

@Override
public void run(){
        try{ 
            while(implementation.done_processing != true){
                implementation.put();
            }
        }catch(InterruptedException e){
        }
}  
}

package prog2;

public class Consumer extends Thread{

protected ProducerConsumer implementation;

public Consumer (ProducerConsumer implementation){
    this.implementation = implementation;
}
  @Override
  public void run(){
    try{ 
        while(implementation.done_consuming != true){
            implementation.get();
        }
    }catch(InterruptedException e){
    }
}  
}


package prog2;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

class ProducerConsumer {
//Queue Creation
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
private final Random theRandom = new Random();
//Semaphore Creation
static Semaphore semProd = new Semaphore(1);
static Semaphore semCon = new Semaphore(0);

public boolean done_processing = false;
public boolean done_consuming = false;
public int num_items = 0;
private int size_buffer = 0;
public int num_produced = 0;
public int num_consumed = 0;

public ProducerConsumer (int size_buffer, int num_items){
    this.size_buffer = size_buffer;
    this.num_items = num_items;
}

public void write_producer_log (String data) throws IOException{
    File f1 = new File ("producer-event.log");
    if(!f1.exists()){
        f1.createNewFile();
    }
    FileWriter fileWriter = new FileWriter(f1.getName(),true);
    try (BufferedWriter bw = new BufferedWriter(fileWriter)) {
        bw.write(data);
        bw.newLine();
    }
}

public void write_consumer_log (String data) throws IOException{
    File f1 = new File ("consumer-event.log");
    if(!f1.exists()){
        f1.createNewFile();
    }
    FileWriter fileWriter = new FileWriter(f1.getName(),true);
    try (BufferedWriter bw = new BufferedWriter(fileWriter)) {
        bw.write(data);
        bw.newLine();
    }
}

public void put() throws InterruptedException {
    semProd.acquire();
    try {
        if(num_produced >= num_items){
            done_processing = true;
        }
            if (queue.size() == size_buffer) {
                return;
            }

            int number = theRandom.nextInt();
            boolean isAdded = queue.add(number);
            num_produced++;
                Timestamp timestamp = new Timestamp(System.nanoTime());
                String log_entry = timestamp + " Producer " +  
Thread.currentThread().getId() + " " + num_produced + " " + number +"\n";
              try{  
                write_producer_log(log_entry);
              }catch (IOException e){

              }
    } 
    finally {
            semCon.release();
    }

}

public void get() throws InterruptedException {
    semCon.acquire();
    try {
        if(num_consumed >= num_items){
            done_consuming = true;
        }
            if (queue.isEmpty()) {
                return;
            }

            Integer value = queue.take();
            num_consumed++;
                Timestamp timestamp = new Timestamp(System.nanoTime());
                String log_entry = timestamp + " Consumer " +  
Thread.currentThread().getId() + " " + num_consumed + " " + value +"\n";
              try{  
                write_consumer_log(log_entry);
              }catch (IOException e){

              }     
    } 
    finally {
        semProd.release();
    }
}
}

在classprog2中,你应该使用thread.join来等待所有的生产者和消费者在打印num_producednum_consumed之前完成他们的工作:

public class prog2 {

    public static void main(String[] args) throws Exception {

        Scanner scanner = new Scanner(System.in);
        System.out.println("Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.");
        System.out.println("Format Ex: 4 5 10 1000");
        String input = scanner.nextLine();
        String[] numbers = input.split(" ");

        int num_producers, num_consumers, size_buffer, num_items, num_consumed, num_produced;
        num_producers = Integer.parseInt(numbers[0]);
        num_consumers = Integer.parseInt(numbers[1]);
        size_buffer = Integer.parseInt(numbers[2]);
        num_items = Integer.parseInt(numbers[3]);

        ProducerConsumer implementation = new ProducerConsumer(size_buffer, num_items);
        Producer[] producers = new Producer[num_producers];
        Consumer[] consumers = new Consumer[num_consumers];

        for (int producer_count = 0; producer_count < num_producers; producer_count++) { //creating multiple producers
            producers[producer_count] = new Producer(implementation);
            producers[producer_count].start();
        }

        for (int consumer_count = 0; consumer_count < num_consumers; consumer_count++) { //creating multiple consumers
            consumers[consumer_count] = new Consumer(implementation);
            consumers[consumer_count].start();
        }

        for (int producer_count = 0; producer_count < num_producers; producer_count++) {
            producers[producer_count].join();
        }

        for (int consumer_count = 0; consumer_count < num_consumers; consumer_count++) {
            consumers[consumer_count].join();
        }

        System.out.println("Number of Produced items: " + implementation.num_produced + 
                " Number of Consumed items: " + implementation.num_consumed);
    }
}

在classProducerConsumer,

  1. 您应该使用关键字 volatile 声明以下变量,因此 reader 个线程总能得到更新值:

    • done_processing
    • done_consuming
    • num_produced
    • num_consumed
  2. 你应该改用Semaphore.tryAcquire(long timeout, TimeUnit unit) Semaphore.acquire(),这将防止线程被阻塞时 done_consuming = truedone_processing = true.
  3. done_consuming = truedone_processing = true,而不是继续执行。

这里是ProducerConsumer没有日志的代码:

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ProducerConsumer {
    //Queue Creation
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private final Random theRandom = new Random();
    //Semaphore Creation
    static Semaphore semProd = new Semaphore(1);
    static Semaphore semCon = new Semaphore(0);

    public volatile boolean done_processing = false;
    public volatile boolean done_consuming = false;

    public int num_items = 0;
    private int size_buffer = 0;

    public volatile int num_produced = 0;
    public volatile int num_consumed = 0;

    public ProducerConsumer(int size_buffer, int num_items) {
        this.size_buffer = size_buffer;
        this.num_items = num_items;
    }

    public void put() throws InterruptedException {
        if (semProd.tryAcquire(100, TimeUnit.MILLISECONDS)) {
            try {
                if (num_produced >= num_items) {
                    done_processing = true;
                    return;
                }
                if (queue.size() == size_buffer) {
                    return;
                }
                int number = theRandom.nextInt();
                boolean isAdded = queue.add(number);
                num_produced++;
            } finally {
                semCon.release();
            }
        }
    }

    public void get() throws InterruptedException {
        if (semCon.tryAcquire(100, TimeUnit.MILLISECONDS)) {
            try {
                if (num_consumed >= num_items) {
                    done_consuming = true;
                    return;
                }
                if (queue.isEmpty()) {
                    return;
                }
                Integer value = queue.take();
                num_consumed++;
            } finally {
                semProd.release();
            }
        }
    }
}

测试结果:

Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.
Format Ex: 4 5 10 1000
4 5 10 1000
Number of Produced items: 1000 Number of Consumed items: 1000

Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.
Format Ex: 4 5 10 1000
10 20 30 10000
Number of Produced items: 10000 Number of Consumed items: 10000