为什么具有可完成未来的代码比并行流慢?
why is the code with completable future slower than the parallel stream?
我有一段代码。该代码用于学习 CompletableFuture
。
package com.test.omn.hello;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class CompletableFutureLearning {
public static void main(String[] args) {
List<Shop> shops = new ArrayList<>();
shops.add(new Shop("Videocon Tv", "100 $"));
shops.add(new Shop("Videocon Tv", "200 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
long start_time;
long end_time;
double difference;
System.out.println("parallel stream");
start_time = System.nanoTime();
shops.parallelStream().forEach(e -> System.out.println(e.getPrice()));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
System.out.println("completable futures stream");
start_time = System.nanoTime();
List<CompletableFuture<String>> result = shops.parallelStream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice())).collect(Collectors.toList());
List<String> result1 = result.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
result1.forEach(e -> System.out.println(e));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
}
static public class Shop {
public Shop(String name, String price) {
super();
this.name = name;
this.price = price;
}
private String name;
private String price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPrice() {
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
}
return price;
}
public void setPrice(String price) {
this.price = price;
}
}
}
下面是我运行代码时的结果。我总是可以看到并行流的执行时间比 CompletableFuture
的执行时间快。我希望执行时间或多或少相似。知道为什么会这样吗?
parallel stream
300 $
400 $
100 $
200 $
execution time 3079.88547
completable futures stream
100 $
200 $
300 $
400 $
execution time 6018.84133
我想在第二个例子中,这里:
List<String> result1 = result.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
您通过单独的线程执行两次包装您的代码:一次是在执行 parallelStream 时,第二次是在调用 CompletableFuture::join 时调用 already async CompletableFuture。
考虑按流交换第二扇区的parallelStream:
List<String> result1 = result.stream().map(CompletableFuture::join).collect(Collectors.toList());
P.S。在我的机器上运行几次后的结果几乎相同:
parallel stream
300 $
400 $
200 $
100 $
execution time 3007.854272
completable futures stream
100 $
200 $
300 $
400 $
execution time 3006.914028
在您的情况下,公共线程池中的线程数量可能少于情况 #2 中所需的线程数量,因此按照我考虑的方式更改代码应该可以解决问题。
问题出在线程池中的线程数上。
以下代码将在线程池中使用 x 个线程执行。
shops.parallelStream().forEach(e -> System.out.println(e.getPrice()));
以下应该在线程池中使用 y 个线程执行
List<CompletableFuture<String>> result = shops.parallelStream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice())).collect(Collectors.toList());
List<String> result1 = result.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
在我的机器上它可能是。 x > y
但是一旦我更改了下面的代码,结果就不同了[=14=]
包 com.test.omn.hello;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class CompletableFutureLearning {
public static void main(String[] args) {
List<Shop> shops = new ArrayList<>();
shops.add(new Shop("Videocon Tv", "100 $"));
shops.add(new Shop("Videocon Tv", "200 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
shops.add(new Shop("Videocon Tv", "100 $"));
shops.add(new Shop("Videocon Tv", "200 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
long start_time;
long end_time;
double difference;
// System.out.println("sequential stream");
//
// long start_time = System.nanoTime();
// long end_time = System.nanoTime();
// double difference = (end_time - start_time) / 1e6;
// System.out.println("execution time "+ difference);
System.out.println("parallel stream");
start_time = System.nanoTime();
shops.parallelStream().forEach(e -> System.out.println(e.getPrice()));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
System.out.println("completable futures stream");
start_time = System.nanoTime();
ExecutorService threadPool = Executors.newFixedThreadPool(8);
List<CompletableFuture<String>> result = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(),threadPool)).collect(Collectors.toList());
List<String> result1 = result.stream().map(CompletableFuture::join).collect(Collectors.toList());
result1.forEach(e -> System.out.println(e));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
}
static public class Shop {
public Shop(String name, String price) {
super();
this.name = name;
this.price = price;
}
private String name;
private String price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPrice() {
try {
Thread.sleep(2000l);
} catch (InterruptedException e) {
}
return price;
}
public void setPrice(String price) {
this.price = price;
}
}
}
现在有结果
parallel stream
300 $
200 $
300 $
300 $
400 $
100 $
100 $
200 $
400 $
400 $
execution time 6093.126747
completable futures stream
100 $
200 $
300 $
400 $
100 $
200 $
300 $
400 $
300 $
400 $
execution time 4022.263999
我有一段代码。该代码用于学习 CompletableFuture
。
package com.test.omn.hello;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class CompletableFutureLearning {
public static void main(String[] args) {
List<Shop> shops = new ArrayList<>();
shops.add(new Shop("Videocon Tv", "100 $"));
shops.add(new Shop("Videocon Tv", "200 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
long start_time;
long end_time;
double difference;
System.out.println("parallel stream");
start_time = System.nanoTime();
shops.parallelStream().forEach(e -> System.out.println(e.getPrice()));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
System.out.println("completable futures stream");
start_time = System.nanoTime();
List<CompletableFuture<String>> result = shops.parallelStream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice())).collect(Collectors.toList());
List<String> result1 = result.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
result1.forEach(e -> System.out.println(e));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
}
static public class Shop {
public Shop(String name, String price) {
super();
this.name = name;
this.price = price;
}
private String name;
private String price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPrice() {
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
}
return price;
}
public void setPrice(String price) {
this.price = price;
}
}
}
下面是我运行代码时的结果。我总是可以看到并行流的执行时间比 CompletableFuture
的执行时间快。我希望执行时间或多或少相似。知道为什么会这样吗?
parallel stream
300 $
400 $
100 $
200 $
execution time 3079.88547
completable futures stream
100 $
200 $
300 $
400 $
execution time 6018.84133
我想在第二个例子中,这里:
List<String> result1 = result.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
您通过单独的线程执行两次包装您的代码:一次是在执行 parallelStream 时,第二次是在调用 CompletableFuture::join 时调用 already async CompletableFuture。
考虑按流交换第二扇区的parallelStream:
List<String> result1 = result.stream().map(CompletableFuture::join).collect(Collectors.toList());
P.S。在我的机器上运行几次后的结果几乎相同:
parallel stream
300 $
400 $
200 $
100 $
execution time 3007.854272
completable futures stream
100 $
200 $
300 $
400 $
execution time 3006.914028
在您的情况下,公共线程池中的线程数量可能少于情况 #2 中所需的线程数量,因此按照我考虑的方式更改代码应该可以解决问题。
问题出在线程池中的线程数上。
以下代码将在线程池中使用 x 个线程执行。
shops.parallelStream().forEach(e -> System.out.println(e.getPrice()));
以下应该在线程池中使用 y 个线程执行
List<CompletableFuture<String>> result = shops.parallelStream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice())).collect(Collectors.toList());
List<String> result1 = result.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
在我的机器上它可能是。 x > y
但是一旦我更改了下面的代码,结果就不同了[=14=]
包 com.test.omn.hello;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class CompletableFutureLearning {
public static void main(String[] args) {
List<Shop> shops = new ArrayList<>();
shops.add(new Shop("Videocon Tv", "100 $"));
shops.add(new Shop("Videocon Tv", "200 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
shops.add(new Shop("Videocon Tv", "100 $"));
shops.add(new Shop("Videocon Tv", "200 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
shops.add(new Shop("Videocon Tv", "300 $"));
shops.add(new Shop("Videocon Tv", "400 $"));
long start_time;
long end_time;
double difference;
// System.out.println("sequential stream");
//
// long start_time = System.nanoTime();
// long end_time = System.nanoTime();
// double difference = (end_time - start_time) / 1e6;
// System.out.println("execution time "+ difference);
System.out.println("parallel stream");
start_time = System.nanoTime();
shops.parallelStream().forEach(e -> System.out.println(e.getPrice()));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
System.out.println("completable futures stream");
start_time = System.nanoTime();
ExecutorService threadPool = Executors.newFixedThreadPool(8);
List<CompletableFuture<String>> result = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(),threadPool)).collect(Collectors.toList());
List<String> result1 = result.stream().map(CompletableFuture::join).collect(Collectors.toList());
result1.forEach(e -> System.out.println(e));
end_time = System.nanoTime();
difference = (end_time - start_time) / 1e6;
System.out.println("execution time " + difference);
}
static public class Shop {
public Shop(String name, String price) {
super();
this.name = name;
this.price = price;
}
private String name;
private String price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPrice() {
try {
Thread.sleep(2000l);
} catch (InterruptedException e) {
}
return price;
}
public void setPrice(String price) {
this.price = price;
}
}
}
现在有结果
parallel stream
300 $
200 $
300 $
300 $
400 $
100 $
100 $
200 $
400 $
400 $
execution time 6093.126747
completable futures stream
100 $
200 $
300 $
400 $
100 $
200 $
300 $
400 $
300 $
400 $
execution time 4022.263999