Java 8 CompletableFuture, Stream and Timeouts
I'm trying to process some data concurrently using CompletableFuture and Stream.
So far I have:
public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("start");
    List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
            .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)))
            .collect(Collectors.toList())   // start every task before joining any of them
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    System.out.println("stop out!");
}

public static Supplier<String> getStringSupplier(String text) {
    return () -> {
        System.out.println("start " + text);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("stop " + text);
        return "asd" + text;
    };
}
And the output looks fine:
start
start 1
start 4
start 3
start 2
start 5
start 6
start 7
stop 4
stop 1
stop 5
stop 2
stop 6
stop 3
stop 7
stop out!
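But now I want to add a timeout to that job. Say a task should be cancelled after 1 second and put null or some other value into the collect list. (I would prefer a value that indicates the cause.)
How can I achieve that?
Thanks in advance for your help.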
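You can wrap the job in another CompletableFuture and call get with a timeout; it throws a TimeoutException if the given time is exceeded. If you want to handle the timeout specially, give TimeoutException its own catch block.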
List<String> collect = null;
try {
    collect = CompletableFuture.supplyAsync(() ->
            Stream.of("1", "2", "3", "4", "5", "6", "7")
                    .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)))
                    .collect(Collectors.toList())
                    .stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
    ).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
    // separate out the TimeoutException if you want to handle it differently
}
System.out.println(collect); // would be null in case of any exception
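To make the "separate catch block" suggestion concrete, here is a minimal sketch under a few assumptions: the class name `WholeJobTimeout` and the helpers `runAll` and `sleepThen` are hypothetical, and the marker strings are only one way to report the cause. Note that `get` with a timeout only gives up waiting; as far as I can tell the tasks themselves keep running in the pool.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class WholeJobTimeout {

    // Hypothetical helper: runs all suppliers concurrently and waits at most
    // timeoutMillis for the whole batch to finish.
    static List<String> runAll(List<Supplier<String>> tasks, long timeoutMillis) {
        CompletableFuture<List<String>> job = CompletableFuture.supplyAsync(() ->
                tasks.stream()
                        .map(t -> CompletableFuture.supplyAsync(t))
                        .collect(Collectors.toList())   // start every task before joining
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        try {
            return job.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Only the wait is abandoned here; nothing interrupts the tasks.
            return Collections.singletonList("TIMEOUT");
        } catch (ExecutionException e) {
            return Collections.singletonList("FAILED: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.singletonList("INTERRUPTED");
        }
    }

    // Hypothetical stand-in for getStringSupplier: sleeps, then returns value.
    static Supplier<String> sleepThen(String value, long millis) {
        return () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        List<Supplier<String>> tasks = Arrays.asList(sleepThen("a", 100), sleepThen("b", 2000));
        // The 2 s task exceeds the 1 s deadline, so the timeout branch is taken.
        System.out.println(runAll(tasks, 1000));
    }
}
```

The drawback of this shape is that the deadline applies to the batch as a whole: one slow task discards the results of all the fast ones.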
You can try the overloaded supplyAsync method of CompletableFuture that takes an executor argument (CompletableFuture.supplyAsync(getStringSupplier(x), timeoutExecutorService)). For timeoutExecutorService, see link
I found a way to do it:
// ThreadFactoryBuilder comes from Guava (com.google.common.util.concurrent)
private static final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("failAfter-%d")
                        .build());

public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("start");
    // One shared timer: it starts now and completes with a marker value after 1 second
    final CompletableFuture<Object> oneSecondTimeout = failAfter(Duration.ofSeconds(1))
            .exceptionally(xxx -> "timeout exception");
    List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
            .map(x -> CompletableFuture.anyOf(createTaskSupplier(x), oneSecondTimeout))
            .collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    System.out.println("stop out!");
    System.out.println(collect);
}

public static CompletableFuture<String> createTaskSupplier(String x) {
    return CompletableFuture.supplyAsync(getStringSupplier(x))
            .exceptionally(xx -> "PROCESSING ERROR : " + xx.getMessage());
}

public static Supplier<String> getStringSupplier(String text) {
    return () -> {
        System.out.println("start " + text);
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (text.equals("1")) {
            throw new RuntimeException("LOGIC ERROR");
        }
        try {
            if (text.equals("7")) {
                TimeUnit.SECONDS.sleep(2);   // slow enough to lose against the timer
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("stop " + text);
        return "result " + text;
    };
}

public static <T> CompletableFuture<T> failAfter(Duration duration) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    scheduler.schedule(() -> {
        final TimeoutException ex = new TimeoutException("Timeout after " + duration);
        return promise.completeExceptionally(ex);
    }, duration.toMillis(), TimeUnit.MILLISECONDS);
    return promise;
}
It returns:
start
start 1
start 3
start 4
start 2
start 5
start 6
start 7
stop 6
stop 4
stop 3
stop 5
stop 2
stop out!
[PROCESSING ERROR : java.lang.RuntimeException: LOGIC ERROR, result 2, result 3, result 4, result 5, result 6, timeout exception]
What do you think about this? Can you spot any flaws in this solution?
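Two things stand out in the approach above: anyOf returns CompletableFuture<Object>, so the result list loses its element type, and every task races against the same shared timer. A possible variant, sketched here under assumptions, uses applyToEither so that each task gets its own timer and the result stays typed. The names `PerTaskTimeout`, `within`, `describe`, `processAll` and `sleepThen` are hypothetical, and the Guava thread factory is replaced by a plain JDK lambda. As in the original, a timed-out task is not interrupted; it simply loses the race.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class PerTaskTimeout {

    // Single daemon thread that only fires timeouts (plain JDK, no Guava).
    private static final ScheduledExecutorService TIMER =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "failAfter");
                t.setDaemon(true);
                return t;
            });

    // Returns a future that fails with TimeoutException once d has elapsed.
    static <T> CompletableFuture<T> failAfter(Duration d) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        TIMER.schedule(() -> {
            promise.completeExceptionally(new TimeoutException("Timeout after " + d));
        }, d.toMillis(), TimeUnit.MILLISECONDS);
        return promise;
    }

    // Races the task against its own timer; whichever finishes first wins.
    static <T> CompletableFuture<T> within(CompletableFuture<T> task, Duration d,
                                           Function<Throwable, T> fallback) {
        return task.applyToEither(PerTaskTimeout.<T>failAfter(d), Function.identity())
                .exceptionally(fallback);
    }

    // Turns a failure into a value that names the cause.
    static String describe(Throwable t) {
        Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                ? t.getCause() : t;
        return (cause instanceof TimeoutException ? "TIMEOUT: " : "ERROR: ") + cause.getMessage();
    }

    static List<String> processAll(List<Supplier<String>> tasks, Duration limit) {
        return tasks.stream()
                .map(t -> within(CompletableFuture.supplyAsync(t), limit, PerTaskTimeout::describe))
                .collect(Collectors.toList())   // start every task before joining
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Hypothetical stand-in for getStringSupplier: sleeps, then returns value.
    static Supplier<String> sleepThen(String value, long millis) {
        return () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        List<Supplier<String>> tasks = Arrays.asList(
                sleepThen("result 2", 100),
                () -> { throw new IllegalStateException("LOGIC ERROR"); },
                sleepThen("result 7", 2000));
        System.out.println(processAll(tasks, Duration.ofSeconds(1)));
    }
}
```

Creating one timer per task costs a scheduled entry each, which seems acceptable for a handful of tasks; for very large batches a shared deadline like the original may be the cheaper choice.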
For others who are not restricted to Java 8, you can use the completeOnTimeout method, which was introduced in Java 9.
List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
        .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))
                .completeOnTimeout(null, 1, SECONDS))
        .collect(toList())
        .stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)   // drop timed-out results; must come after join, the futures themselves are never null
        .collect(toList());
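Since the question asks for a value that indicates the cause rather than null, the same Java 9+ call can be given a marker string as its default. This is only a sketch: the class `CompleteOnTimeoutDemo`, the helpers `processAll` and `sleepThen`, and the "TIMEOUT " prefix are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class CompleteOnTimeoutDemo {

    // Requires Java 9 or later for CompletableFuture.completeOnTimeout.
    static List<String> processAll(List<String> inputs,
                                   Function<String, Supplier<String>> work,
                                   long limitMillis) {
        return inputs.stream()
                .map(x -> CompletableFuture.supplyAsync(work.apply(x))
                        // If the task has not finished in time, complete with a marker instead.
                        .completeOnTimeout("TIMEOUT " + x, limitMillis, TimeUnit.MILLISECONDS))
                .collect(Collectors.toList())   // start every task before joining
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Hypothetical stand-in for getStringSupplier: sleeps, then returns value.
    static Supplier<String> sleepThen(String value, long millis) {
        return () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        // Input "7" is made slow on purpose, mirroring the earlier example.
        List<String> out = processAll(
                Arrays.asList("1", "2", "7"),
                x -> sleepThen("result " + x, x.equals("7") ? 2000 : 100),
                1000);
        System.out.println(out);
    }
}
```

With a marker value no filtering step is needed, and the list keeps one entry per input in the original order. A task that throws would still surface as an exception from join, so an exceptionally stage may be worth adding if failures should also become values.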