将显式和隐式并行与 java-8 流混合
Mix explicit and implicit parallelism with java-8 streams
过去我用两个线程编写了一些 java 程序。
第一个线程(生产者)正在从 API(C 库)读取数据,创建一个 java 对象,将该对象发送到另一个线程。
C API 正在传递事件流(无限)。
线程使用 LinkedBlockingQueue 作为管道来交换对象(放置、轮询)。
第二个线程(消费者)正在处理对象。
(我还发现线程内的代码更具可读性。第一个线程正在处理 C API 内容并生成
正确的 java 对象,第二个线程不受 C API 处理,正在处理数据)。
现在我很感兴趣,我如何通过 java 8 中的新流 API 实现上述场景。
但假设我想保留这两个线程 (producer/consumer)!
第一个线程正在写入流。第二个线程正在从流中读取。
我也希望,我可以用这种技术处理更好的显式并行性 (producer/consumer)
在流中我可以使用一些隐式并行性(例如 stream.parallel()).
我对新流 api 没有太多经验。
所以我在下面尝试了以下代码,以解决上面的想法。
- 我使用 'generate' 访问 C API 并将其提供给 java 流。
- 我在消费者线程中使用 .parallel() 来测试和处理隐式并行性。看起来不错。但见下文。
问题:
- 'generate'在这种情况下对生产者来说是最好的方法吗?
- 我对如何 terminate/close 生产者中的流有理解上的问题,
如果 API 有一些错误 AND 我想关闭整个管道。
我是使用 stream.close 还是抛出异常?
- 2.1 我使用了 stream.close()。但是关闭后'generate'还是运行,
我发现只抛出一个异常来终止生成部分。
此异常将进入流并且消费者正在接收异常
(这对我来说很好,消费者可以识别并终止)。
但在这种情况下,生产者生产的数量超过消费者处理的数量,同时出现异常。
- 2.2 如果消费者使用隐式并行 stream.parallel()。生产者正在处理更多的项目。
所以我看不到这个问题的任何解决方案。 (访问CAPI,检查错误,做出决定)。
- 2.3 producer抛出异常到达consumer stream,插入的对象没有处理完
再说一次:想法是与线程有明确的并行性。
但在内部我可以处理新功能并尽可能使用并行处理
也感谢您提出这个问题。
package sandbox.test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
public class MyStream {
private volatile LongStream stream = null;
private AtomicInteger producerCount = new AtomicInteger(0);
private AtomicInteger consumerCount = new AtomicInteger(0);
private AtomicInteger apiError = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MyStream appl = new MyStream();
appl.create();
}
private static void sleep(long sleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void apiError(final String pos, final int iteration) {
RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
System.out.println(apiException.getMessage());
throw apiException;
}
final private int simulateErrorAfter = 10;
private Thread produce() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Producer started");
stream = LongStream.generate(() -> {
int localCount;
// Detect error, while using stream.parallel() processing
int error = apiError.get();
if ( error > 0 )
apiError("1", error);
// ----- Accessing the C API here -----
localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
// ----- Accessing the C API here -----
// Checking error code from C API
if ( localCount > simulateErrorAfter ) { // Simulate an API error
producerCount.decrementAndGet();
stream.close();
apiError("2", apiError.incrementAndGet());
}
System.out.println("P: " + localCount);
sleep(200L);
return localCount;
});
System.out.println("Producer terminated");
}
});
thread.start();
return thread;
}
private Thread consume() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
stream.onClose(new Runnable() {
@Override
public void run() {
System.out.println("Close detected");
}
}).parallel().forEach(l -> {
sleep(1000);
System.out.println("C: " + l);
consumerCount.incrementAndGet();
});
} catch (Exception e) {
// Capturing the stream end
System.out.println(e);
}
System.out.println("Consumer terminated");
}
});
thread.start();
return thread;
}
private void create() throws InterruptedException {
Thread producer = produce();
while ( stream == null )
sleep(10);
Thread consumer = consume();
producer.join();
consumer.join();
System.out.println("Produced: " + producerCount);
System.out.println("Consumed: " + consumerCount);
}
}
您需要了解有关 Stream
API 的一些基本要点:
在流上应用的所有操作都是惰性的,在应用终端操作之前不会做任何事情。使用“生产者”线程创建流是没有意义的,因为这个线程不会做任何事情。所有操作都在您的“消费者”线程中执行,后台线程由 Stream
实现本身启动。创建 Stream
实例的线程完全不相关
关闭流与 Stream
操作本身无关,即不会关闭线程。它旨在释放 额外的 资源,例如关闭与 Files.lines(…)
返回的流关联的文件。您可以使用 onClose
安排此类清理操作,并且 Stream
将在您调用 close
时调用它们,仅此而已。对于Stream
class本身没有任何意义。
Stream
s 不会模拟“一个线程正在写入,另一个线程正在读取”这样的场景。他们的模型是“一个线程正在调用您的 Supplier
,然后调用您的 Consumer
,另一个线程执行相同的操作,x 其他线程也是……”
如果您想使用不同的生产者和消费者线程实施 producer/consumer 方案,最好使用 Thread
或 ExecutorService
和线程安全队列。
但您仍然可以使用 Java 8 项功能。例如。无需使用内部 classes 实现 Runnable
s;你可以为它们使用 lambda 表达式。
过去我用两个线程编写了一些 java 程序。 第一个线程(生产者)正在从 API(C 库)读取数据,创建一个 java 对象,将该对象发送到另一个线程。 C API 正在传递事件流(无限)。 线程使用 LinkedBlockingQueue 作为管道来交换对象(放置、轮询)。 第二个线程(消费者)正在处理对象。 (我还发现线程内的代码更具可读性。第一个线程正在处理 C API 内容并生成 正确的 java 对象,第二个线程不受 C API 处理,正在处理数据)。
现在我很感兴趣,我如何通过 java 8 中的新流 API 实现上述场景。 但假设我想保留这两个线程 (producer/consumer)! 第一个线程正在写入流。第二个线程正在从流中读取。 我也希望,我可以用这种技术处理更好的显式并行性 (producer/consumer) 在流中我可以使用一些隐式并行性(例如 stream.parallel()).
我对新流 api 没有太多经验。 所以我在下面尝试了以下代码,以解决上面的想法。
- 我使用 'generate' 访问 C API 并将其提供给 java 流。
- 我在消费者线程中使用 .parallel() 来测试和处理隐式并行性。看起来不错。但见下文。
问题:
- 'generate'在这种情况下对生产者来说是最好的方法吗?
- 我对如何 terminate/close 生产者中的流有理解上的问题,
如果 API 有一些错误 AND 我想关闭整个管道。
我是使用 stream.close 还是抛出异常?
- 2.1 我使用了 stream.close()。但是关闭后'generate'还是运行, 我发现只抛出一个异常来终止生成部分。 此异常将进入流并且消费者正在接收异常 (这对我来说很好,消费者可以识别并终止)。 但在这种情况下,生产者生产的数量超过消费者处理的数量,同时出现异常。
- 2.2 如果消费者使用隐式并行 stream.parallel()。生产者正在处理更多的项目。 所以我看不到这个问题的任何解决方案。 (访问CAPI,检查错误,做出决定)。
- 2.3 producer抛出异常到达consumer stream,插入的对象没有处理完
再说一次:想法是与线程有明确的并行性。 但在内部我可以处理新功能并尽可能使用并行处理
也感谢您提出这个问题。
package sandbox.test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
public class MyStream {
private volatile LongStream stream = null;
private AtomicInteger producerCount = new AtomicInteger(0);
private AtomicInteger consumerCount = new AtomicInteger(0);
private AtomicInteger apiError = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MyStream appl = new MyStream();
appl.create();
}
private static void sleep(long sleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void apiError(final String pos, final int iteration) {
RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
System.out.println(apiException.getMessage());
throw apiException;
}
final private int simulateErrorAfter = 10;
private Thread produce() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Producer started");
stream = LongStream.generate(() -> {
int localCount;
// Detect error, while using stream.parallel() processing
int error = apiError.get();
if ( error > 0 )
apiError("1", error);
// ----- Accessing the C API here -----
localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
// ----- Accessing the C API here -----
// Checking error code from C API
if ( localCount > simulateErrorAfter ) { // Simulate an API error
producerCount.decrementAndGet();
stream.close();
apiError("2", apiError.incrementAndGet());
}
System.out.println("P: " + localCount);
sleep(200L);
return localCount;
});
System.out.println("Producer terminated");
}
});
thread.start();
return thread;
}
private Thread consume() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
stream.onClose(new Runnable() {
@Override
public void run() {
System.out.println("Close detected");
}
}).parallel().forEach(l -> {
sleep(1000);
System.out.println("C: " + l);
consumerCount.incrementAndGet();
});
} catch (Exception e) {
// Capturing the stream end
System.out.println(e);
}
System.out.println("Consumer terminated");
}
});
thread.start();
return thread;
}
private void create() throws InterruptedException {
Thread producer = produce();
while ( stream == null )
sleep(10);
Thread consumer = consume();
producer.join();
consumer.join();
System.out.println("Produced: " + producerCount);
System.out.println("Consumed: " + consumerCount);
}
}
您需要了解有关 Stream
API 的一些基本要点:
在流上应用的所有操作都是惰性的,在应用终端操作之前不会做任何事情。使用“生产者”线程创建流是没有意义的,因为这个线程不会做任何事情。所有操作都在您的“消费者”线程中执行,后台线程由
Stream
实现本身启动。创建Stream
实例的线程完全不相关关闭流与
Stream
操作本身无关,即不会关闭线程。它旨在释放 额外的 资源,例如关闭与Files.lines(…)
返回的流关联的文件。您可以使用onClose
安排此类清理操作,并且Stream
将在您调用close
时调用它们,仅此而已。对于Stream
class本身没有任何意义。Stream
s 不会模拟“一个线程正在写入,另一个线程正在读取”这样的场景。他们的模型是“一个线程正在调用您的Supplier
,然后调用您的Consumer
,另一个线程执行相同的操作,x 其他线程也是……”如果您想使用不同的生产者和消费者线程实施 producer/consumer 方案,最好使用
Thread
或ExecutorService
和线程安全队列。
但您仍然可以使用 Java 8 项功能。例如。无需使用内部 classes 实现 Runnable
s;你可以为它们使用 lambda 表达式。