使用 RxJava 处理可变对象流
Using RxJava to process varying object stream
我正在尝试处理对象流(通过 http JSon 请求)。
Observble returns 项如下:
"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo
第一项是时间戳,然后是要存储在数据库中的 Foo 对象,然后是代表需要从数据库中删除的 Foo 对象的 ID,最后是需要更新的 Foo 对象(我将为他们做一个更新)。
我当前的实现如下所示:
public void updateFoos(final CallBack callBack) {
final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();
fooService.getFoos(lastFooUpdateTimestamp)
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
callBack.onSuccess();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
if (o instanceof String) {
localStorage.setLastFooUpdateTimestamp((String) o);
}
if (o instanceof Foo) {
databaseManager.save((Foo) o);
}
}
});
}
有很多问题:
- instanceof检查不是很RxJava,有没有更好的方法?
- 时间戳始终是第一个字段,无论如何要表达清楚?
- 我也想批处理数据库插入,所以有一个单独的块来处理 Foo 对象,同时批处理它们会很好。
- 有没有更好的设计,我可以按类型发出多个 Observable?但是我该如何订阅多个观察者呢?
这是一个如何使用 RxJava 完成的示例:
public class MultikindSource {
enum ValueType {
TIMESTAMP,
NUMBER,
FOO
}
static final class Foo { }
static Observable<Object> source(String timestamp) {
return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
1, 2, 3, new Foo()));
}
public static void main(String[] args) {
Func1<Object, ValueType> keySelector = o -> {
if (o instanceof String) {
return ValueType.TIMESTAMP;
} else
if (o instanceof Foo) {
return ValueType.FOO;
}
return ValueType.NUMBER;
};
AtomicReference<String> lastTimestamp = new AtomicReference<>(
"2015-05-08T11:38:00.000Z");
source(lastTimestamp.get())
.groupBy(keySelector)
.flatMap(g -> {
if (g.getKey() == ValueType.TIMESTAMP) {
g.subscribe(v -> {
System.out.println("Updating timestamp to " + v);
lastTimestamp.set((String)v);
});
} else
if (g.getKey() == ValueType.FOO) {
g.buffer(2).subscribe(v ->
System.out.println("Batch inserting " + v));
} else {
g.subscribe(v ->
System.out.println("Got some number: " + v));
}
return Observable.just(1);
}).count().subscribe(v ->
System.out.println("Got " + v + " kinds of events."));
}
}
本质上,您通过一些枚举对源数据进行分组,然后链接到这些组并订阅它们以执行工作。
我正在尝试处理对象流(通过 http JSon 请求)。
Observble returns 项如下:
"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo
第一项是时间戳,然后是要存储在数据库中的 Foo 对象,然后是代表需要从数据库中删除的 Foo 对象的 ID,最后是需要更新的 Foo 对象(我将为他们做一个更新)。
我当前的实现如下所示:
public void updateFoos(final CallBack callBack) {
final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();
fooService.getFoos(lastFooUpdateTimestamp)
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
callBack.onSuccess();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
if (o instanceof String) {
localStorage.setLastFooUpdateTimestamp((String) o);
}
if (o instanceof Foo) {
databaseManager.save((Foo) o);
}
}
});
}
有很多问题:
- instanceof检查不是很RxJava,有没有更好的方法?
- 时间戳始终是第一个字段,无论如何要表达清楚?
- 我也想批处理数据库插入,所以有一个单独的块来处理 Foo 对象,同时批处理它们会很好。
- 有没有更好的设计,我可以按类型发出多个 Observable?但是我该如何订阅多个观察者呢?
这是一个如何使用 RxJava 完成的示例:
public class MultikindSource {
enum ValueType {
TIMESTAMP,
NUMBER,
FOO
}
static final class Foo { }
static Observable<Object> source(String timestamp) {
return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
1, 2, 3, new Foo()));
}
public static void main(String[] args) {
Func1<Object, ValueType> keySelector = o -> {
if (o instanceof String) {
return ValueType.TIMESTAMP;
} else
if (o instanceof Foo) {
return ValueType.FOO;
}
return ValueType.NUMBER;
};
AtomicReference<String> lastTimestamp = new AtomicReference<>(
"2015-05-08T11:38:00.000Z");
source(lastTimestamp.get())
.groupBy(keySelector)
.flatMap(g -> {
if (g.getKey() == ValueType.TIMESTAMP) {
g.subscribe(v -> {
System.out.println("Updating timestamp to " + v);
lastTimestamp.set((String)v);
});
} else
if (g.getKey() == ValueType.FOO) {
g.buffer(2).subscribe(v ->
System.out.println("Batch inserting " + v));
} else {
g.subscribe(v ->
System.out.println("Got some number: " + v));
}
return Observable.just(1);
}).count().subscribe(v ->
System.out.println("Got " + v + " kinds of events."));
}
}
本质上,您通过一些枚举对源数据进行分组,然后链接到这些组并订阅它们以执行工作。