多个生产者和多个消费者 Java 信号量阻塞队列
Multiple Producers and Multiple Consumers Java Semaphore Blocking Queue
我们正在尝试围绕一个信号量锁队列实现多个生产者和多个消费者。我们 运行 遇到了生产的物品比我告诉它的要多的问题。有人可以帮我弄清楚我们哪里出错了吗?谢谢
package prog2;
import java.util.Scanner;
public class prog2 {
public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in);
System.out.println("Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.");
System.out.println("Format Ex: 4 5 10 1000");
String input = scanner.nextLine();
String[] numbers = input.split(" ");
int num_producers, num_consumers, size_buffer, num_items, num_consumed, num_produced;
num_producers = Integer.parseInt(numbers[0]);
num_consumers = Integer.parseInt(numbers[1]);
size_buffer = Integer.parseInt(numbers[2]);
num_items = Integer.parseInt(numbers[3]);
ProducerConsumer implementation = new ProducerConsumer(size_buffer, num_items);
for(int producer_count = 0; producer_count < num_producers; producer_count++){ //creating multiple producers
Producer p = new Producer(implementation);
p.start();
}
for(int consumer_count = 0; consumer_count < num_consumers; consumer_count++){ //creating multiple consumers
Consumer c = new Consumer (implementation);
c.start();
}
System.out.println("Number of Produced items: " + implementation.num_produced + " Number of Consumed items: " + implementation.num_consumed);
}
}
package prog2;
public class Producer extends Thread{
protected ProducerConsumer implementation;
public Producer (ProducerConsumer implementation){
this.implementation = implementation;
}
@Override
public void run(){
try{
while(implementation.done_processing != true){
implementation.put();
}
}catch(InterruptedException e){
}
}
}
package prog2;
public class Consumer extends Thread{
protected ProducerConsumer implementation;
public Consumer (ProducerConsumer implementation){
this.implementation = implementation;
}
@Override
public void run(){
try{
while(implementation.done_consuming != true){
implementation.get();
}
}catch(InterruptedException e){
}
}
}
package prog2;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
class ProducerConsumer {
//Queue Creation
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
private final Random theRandom = new Random();
//Semaphore Creation
static Semaphore semProd = new Semaphore(1);
static Semaphore semCon = new Semaphore(0);
public boolean done_processing = false;
public boolean done_consuming = false;
public int num_items = 0;
private int size_buffer = 0;
public int num_produced = 0;
public int num_consumed = 0;
public ProducerConsumer (int size_buffer, int num_items){
this.size_buffer = size_buffer;
this.num_items = num_items;
}
public void write_producer_log (String data) throws IOException{
File f1 = new File ("producer-event.log");
if(!f1.exists()){
f1.createNewFile();
}
FileWriter fileWriter = new FileWriter(f1.getName(),true);
try (BufferedWriter bw = new BufferedWriter(fileWriter)) {
bw.write(data);
bw.newLine();
}
}
public void write_consumer_log (String data) throws IOException{
File f1 = new File ("consumer-event.log");
if(!f1.exists()){
f1.createNewFile();
}
FileWriter fileWriter = new FileWriter(f1.getName(),true);
try (BufferedWriter bw = new BufferedWriter(fileWriter)) {
bw.write(data);
bw.newLine();
}
}
public void put() throws InterruptedException {
semProd.acquire();
try {
if(num_produced >= num_items){
done_processing = true;
}
if (queue.size() == size_buffer) {
return;
}
int number = theRandom.nextInt();
boolean isAdded = queue.add(number);
num_produced++;
Timestamp timestamp = new Timestamp(System.nanoTime());
String log_entry = timestamp + " Producer " +
Thread.currentThread().getId() + " " + num_produced + " " + number +"\n";
try{
write_producer_log(log_entry);
}catch (IOException e){
}
}
finally {
semCon.release();
}
}
public void get() throws InterruptedException {
semCon.acquire();
try {
if(num_consumed >= num_items){
done_consuming = true;
}
if (queue.isEmpty()) {
return;
}
Integer value = queue.take();
num_consumed++;
Timestamp timestamp = new Timestamp(System.nanoTime());
String log_entry = timestamp + " Consumer " +
Thread.currentThread().getId() + " " + num_consumed + " " + value +"\n";
try{
write_consumer_log(log_entry);
}catch (IOException e){
}
}
finally {
semProd.release();
}
}
}
在classprog2
中,你应该使用thread.join
来等待所有的生产者和消费者在打印num_produced
和num_consumed
之前完成他们的工作:
public class prog2 {
public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in);
System.out.println("Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.");
System.out.println("Format Ex: 4 5 10 1000");
String input = scanner.nextLine();
String[] numbers = input.split(" ");
int num_producers, num_consumers, size_buffer, num_items, num_consumed, num_produced;
num_producers = Integer.parseInt(numbers[0]);
num_consumers = Integer.parseInt(numbers[1]);
size_buffer = Integer.parseInt(numbers[2]);
num_items = Integer.parseInt(numbers[3]);
ProducerConsumer implementation = new ProducerConsumer(size_buffer, num_items);
Producer[] producers = new Producer[num_producers];
Consumer[] consumers = new Consumer[num_consumers];
for (int producer_count = 0; producer_count < num_producers; producer_count++) { //creating multiple producers
producers[producer_count] = new Producer(implementation);
producers[producer_count].start();
}
for (int consumer_count = 0; consumer_count < num_consumers; consumer_count++) { //creating multiple consumers
consumers[consumer_count] = new Consumer(implementation);
consumers[consumer_count].start();
}
for (int producer_count = 0; producer_count < num_producers; producer_count++) {
producers[producer_count].join();
}
for (int consumer_count = 0; consumer_count < num_consumers; consumer_count++) {
consumers[consumer_count].join();
}
System.out.println("Number of Produced items: " + implementation.num_produced +
" Number of Consumed items: " + implementation.num_consumed);
}
}
在classProducerConsumer
,
您应该使用关键字 volatile
声明以下变量,因此
reader 个线程总能得到更新值:
- done_processing
- done_consuming
- num_produced
- num_consumed
- 你应该改用
Semaphore.tryAcquire(long timeout, TimeUnit unit)
Semaphore.acquire()
,这将防止线程被阻塞时
done_consuming = true
或 done_processing = true
.
- 当
done_consuming = true
或
done_processing = true
,而不是继续执行。
这里是ProducerConsumer
没有日志的代码:
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class ProducerConsumer {
//Queue Creation
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
private final Random theRandom = new Random();
//Semaphore Creation
static Semaphore semProd = new Semaphore(1);
static Semaphore semCon = new Semaphore(0);
public volatile boolean done_processing = false;
public volatile boolean done_consuming = false;
public int num_items = 0;
private int size_buffer = 0;
public volatile int num_produced = 0;
public volatile int num_consumed = 0;
public ProducerConsumer(int size_buffer, int num_items) {
this.size_buffer = size_buffer;
this.num_items = num_items;
}
public void put() throws InterruptedException {
if (semProd.tryAcquire(100, TimeUnit.MILLISECONDS)) {
try {
if (num_produced >= num_items) {
done_processing = true;
return;
}
if (queue.size() == size_buffer) {
return;
}
int number = theRandom.nextInt();
boolean isAdded = queue.add(number);
num_produced++;
} finally {
semCon.release();
}
}
}
public void get() throws InterruptedException {
if (semCon.tryAcquire(100, TimeUnit.MILLISECONDS)) {
try {
if (num_consumed >= num_items) {
done_consuming = true;
return;
}
if (queue.isEmpty()) {
return;
}
Integer value = queue.take();
num_consumed++;
} finally {
semProd.release();
}
}
}
}
测试结果:
Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.
Format Ex: 4 5 10 1000
4 5 10 1000
Number of Produced items: 1000 Number of Consumed items: 1000
Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.
Format Ex: 4 5 10 1000
10 20 30 10000
Number of Produced items: 10000 Number of Consumed items: 10000
我们正在尝试围绕一个信号量锁队列实现多个生产者和多个消费者。我们 运行 遇到了生产的物品比我告诉它的要多的问题。有人可以帮我弄清楚我们哪里出错了吗?谢谢
package prog2;
import java.util.Scanner;
public class prog2 {
public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in);
System.out.println("Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.");
System.out.println("Format Ex: 4 5 10 1000");
String input = scanner.nextLine();
String[] numbers = input.split(" ");
int num_producers, num_consumers, size_buffer, num_items, num_consumed, num_produced;
num_producers = Integer.parseInt(numbers[0]);
num_consumers = Integer.parseInt(numbers[1]);
size_buffer = Integer.parseInt(numbers[2]);
num_items = Integer.parseInt(numbers[3]);
ProducerConsumer implementation = new ProducerConsumer(size_buffer, num_items);
for(int producer_count = 0; producer_count < num_producers; producer_count++){ //creating multiple producers
Producer p = new Producer(implementation);
p.start();
}
for(int consumer_count = 0; consumer_count < num_consumers; consumer_count++){ //creating multiple consumers
Consumer c = new Consumer (implementation);
c.start();
}
System.out.println("Number of Produced items: " + implementation.num_produced + " Number of Consumed items: " + implementation.num_consumed);
}
}
package prog2;
public class Producer extends Thread{
protected ProducerConsumer implementation;
public Producer (ProducerConsumer implementation){
this.implementation = implementation;
}
@Override
public void run(){
try{
while(implementation.done_processing != true){
implementation.put();
}
}catch(InterruptedException e){
}
}
}
package prog2;
public class Consumer extends Thread{
protected ProducerConsumer implementation;
public Consumer (ProducerConsumer implementation){
this.implementation = implementation;
}
@Override
public void run(){
try{
while(implementation.done_consuming != true){
implementation.get();
}
}catch(InterruptedException e){
}
}
}
package prog2;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
class ProducerConsumer {
//Queue Creation
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
private final Random theRandom = new Random();
//Semaphore Creation
static Semaphore semProd = new Semaphore(1);
static Semaphore semCon = new Semaphore(0);
public boolean done_processing = false;
public boolean done_consuming = false;
public int num_items = 0;
private int size_buffer = 0;
public int num_produced = 0;
public int num_consumed = 0;
public ProducerConsumer (int size_buffer, int num_items){
this.size_buffer = size_buffer;
this.num_items = num_items;
}
public void write_producer_log (String data) throws IOException{
File f1 = new File ("producer-event.log");
if(!f1.exists()){
f1.createNewFile();
}
FileWriter fileWriter = new FileWriter(f1.getName(),true);
try (BufferedWriter bw = new BufferedWriter(fileWriter)) {
bw.write(data);
bw.newLine();
}
}
public void write_consumer_log (String data) throws IOException{
File f1 = new File ("consumer-event.log");
if(!f1.exists()){
f1.createNewFile();
}
FileWriter fileWriter = new FileWriter(f1.getName(),true);
try (BufferedWriter bw = new BufferedWriter(fileWriter)) {
bw.write(data);
bw.newLine();
}
}
public void put() throws InterruptedException {
semProd.acquire();
try {
if(num_produced >= num_items){
done_processing = true;
}
if (queue.size() == size_buffer) {
return;
}
int number = theRandom.nextInt();
boolean isAdded = queue.add(number);
num_produced++;
Timestamp timestamp = new Timestamp(System.nanoTime());
String log_entry = timestamp + " Producer " +
Thread.currentThread().getId() + " " + num_produced + " " + number +"\n";
try{
write_producer_log(log_entry);
}catch (IOException e){
}
}
finally {
semCon.release();
}
}
public void get() throws InterruptedException {
semCon.acquire();
try {
if(num_consumed >= num_items){
done_consuming = true;
}
if (queue.isEmpty()) {
return;
}
Integer value = queue.take();
num_consumed++;
Timestamp timestamp = new Timestamp(System.nanoTime());
String log_entry = timestamp + " Consumer " +
Thread.currentThread().getId() + " " + num_consumed + " " + value +"\n";
try{
write_consumer_log(log_entry);
}catch (IOException e){
}
}
finally {
semProd.release();
}
}
}
在classprog2
中,你应该使用thread.join
来等待所有的生产者和消费者在打印num_produced
和num_consumed
之前完成他们的工作:
public class prog2 {
public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in);
System.out.println("Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.");
System.out.println("Format Ex: 4 5 10 1000");
String input = scanner.nextLine();
String[] numbers = input.split(" ");
int num_producers, num_consumers, size_buffer, num_items, num_consumed, num_produced;
num_producers = Integer.parseInt(numbers[0]);
num_consumers = Integer.parseInt(numbers[1]);
size_buffer = Integer.parseInt(numbers[2]);
num_items = Integer.parseInt(numbers[3]);
ProducerConsumer implementation = new ProducerConsumer(size_buffer, num_items);
Producer[] producers = new Producer[num_producers];
Consumer[] consumers = new Consumer[num_consumers];
for (int producer_count = 0; producer_count < num_producers; producer_count++) { //creating multiple producers
producers[producer_count] = new Producer(implementation);
producers[producer_count].start();
}
for (int consumer_count = 0; consumer_count < num_consumers; consumer_count++) { //creating multiple consumers
consumers[consumer_count] = new Consumer(implementation);
consumers[consumer_count].start();
}
for (int producer_count = 0; producer_count < num_producers; producer_count++) {
producers[producer_count].join();
}
for (int consumer_count = 0; consumer_count < num_consumers; consumer_count++) {
consumers[consumer_count].join();
}
System.out.println("Number of Produced items: " + implementation.num_produced +
" Number of Consumed items: " + implementation.num_consumed);
}
}
在classProducerConsumer
,
您应该使用关键字
volatile
声明以下变量,因此 reader 个线程总能得到更新值:- done_processing
- done_consuming
- num_produced
- num_consumed
- 你应该改用
Semaphore.tryAcquire(long timeout, TimeUnit unit)
Semaphore.acquire()
,这将防止线程被阻塞时done_consuming = true
或done_processing = true
. - 当
done_consuming = true
或done_processing = true
,而不是继续执行。
这里是ProducerConsumer
没有日志的代码:
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class ProducerConsumer {
//Queue Creation
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
private final Random theRandom = new Random();
//Semaphore Creation
static Semaphore semProd = new Semaphore(1);
static Semaphore semCon = new Semaphore(0);
public volatile boolean done_processing = false;
public volatile boolean done_consuming = false;
public int num_items = 0;
private int size_buffer = 0;
public volatile int num_produced = 0;
public volatile int num_consumed = 0;
public ProducerConsumer(int size_buffer, int num_items) {
this.size_buffer = size_buffer;
this.num_items = num_items;
}
public void put() throws InterruptedException {
if (semProd.tryAcquire(100, TimeUnit.MILLISECONDS)) {
try {
if (num_produced >= num_items) {
done_processing = true;
return;
}
if (queue.size() == size_buffer) {
return;
}
int number = theRandom.nextInt();
boolean isAdded = queue.add(number);
num_produced++;
} finally {
semCon.release();
}
}
}
public void get() throws InterruptedException {
if (semCon.tryAcquire(100, TimeUnit.MILLISECONDS)) {
try {
if (num_consumed >= num_items) {
done_consuming = true;
return;
}
if (queue.isEmpty()) {
return;
}
Integer value = queue.take();
num_consumed++;
} finally {
semProd.release();
}
}
}
}
测试结果:
Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.
Format Ex: 4 5 10 1000
4 5 10 1000
Number of Produced items: 1000 Number of Consumed items: 1000
Input number of producers threads, number of consumer threads, size of the buffer, and the number of items to be produced.
Format Ex: 4 5 10 1000
10 20 30 10000
Number of Produced items: 10000 Number of Consumed items: 10000