RxJava + JavaFX Property

I have a method called rxToProperty() that converts an Observable into a JavaFX Property:

    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }

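How can I ensure that the returned Property is always up to date, especially when it is bound to a control in JavaFX? I have noticed some very strange, random behavior that seems to indicate thread safety is being compromised between the schedulers and JavaFX.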

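For example, I tried using this method to display a single source Observable scheduled in several different ways. When the resulting properties are used in a TableView, the results are random and haphazard: cells alternate between having a value and being empty, and sometimes there is thread activity that never ends.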

Whenever I try to schedule on the FX thread using Platform.runLater(), the behavior only gets crazier. What is wrong with my rxToProperty() method?

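// Imports assume RxJava 1.x (package rx) and Guava for ImmutableList.
import com.google.common.collect.ImmutableList;
import javafx.application.Application;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.ReadOnlyObjectWrapper;
import javafx.scene.Group;
import javafx.scene.Scene;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.stage.Stage;
import rx.Observable;
import rx.schedulers.Schedulers;

public class ReactiveTableViewTest extends Application {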

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable());

        stage.setScene(scene);
        stage.sizeToScene();
        stage.show();
    }

    private static final class ReactiveRecord {

        private final Observable<Number> obs = Observable.just(10,20,30,40,50,60).cast(Number.class);

        public Observable<Number> subscribeImmediate() { 
            return obs;
        }
        public Observable<Number> subscribeComputation() { 
            return obs.subscribeOn(Schedulers.computation());
        }
        public Observable<Number> subscribeNewthread() { 
            return obs.subscribeOn(Schedulers.newThread());
        }
        public Observable<Number> subscribeIo() { 
            return obs.subscribeOn(Schedulers.io());
        }
        public Observable<Number> subscribeParallel() {
            return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
        }
        public Observable<Number> subscribeTrampoline() { 
            return obs.subscribeOn(Schedulers.trampoline());
        }

        public ImmutableList<Observable<Number>> getAll() { 
            return ImmutableList.of(subscribeImmediate(),subscribeComputation(),subscribeNewthread(),subscribeIo(),subscribeParallel(),subscribeTrampoline());
        }
        public ImmutableList<String> getHeaders() {
            return ImmutableList.of("IMMEDIATE","COMPUTATION","NEW","IO","PARALLEL","TRAMPOLINE");
        }
    }

    private static final class ReactiveTable extends TableView<ReactiveRecord> {


        private ReactiveTable() { 
            ReactiveRecord record = new ReactiveRecord();

            this.getItems().add(record);

            ImmutableList<Observable<Number>> observables = record.getAll();
            ImmutableList<String> headers = record.getHeaders();

            for (int i = 0; i < observables.size(); i++) { 
                TableColumn<ReactiveRecord,Number> col = new TableColumn<>(headers.get(i));
                final int index = i;
                col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
                this.getColumns().add(col);
            }
        }
    }
    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

Tomas Mikula gave some very helpful insight into this behavior, as well as a solution, on the ReactFX GitHub project.

https://github.com/TomasMikula/ReactFX/issues/22
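
Separately from whatever the linked issue recommends, one general rule applies to code like rxToProperty(): a property bound to the scene graph should only be modified on the JavaFX Application Thread, and synchronizing on the property does not achieve that. The sketch below illustrates the thread-confinement idea using only the JDK, so it runs without JavaFX or RxJava. Everything in it is a hypothetical stand-in and not part of either library: ConfinedProperty plays the role of the property, a single-thread executor named "ui-thread" plays the role of the FX thread, and post() plays the role of the subscriber callback. In real JavaFX code, the uiThread.execute(...) call would roughly correspond to Platform.runLater(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class ConfinedPropertySketch {

    /** Hypothetical stand-in for a UI property that must only be touched on one thread. */
    static final class ConfinedProperty<T> {
        private final ExecutorService uiThread;                              // stand-in for the FX application thread
        private final AtomicReference<T> pending = new AtomicReference<>(); // newest value not yet applied
        private T value;            // read and written on uiThread only
        private String lastWriter;  // name of the thread that last wrote value

        ConfinedProperty(ExecutorService uiThread) {
            this.uiThread = uiThread;
        }

        /** Callable from any thread: keeps only the newest value and queues at most one write. */
        void post(T v) {
            Objects.requireNonNull(v, "null is reserved to mean 'nothing pending'");
            if (pending.getAndSet(v) == null) {
                // No write was queued yet, so queue one; it will pick up the newest value.
                uiThread.execute(() -> {
                    value = pending.getAndSet(null);
                    lastWriter = Thread.currentThread().getName();
                });
            }
        }

        /** Must be called on uiThread. */
        T get() {
            return value;
        }

        /** Must be called on uiThread. */
        String lastWriter() {
            return lastWriter;
        }
    }

    /** Runs one producer thread per property, then reads the results on the UI stand-in thread. */
    static List<String> runDemo(int properties, int valuesPerProducer) {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            List<ConfinedProperty<Integer>> props = new ArrayList<>();
            List<Thread> producers = new ArrayList<>();
            for (int p = 0; p < properties; p++) {
                ConfinedProperty<Integer> prop = new ConfinedProperty<>(ui);
                props.add(prop);
                Thread t = new Thread(() -> {
                    for (int v = 1; v <= valuesPerProducer; v++) {
                        prop.post(v);
                    }
                }, "producer-" + p);
                producers.add(t);
                t.start();
            }
            for (Thread t : producers) {
                t.join();
            }
            // Queued behind every pending write, so it observes the final values.
            Callable<List<String>> read = () -> {
                List<String> out = new ArrayList<>();
                for (ConfinedProperty<Integer> prop : props) {
                    out.add(prop.get() + " written on " + prop.lastWriter());
                }
                return out;
            };
            return ui.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        runDemo(3, 100).forEach(System.out::println);
    }
}
```

Each producer emits from its own thread, but every write to value happens on the single "ui-thread", and intermediate values that arrive faster than that thread can apply them are dropped in favor of the newest one, which is similar in spirit to what onBackpressureLatest() is meant to provide. This sketch does not address how often a TableView calls the cell value factory, which may also matter for the behavior described in the question.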