使用 JavaFX 任务正确执行多线程和线程池

Properly doing multithreading and thread pools with JavaFX Tasks

我有一个选项供用户从 FileChooser 提交多个文件以供某些代码处理。结果将是读取文件的 IO,然后是对存储数据的实际繁重计算。允许用户 select 多个文件,并且由于文件处理不依赖于任何其他文件 selected,这使我的生活更容易处理线程。

此外,用户需要有一个按钮列表,每个按钮对应一个要取消的任务,以及一个 "Cancel All" 按钮。因此,我必须考虑select主动或集体终止一个或所有任务的能力。

最后一个要求是,我不会让用户打开大量文件来阻塞系统。因此,我想出了一个线程数有限的线程池(假设我将任意数量的线程限制在 4 个)。

我不确定如何正确设置这一切。我有我需要做的事情的逻辑,但使用正确的 类 是我卡住的地方。

我已经检查过 this resource,所以如果答案不知何故在那里,那么我看错了这篇文章。

我更愿意使用 Java 库中已有的一个,因为我不是多线程专家,并且担心我可能会做错。由于线程错误似乎是地球上最难调试的东西,我正在 非常 努力确保我尽可能正确地执行此操作。

如果没有办法做到这一点并且我必须自己实施,那么最好的方法是什么?

编辑:我应该注意,我通常是线程的新手,我以前使用过它们并且正在阅读有关它们的书籍,但这将是我第一次主要使用它们,我真的很想做它正确。

JavaFX 有一个 javafx.concurrent API;特别是 Task class fits your use case very nicely. This API is designed to work in conjunction with the java.util.concurrent API. For example, Task is an implementation of FutureTask, so it can be submitted to an Executor。如果你想使用线程池,你可以创建一个Executor为你实现线程池,然后将你的任务提交给它:

final int MAX_THREADS = 4 ;

Executor exec = Executors.newFixedThreadPool(MAX_THREADS);

由于这些线程 运行 在 UI 应用程序的后台,您可能不希望它们阻止应用程序退出。您可以通过创建由执行程序守护程序线程创建的线程来实现此目的:

Executor exec = Executors.newFixedThreadPool(MAX_THREADS, runnable -> {
    Thread t = new Thread(runnable);
    t.setDaemon(true);
    return t ;
});

生成的执行程序将有一个最多 MAX_THREADS 线程的池。如果在没有线程可用时提交任务,它们将在队列中等待,直到有线程可用。

要实现实际的 Task,有几点需要牢记:

不得从后台线程更新UI。由于您的 Task 已提交给上面的执行程序,因此将在后台线程上调用它的 call() 方法。如果确实需要在call方法的执行过程中改变UI,可以将改变UI的代码包装在Platform.runLater(...)中,但是最好结构化事情让你避免这种情况。特别是,Task 有一组 updateXXX(...) 方法,可以更改 FX 应用程序线程上相应 Task 属性的值。您的 UI 元素可以根据需要绑定到这些属性。

建议 call 方法不要访问任何共享数据(通过上述 updateXXX(...) 方法除外)。实例化您的 Task subclass 设置仅 final 变量,让 call() 方法计算一个值,然后 return 该值。

为了取消TaskTaskclass定义了一个内置的cancel()方法。如果你有一个 long-运行 call() 方法,你应该定期检查 isCancelled() 的值,如果它 returns true.

这是一个基本示例:

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javafx.application.Application;
import javafx.beans.property.ReadOnlyObjectWrapper;
import javafx.beans.value.ChangeListener;
import javafx.concurrent.Task;
import javafx.concurrent.Worker;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TableCell;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.scene.control.cell.ProgressBarTableCell;
import javafx.scene.layout.BorderPane;
import javafx.scene.layout.HBox;
import javafx.stage.FileChooser;
import javafx.stage.Stage;

public class FileTaskExample extends Application {

    private static final Random RNG = new Random();

    private static final int MAX_THREADS = 4 ;

    private final Executor exec = Executors.newFixedThreadPool(MAX_THREADS, runnable -> {
        Thread t = new Thread(runnable);
        t.setDaemon(true);
        return t ;
    });

