使用 RxJava 处理可变对象流

Using RxJava to process varying object stream

我正在尝试处理对象流(通过 http JSon 请求)。

Observble returns 项如下:

"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo

第一项是时间戳,然后是要存储在数据库中的 Foo 对象,然后是代表需要从数据库中删除的 Foo 对象的 ID,最后是需要更新的 Foo 对象(我将为他们做一个更新)。

我当前的实现如下所示:

public void updateFoos(final CallBack callBack) {

    final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();

    fooService.getFoos(lastFooUpdateTimestamp)
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    callBack.onSuccess();
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Object o) {
                    if (o instanceof String) {
                        localStorage.setLastFooUpdateTimestamp((String) o);
                    }

                    if (o instanceof Foo) {
                        databaseManager.save((Foo) o);
                    }
                }
            });
}

有很多问题:

  1. instanceof检查不是很RxJava,有没有更好的方法?
  2. 时间戳始终是第一个字段,无论如何要表达清楚?
  3. 我也想批处理数据库插入,所以有一个单独的块来处理 Foo 对象,同时批处理它们会很好。
  4. 有没有更好的设计,我可以按类型发出多个 Observable?但是我该如何订阅多个观察者呢?

这是一个如何使用 RxJava 完成的示例:

public class MultikindSource {
    enum ValueType {
        TIMESTAMP,
        NUMBER,
        FOO
    }
    static final class Foo { }
    static Observable<Object> source(String timestamp) {
        return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
            1, 2, 3, new Foo()));
    }
    public static void main(String[] args) {
        Func1<Object, ValueType> keySelector = o -> {
            if (o instanceof String) {
                return ValueType.TIMESTAMP;
            } else
            if (o instanceof Foo) {
                return ValueType.FOO;
            }
            return ValueType.NUMBER;
        };
        AtomicReference<String> lastTimestamp = new AtomicReference<>(
            "2015-05-08T11:38:00.000Z");
        source(lastTimestamp.get())
        .groupBy(keySelector)
        .flatMap(g -> {
            if (g.getKey() == ValueType.TIMESTAMP) {
                g.subscribe(v -> {
                    System.out.println("Updating timestamp to " + v);
                    lastTimestamp.set((String)v);
                });
            } else
            if (g.getKey() == ValueType.FOO) {
                g.buffer(2).subscribe(v -> 
                    System.out.println("Batch inserting " + v));
            } else {
                g.subscribe(v -> 
                    System.out.println("Got some number: " + v));
            }
            return Observable.just(1);
        }).count().subscribe(v -> 
            System.out.println("Got " + v + " kinds of events."));
    }
}

本质上,您通过一些枚举对源数据进行分组,然后链接到这些组并订阅它们以执行工作。