How to get message/property updates from second-level threads in JavaFX?

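In a JavaFX application, I have a main thread that listens on a socket for multiple clients and creates a separate thread for each client connection to communicate with that client. I want to display the messages from all clients in the JavaFX scene (through property binding or some other means). I tried Task and Service, following the JavaFX concurrency tutorial at https://docs.oracle.com/javase/8/javafx/interoperability-tutorial/concurrency.htm. I can get messages from the main thread via updateMessage, but I cannot figure out how to get messages from the second-level threads created for client communication. The sample code I tried is given below.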

public class SocketService
    {
        int portNumber;

        public SocketService(int portNumber)
        {
            this.portNumber = portNumber;

            Task task = new Task<Void> ()
            {
                @Override
                public Void call()
                {
                    try (ServerSocket serverSocket = new ServerSocket(portNumber))
                    {
                        while (true)
                        {
                            ClientService cs = new ClientService(serverSocket.accept());
                            cs.start();
                            updateMessage("Connected");
                        }
                    } catch (IOException e)
                    {
                        System.out.println("Exception!!! "+e.getMessage());
                    }
                    return null;
                }
            };
           new Thread(task).start();
        }
    }

    class ClientService extends Service<Void>
    {
        Socket connectSocket;
        public ClientService(Socket connectSocket)
        {
            this.connectSocket = connectSocket;
        }

        @Override
        public Task<Void> createTask()
        {
            return new Task<Void>()
            {
                @Override public Void call()
                {
                    try (PrintWriter out = new PrintWriter(connectSocket.getOutputStream(), true);
                         BufferedReader in = new BufferedReader(new InputStreamReader(connectSocket.getInputStream()));)
                    {
                        String inText;

                        while ((inText = in.readLine()) != null)
                        {
                            System.out.println(inText);
                            updateMessage(inText); // want to get this message updated in JavaFX
                        }
                    } catch (IOException e)
                    {
                        System.out.println("Exception!!! "+e.getMessage());
                    }
                    return null;
                }
            };
        }
    }

Thanks in advance for your help.

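You can observe the message property of each service you create; change notifications for that property are delivered on the FX Application Thread. Note, incidentally, that your code has a bug: Service.start() may only be called from the FX Application Thread. That is fixed here by wrapping the call in Platform.runLater: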

while (true) {
    ClientService cs = new ClientService(serverSocket.accept());
    cs.messageProperty().addListener((obs, oldMessage, newMessage) -> {
        // update UI with newMessage...
    });
    Platform.runLater(cs::start);
    updateMessage("Connected");
}
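
The threading rule behind this fix (many worker threads produce messages, one designated thread applies them) can be pictured without JavaFX. The following is only a rough plain-Java analogy, not JavaFX code: the class UiThreadHandoff, its methods, and the "ui-thread" name are all hypothetical, and a single-thread executor stands in for the FX Application Thread, with execute playing the role that Platform.runLater plays above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class UiThreadHandoff {

    // Hypothetical stand-in for the FX Application Thread: a single thread
    // that owns all "UI" state.
    private final ExecutorService uiThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));

    // Only ever touched from uiThread, so it needs no synchronization.
    private final List<String> shown = new ArrayList<>();

    // Called from worker threads; the actual update is handed to uiThread,
    // which is the role Platform.runLater plays in the JavaFX code.
    private void publish(String message) {
        uiThread.execute(() ->
                shown.add(Thread.currentThread().getName() + " <- " + message));
    }

    // Starts one worker per simulated client, waits for all of them, then
    // returns a snapshot of the messages taken on the "UI" thread.
    List<String> collect(int clients, int steps) throws Exception {
        try {
            List<Thread> workers = new ArrayList<>();
            for (int c = 1; c <= clients; c++) {
                String id = "Client " + c;
                Thread worker = new Thread(() -> {
                    for (int i = 1; i <= steps; i++) {
                        publish(id + ": Step " + i);
                    }
                });
                workers.add(worker);
                worker.start();
            }
            for (Thread worker : workers) {
                worker.join();
            }
            // Tasks run in submission order, so this snapshot sees every update.
            Callable<List<String>> snapshot = () -> new ArrayList<>(shown);
            return uiThread.submit(snapshot).get();
        } finally {
            uiThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : new UiThreadHandoff().collect(3, 2)) {
            System.out.println(line);
        }
    }
}
```

Every printed line begins with ui-thread, whichever worker produced the message; the order in which clients interleave varies from run to run.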

For a more sophisticated approach, you probably don't want to expose details of the UI to the SocketService class, so you might consider something like this:

public class SocketService {

    private final ObservableMap<ClientService, String> messageLookup 
        = FXCollections.observableMap(new HashMap<>()) ;

    private final ObservableList<String> messages = FXCollections.observableArrayList();

    public ObservableList<String> getMessages() { return messages ; }

    int portNumber ;

    public SocketService(int portNumber) {

        this.portNumber = portNumber ;

        messageLookup.addListener((Change<? extends ClientService, ? extends String> change) -> {
            if (change.wasAdded()) {
                messages.add(change.getValueAdded());
            }
            if (change.wasRemoved()) {
                messages.remove(change.getValueRemoved());
            }
        });

        Task<Void> task = new Task<Void>() {
            @Override
            public Void call() {
                try (ServerSocket serverSocket = new ServerSocket(portNumber)) {
                    while(true) {
                        ClientService cs = new ClientService(serverSocket.accept());
                        cs.messageProperty().addListener((obs, oldMessage, newMessage) -> 
                            messageLookup.put(cs, newMessage));
                        Platform.runLater(cs::start);
                        updateMessage("Connected");
                    }
                } catch (...) { ... }
                return null ;
            }
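        };
        // Run the accept loop on a background thread, as in the question's code.
        new Thread(task).start();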
    }
}

Now, in your UI, you can do the following:

ListView<String> messages = new ListView<>();
SocketService socketService = ... ;
messages.setItems(socketService.getMessages());

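How you use this obviously depends on what you are doing and how you want to display the state of the clients, but this should give you the idea.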
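
To see why the map listener in SocketService leaves exactly one line per client in the list, it may help to look at the bookkeeping on its own. As far as I understand ObservableMap, a put on an existing key with a different value is reported as a single change where both wasAdded() and wasRemoved() are true, so the listener adds the new message and removes the old one. Here is a minimal plain-Java sketch of that effect, assuming that behaviour; the class LatestMessageBoard is hypothetical, uses String keys instead of ClientService, and has no JavaFX dependency.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LatestMessageBoard {

    // Latest message per client (plays the role of messageLookup).
    private final Map<String, String> latestByClient = new HashMap<>();

    // What the list view would display (plays the role of messages).
    private final List<String> messages = new ArrayList<>();

    // Mirrors the listener: add the new value, then remove the replaced one.
    void put(String clientId, String message) {
        String previous = latestByClient.put(clientId, message);
        messages.add(message);
        if (previous != null) {
            messages.remove(previous);
        }
    }

    List<String> getMessages() {
        return messages;
    }

    public static void main(String[] args) {
        LatestMessageBoard board = new LatestMessageBoard();
        board.put("Client 1", "Client 1: Step 1");
        board.put("Client 2", "Client 2: Step 1");
        board.put("Client 1", "Client 1: Step 2");
        // prints [Client 2: Step 1, Client 1: Step 2]
        System.out.println(board.getMessages());
    }
}
```

Each client occupies a single entry that is replaced as new messages arrive, which is why the runnable example wraps the list in a SortedList to keep the display order stable.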

Here is a complete, runnable example:

import java.util.HashMap;
import java.util.Random;

import javafx.application.Application;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.collections.MapChangeListener.Change;
import javafx.collections.ObservableList;
import javafx.collections.ObservableMap;
import javafx.collections.transformation.SortedList;
import javafx.concurrent.Service;
import javafx.concurrent.Task;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Label;
import javafx.scene.control.ListView;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class TaskMessageUpdateExample extends Application {



    @Override
    public void start(Stage primaryStage) {

        ObservableMap<ClientService, String> messageLookup = FXCollections.observableMap(new HashMap<>());
        ObservableList<String> messages = FXCollections.observableArrayList();

        messageLookup.addListener((Change<? extends ClientService, ? extends String> change) -> {
            if (change.wasAdded()) {
                messages.add(change.getValueAdded());
            }
            if (change.wasRemoved()) {
                messages.remove(change.getValueRemoved());
            }
        });

        Task<Void> serverTask = new Task<Void>() {

            private Random rng = new Random();

            @Override
            public Void call() throws Exception {
                int nClients = rng.nextInt(11);
                for (int i = 1 ; i <= nClients; i++) {
                    Thread.sleep(rng.nextInt(2000)+500);
                    ClientService cs = new ClientService();
                    String clientID = "Client "+i ;
                    updateMessage("Connected "+clientID);
                    cs.messageProperty().addListener((obs, oldMessage, newMessage) -> {
                        messageLookup.put(cs, clientID +": " +newMessage);
                    });
                    Platform.runLater(cs::start);
                }
                return null ;
            }
        };

        Thread t = new Thread(serverTask);
        t.setDaemon(true);
        t.start();

        Label statusLabel =  new Label();
        statusLabel.textProperty().bind(serverTask.messageProperty());

        ListView<String> listView = new ListView<>();
        listView.setItems(new SortedList<String>(messages, String::compareTo));

        VBox root = new VBox(5, listView, statusLabel);
        root.setAlignment(Pos.TOP_CENTER);
        Scene scene = new Scene(root, 250, 600);
        primaryStage.setScene(scene);
        primaryStage.show();
    }

    private static class ClientService extends Service<Void> {

        @Override
        protected Task<Void> createTask() {
            Task<Void> task =  new Task<Void>() {
                @Override
                public Void call() throws InterruptedException {
                   Random rng = new Random();
                   for (int i = 1 ; i <= 10 ; i++) {
                       Thread.sleep((rng.nextInt(1000)+500));
                       updateMessage("Step "+i);
                   }
                   return null ;
                }
            };
            return task ;
        }

    }

    public static void main(String[] args) {
        launch(args);
    }
}