    @Override
    public void start(Stage primaryStage) {

        // table to display all tasks:
        TableView<FileProcessingTask> table = new TableView<>();

        TableColumn<FileProcessingTask, File> fileColumn = new TableColumn<>("File");
        fileColumn.setCellValueFactory(cellData -> new ReadOnlyObjectWrapper<File>(cellData.getValue().getFile()));
        fileColumn.setCellFactory(col -> new TableCell<FileProcessingTask, File>() {
            @Override
            public void updateItem(File file, boolean empty) {
                super.updateItem(file, empty);
                if (empty) {
                    setText(null);
                } else {
                    setText(file.getName());
                }
            }
        });
        fileColumn.setPrefWidth(200);

        TableColumn<FileProcessingTask, Worker.State> statusColumn = new TableColumn<>("Status");
        statusColumn.setCellValueFactory(cellData -> cellData.getValue().stateProperty());
        statusColumn.setPrefWidth(100);

        TableColumn<FileProcessingTask, Double> progressColumn = new TableColumn<>("Progress");
        progressColumn.setCellValueFactory(cellData -> cellData.getValue().progressProperty().asObject());
        progressColumn.setCellFactory(ProgressBarTableCell.forTableColumn());
        progressColumn.setPrefWidth(100);

        TableColumn<FileProcessingTask, Long> resultColumn = new TableColumn<>("Result");
        resultColumn.setCellValueFactory(cellData -> cellData.getValue().valueProperty());
        resultColumn.setPrefWidth(100);

        TableColumn<FileProcessingTask, FileProcessingTask> cancelColumn = new TableColumn<>("Cancel");
        cancelColumn.setCellValueFactory(cellData -> new ReadOnlyObjectWrapper<FileProcessingTask>(cellData.getValue()));
        cancelColumn.setCellFactory(col -> {
            TableCell<FileProcessingTask, FileProcessingTask> cell = new TableCell<>();
            Button cancelButton = new Button("Cancel");
            cancelButton.setOnAction(e -> cell.getItem().cancel());

            // listener for disabling button if task is not running:
            ChangeListener<Boolean> disableListener = (obs, wasRunning, isNowRunning) -> 
                cancelButton.setDisable(! isNowRunning);

            cell.itemProperty().addListener((obs, oldTask, newTask) -> {
                if (oldTask != null) {
                    oldTask.runningProperty().removeListener(disableListener);
                }
                if (newTask == null) {
                    cell.setGraphic(null);
                } else {
                    cell.setGraphic(cancelButton);
                    cancelButton.setDisable(! newTask.isRunning());
                    newTask.runningProperty().addListener(disableListener);
                }
            });

            return cell ;
        });
        cancelColumn.setPrefWidth(100);

        table.getColumns().addAll(Arrays.asList(fileColumn, statusColumn, progressColumn, resultColumn, cancelColumn));

        Button cancelAllButton = new Button("Cancel All");
        cancelAllButton.setOnAction(e -> 
            table.getItems().stream().filter(Task::isRunning).forEach(Task::cancel));

        Button newTasksButton = new Button("Process files");
        FileChooser chooser = new FileChooser();
        newTasksButton.setOnAction(e -> {
            List<File> files = chooser.showOpenMultipleDialog(primaryStage);
            if (files != null) {
                files.stream().map(FileProcessingTask::new).peek(exec::execute).forEach(table.getItems()::add);
            }
        });

        HBox controls = new HBox(5, newTasksButton, cancelAllButton);
        controls.setAlignment(Pos.CENTER);
        controls.setPadding(new Insets(10));

        BorderPane root = new BorderPane(table, null, null, controls, null);

        Scene scene = new Scene(root, 800, 600);
        primaryStage.setScene(scene);
        primaryStage.show();
    }

    public static class FileProcessingTask extends Task<Long> {

        private final File file ;

        public FileProcessingTask(File file) {
            this.file = file ;
        }

        public File getFile() {
            return file ;
        }

        @Override
        public Long call() throws Exception {

            // just to show you can return the result of the computation:
            long fileLength = file.length();

            // dummy processing, in real life read file and do something with it:
            int delay = RNG.nextInt(50) + 50 ;
            for (int i = 0 ; i < 100; i++) {
                Thread.sleep(delay);
                updateProgress(i, 100);

                // check for cancellation and bail if cancelled:
                if (isCancelled()) {
                    updateProgress(0, 100);
                    break ;
                }
            }

            return fileLength ;
        }
    }

    public static void main(String[] args) {
        launch(args);
    }
}