使用FutureClass模拟任务T的多线程进程
Simulate the multi-thread process of task T by using Future Class
这里有一个任务 T:
T由许多子任务组成,每个子任务的完成时间不同。
如果其中一个子任务失败,其他的应该立即停止,任务T也会失败。
那么如何模拟任务T的过程呢? (需要快速失败)
也许 Future Class 可以解决它。但是怎么办?
最简单的解决方法是将这些 Runnable 分离到它们自己的线程池中,这样您就可以仅在该池上调用 shutdownNow()
,这会中断该池中的所有任务
下面是一个使用 FutureTask 的例子。
其中一个任务通过等待 1 秒然后取消其他任务来模拟失败。即使他们等待 5 秒,程序也会在 1 秒内完成。
您需要将您的 Runnable 包装在类似这样的东西中,如果实际任务失败可以取消。
为了能够取消任务,它需要处于 Thread.interrupt() 将停止线程的状态。例如,如果它只是处于一个循环中,那么 Future 将被取消(因此 get() 将立即 return),但任务本身将保持 运行.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class FutureTaskExample {
public static void main(String[] args) throws InterruptedException {
List<FutureTask<String>> tasks = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
for (int i = 0; i < 10; ++i) {
final int ii = i;
FutureTask<String> task =
new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
if (ii == 9) {
Thread.sleep(1000);
tasks.subList(0,9).stream().forEach(t -> t.cancel(true));
return "Failure";
} else {
long start = System.currentTimeMillis();
Thread.sleep(5000);
System.out.println("Task " + ii + " slept for " + (System.currentTimeMillis() - start));
return "Completed";
}
}
});
tasks.add(task);
executorService.execute(task);
}
List<String> results = tasks.stream().map(t -> {
try {
return t.get();
} catch (Exception e) {
// ignore
e.printStackTrace();
return "Interrupted";
}
}).collect(Collectors.toList());
System.out.println("Completed in " + (System.currentTimeMillis() - start) + " " + results);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Done in " + (System.currentTimeMillis() - start));
}
}
我用CompletableFuture给出一个模拟例子
我创建了一个TaskClassTaskT,让它看起来更真实。而TaskClass包含三个state
、runTask()
方法和cancel()
方法。
- 三种状态是
Success
、Cancelling
和Cancelled
。成功表示任务运行 completed.Cancelling表示任务正在取消。 Canceled表示任务已经取消,建议其他人赶紧取消任务。
runTask() method
:我用Thread.sleep(interval)
来模拟运行ing状态。而当任务运行完成后,代码if(cancelled) return Result.CANCELLED;
检测状态是否被取消?
cancel() method
:我用double-check lock来装饰“取消”逻辑,确保它是单实例的。
最后,在主要方法中,我使用 CompletableFuture
Class 启动线程和 return 参数。 thenAccept()
方法很有用,帮助 SUCCESS TASK 建议其他 运行 TASK 取消任务。
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class Competable {
// three state after run task
static enum Result{
SUCCESS, FAIL, CANCELLED
}
// task list
static List<TaskT> tasks = new ArrayList<>();
/**
* Task List
*/
public static class TaskT{
private String name;
private int timeInSecond;
private Result ret;
volatile boolean cancelling = false;
volatile boolean cancelled = false;
public TaskT(String name, int timeInSecond, Result ret){
this.name = name;
this.timeInSecond = timeInSecond * 1000;
this.ret = ret;
}
/**
* Simulate task runing
* runing time in real work is uncertain
* maybe run in computing,maybe run in IO
*/
public Result runTask(){
int interval = 100;
int total = 0;
try {
for(;;){
Thread.sleep(interval);
total += interval;
if(total>=timeInSecond) break;
if(cancelled) return Result.CANCELLED;
}
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(name + "Task End!!!");
return ret;
}
/**
* Simlulate task cancel
* and set cancel time
*/
public void cancel() {
if (!cancelled) {
synchronized (this) {
if (cancelled) return;
cancelling = true;
System.out.println(name + "cancelling!!!");
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + "cancelled!!!");
}
cancelled = true;
}
}
}
/**
* rollback: advice the other thread cancel
*/
public static void callback(Result result, TaskT task){
if(Result.FAIL == result){
for(TaskT _task : tasks){
if(_task!=task){
_task.cancel();
}
}
}
}
public static void main(String[] args) throws IOException {
TaskT subtask1 = new TaskT("task1", 3, Result.SUCCESS);
TaskT subtask2 = new TaskT("task2", 4, Result.SUCCESS);
TaskT subtask3 = new TaskT("task3", 1, Result.FAIL);
tasks.add(subtask1);
tasks.add(subtask2);
tasks.add(subtask3);
for(TaskT task:tasks){
CompletableFuture f = CompletableFuture.supplyAsync(()->task.runTask())
.thenAccept((result -> callback(result, task)));
}
// System.in.read();
}
}
这里有一个任务 T:
T由许多子任务组成,每个子任务的完成时间不同。
如果其中一个子任务失败,其他的应该立即停止,任务T也会失败。
那么如何模拟任务T的过程呢? (需要快速失败)
也许 Future Class 可以解决它。但是怎么办?
最简单的解决方法是将这些 Runnable 分离到它们自己的线程池中,这样您就可以仅在该池上调用 shutdownNow()
,这会中断该池中的所有任务
下面是一个使用 FutureTask 的例子。
其中一个任务通过等待 1 秒然后取消其他任务来模拟失败。即使他们等待 5 秒,程序也会在 1 秒内完成。
您需要将您的 Runnable 包装在类似这样的东西中,如果实际任务失败可以取消。
为了能够取消任务,它需要处于 Thread.interrupt() 将停止线程的状态。例如,如果它只是处于一个循环中,那么 Future 将被取消(因此 get() 将立即 return),但任务本身将保持 运行.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class FutureTaskExample {
public static void main(String[] args) throws InterruptedException {
List<FutureTask<String>> tasks = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
for (int i = 0; i < 10; ++i) {
final int ii = i;
FutureTask<String> task =
new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
if (ii == 9) {
Thread.sleep(1000);
tasks.subList(0,9).stream().forEach(t -> t.cancel(true));
return "Failure";
} else {
long start = System.currentTimeMillis();
Thread.sleep(5000);
System.out.println("Task " + ii + " slept for " + (System.currentTimeMillis() - start));
return "Completed";
}
}
});
tasks.add(task);
executorService.execute(task);
}
List<String> results = tasks.stream().map(t -> {
try {
return t.get();
} catch (Exception e) {
// ignore
e.printStackTrace();
return "Interrupted";
}
}).collect(Collectors.toList());
System.out.println("Completed in " + (System.currentTimeMillis() - start) + " " + results);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Done in " + (System.currentTimeMillis() - start));
}
}
我用CompletableFuture给出一个模拟例子
我创建了一个TaskClassTaskT,让它看起来更真实。而TaskClass包含三个state
、runTask()
方法和cancel()
方法。
- 三种状态是
Success
、Cancelling
和Cancelled
。成功表示任务运行 completed.Cancelling表示任务正在取消。 Canceled表示任务已经取消,建议其他人赶紧取消任务。 runTask() method
:我用Thread.sleep(interval)
来模拟运行ing状态。而当任务运行完成后,代码if(cancelled) return Result.CANCELLED;
检测状态是否被取消?cancel() method
:我用double-check lock来装饰“取消”逻辑,确保它是单实例的。
最后,在主要方法中,我使用 CompletableFuture
Class 启动线程和 return 参数。 thenAccept()
方法很有用,帮助 SUCCESS TASK 建议其他 运行 TASK 取消任务。
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class Competable {
// three state after run task
static enum Result{
SUCCESS, FAIL, CANCELLED
}
// task list
static List<TaskT> tasks = new ArrayList<>();
/**
* Task List
*/
public static class TaskT{
private String name;
private int timeInSecond;
private Result ret;
volatile boolean cancelling = false;
volatile boolean cancelled = false;
public TaskT(String name, int timeInSecond, Result ret){
this.name = name;
this.timeInSecond = timeInSecond * 1000;
this.ret = ret;
}
/**
* Simulate task runing
* runing time in real work is uncertain
* maybe run in computing,maybe run in IO
*/
public Result runTask(){
int interval = 100;
int total = 0;
try {
for(;;){
Thread.sleep(interval);
total += interval;
if(total>=timeInSecond) break;
if(cancelled) return Result.CANCELLED;
}
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(name + "Task End!!!");
return ret;
}
/**
* Simlulate task cancel
* and set cancel time
*/
public void cancel() {
if (!cancelled) {
synchronized (this) {
if (cancelled) return;
cancelling = true;
System.out.println(name + "cancelling!!!");
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + "cancelled!!!");
}
cancelled = true;
}
}
}
/**
* rollback: advice the other thread cancel
*/
public static void callback(Result result, TaskT task){
if(Result.FAIL == result){
for(TaskT _task : tasks){
if(_task!=task){
_task.cancel();
}
}
}
}
public static void main(String[] args) throws IOException {
TaskT subtask1 = new TaskT("task1", 3, Result.SUCCESS);
TaskT subtask2 = new TaskT("task2", 4, Result.SUCCESS);
TaskT subtask3 = new TaskT("task3", 1, Result.FAIL);
tasks.add(subtask1);
tasks.add(subtask2);
tasks.add(subtask3);
for(TaskT task:tasks){
CompletableFuture f = CompletableFuture.supplyAsync(()->task.runTask())
.thenAccept((result -> callback(result, task)));
}
// System.in.read();
}
}