如何在 RxJava 中创建一个以固定时间间隔运行的作业?

How to create a job that runs at a fixed interval in RxJava?

我正在尝试创建一个任务,该任务将定期查询我的数据库并以其他状态写入所有结果,我想使用 RxJava 执行此操作。

我正在使用 RxJava-JDBC 查询我的数据库。代码如下所示:

    final Database db = Database.from(url);

    db
        .select("SELECT f1,f2 FROM mydata")
        .autoMap(MyDatum.class)
        .subscribe(
            new Action1<MyDatum>() {
                @Override
                public void call(MyDatum t) {
                    state.add(t);
                }
            },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    L.error("Task failed", t);
                }
            },
            new Action0() {
                @Override
                public void call() {
                    state.makeAvailable();
                }
            }
        );

问题是,当我订阅时,它会工作一次,然后就停止了。所以我使用了 Observable.interval 并且有这个工作:

    Observable
        .interval(10, TimeUnit.SECONDS)
        .forEach(
            new Action1<Long>() {
                @Override
                public void call(Long arg) {
                    db
                        .select("SELECT f1,f2 FROM mydata")
                        .autoMap(MyDatum.class)
                        .subscribe(
                            new Action1<MyDatum>() {
                                @Override
                                public void call(MyDatum t) {
                                    state.add(t);
                                }
                            },
                            new Action1<Throwable>() {
                                @Override
                                public void call(Throwable t) {
                                    L.error("Task failed", t);
                                }
                            },
                            new Action0() {
                                @Override
                                public void call() {
                                    state.makeAvailable();
                                }
                            }
                        );
                }
            }
        );

但我想知道我是否在将一个流嵌套在另一个流中做错了什么。 我考虑过使用 flatMaponComplete 永远不会执行,因为 interval 永远不会调用 onComplete.

我希望它能够进化并被使用,不仅可以由间隔触发,也可以由传入事件触发。

我是不是漏掉了什么? 谢谢

flatMapmerge 是您要使用的运算符。首先,您应该避免在操作符主体中订阅可观察对象。而是使用 flatMap 和 return observable。这将为您订阅所有发出的可观察量。

为了手动触发查询,您可以合并 PublishSubject (documentation),您可以调用 onNext 来推送事件并手动触发查询。将您的代码更改为类似这样的内容。

PublishSubject<Long> subject = PublishSubject.create();
Observable.merge(subject, Observable.timer(0, 1, TimeUnit.SECONDS))
    .flatMap(new Func1<Long, Observable<MyDatum>>() {
        @Override
        public Observable<MyDatum> call(Long arg) {
            return db
                .select("SELECT f1,f2 FROM mydata")
                .autoMap(MyDatum.class);
    }}).subscribe(
        new Action1<MyDatum>() {
            @Override
            public void call(MyDatum t) {
                state.add(t);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                L.error("Task failed", t);
            }
        },
        new Action0() {
            @Override
            public void call() {
                state.makeAvailable();
            }
        }
    );
// you can call onNext with any value to trigger a manual query
subject.onNext(999L);

这是一个演示此行为的简单 RxJava 片段。

CountDownLatch l = new CountDownLatch(5);
PublishSubject<Long> subject = PublishSubject.create();
Observable.merge(subject, Observable.timer(0, 1, TimeUnit.SECONDS).take(3))
        .flatMap((Long arg) -> {
            System.out.println("tick: " + arg);
            l.countDown();
            return Observable.just(arg+10);
        })
        .forEach(System.out::println);
l.await(1, TimeUnit.SECONDS);
subject.onNext(999L);
l.await();

输出

tick: 0
10
tick: 999
1009
tick: 1
11
tick: 2
12

doOnNextdoOnCompleted 运算符对您的情况非常有用。下面是一个示例,您可以如何使用这些运算符实现所描述的行为:

final Observable<MyDatum> observable = Observable.interval(10, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<MyDatum>>() {
    @Override
    public Observable<MyDatum> call(final Long counter) {
        return db.select("SELECT f1,f2 FROM mydata")
                .autoMap(MyDatum.class)
                .doOnNext(new Action1<MyDatum>() {
                    @Override
                    public void call(final MyDatum value) {
                        state.add(value);
                    }
                })
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        state.makeAvailable();
                    }
                });
    }
});

final Subscription subscription = observable.subscribe();