来自链接列表的自定义 Observable REST 结果流
Custom Observable stream of REST results from a list of links
我正在使用一个实现自己的 REST 请求的库。它需要一个 'permalink' 并发出一个 REST 请求来获取内容(我在源代码中提供了用于测试的模拟)。我有一个永久链接列表,想为每个链接安排一个请求并生成内容结果流。所有这些都应该是异步的,一旦完成所有结果,我想发出它们的列表。
我正在尝试使用 RxJava 来实现这一点。这是我现在拥有的(抱歉没有使用 lambda,我只是想习惯 RxJava class 名称):
public class Main {
public static void main(String[] args) {
int count = 10;
List<String> permalinks = new ArrayList<>(count);
for (int i = 1; i <= count; ++i) {
permalinks.add("permalink_" + i);
}
ContentManager cm = new ContentManager();
Observable
.create(new Observable.OnSubscribe<Content>() {
int gotCount = 0;
@Override
public void call(Subscriber<? super Content> subscriber) {
for (String permalink : permalinks) { // 1. is iterating here the correct way?
if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(content);
completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
}
}
@Override
public void onFailure(int code, String message) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
completeIfFinished();
}
}
private void completeIfFinished() {
++gotCount;
if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
subscriber.onCompleted();
}
}
});
}
}
}
})
.toList()
.subscribe(new Action1<List<Content>>() {
@Override
public void call(List<Content> contents) {
System.out.println("list count: " + contents.size());
System.out.println("results: ");
contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
}
});
System.out.println("finishing main");
}
}
// library mocks
class Content {
String basic;
String extended;
public Content(String basic, String extended) {
this.basic = basic;
this.extended = extended;
}
}
interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public void getBasicContentByPermalink(String permalink, RestCallback callback) {
// just to simulate network latency and unordered results
new Thread() {
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
// 95% of the time we succeed
callback.onSuccess(new Content(permalink + "_basic", null));
} else {
callback.onFailure(-1, permalink + "_basic_failure");
}
}
}.start();
}
}
这有点工作,但我不确定我是否以正确的方式做事。请查看第 1-5 行:
- 从列表创建 Observable 时,我是否应该迭代
我自己在清单上,还是有其他更好的方法?为了
例如,有 Observable.from(Iterable),但我不认为我
可以用吗?
- 我在发送 REST 请求之前正在检查 isUnsubscribed,并且
也在两个结果处理程序中 (success/failure)。是这样吗
应该使用?
- 我实现了一些逻辑来在所有请求都完成时调用 onComplete
返回,无论成功还是失败(见问题 5)。但是,就像他们一样
由 isUnsubscribed 调用守卫,它们可能会发生
onComplete 永远不会被调用。流怎么知道它
应该完成?当订阅者取消订阅并且我不必考虑它时,它会过早完成吗?有什么注意事项吗?例如,如果订阅者在发送内容结果的过程中取消订阅会发生什么?在我的测试中,从未发出所有结果的列表,但我希望有一个包含到那时为止的所有结果的列表。
- 在 onFailure 方法中,我用 onNext(null) 发出 null 来标记一个
失败。我这样做是因为最后我会得到 2 个流的 zip,
并且仅当两者都压缩时才会发出自定义 class 的实例
值不为空。这是正确的做法吗?
- 正如我提到的,我有一些自定义逻辑来检查流是否是
完成的。我在这里做的是计算 REST 结果以及何时
许多已处理,因为列表中有永久链接,已完成。
这是必要的吗?这是正确的方法吗?
When creating an Observable from a list, am I supposed to iterate over the list myself, or is there some other, better way? For instance, there is Observable.from(Iterable), but I don't think I can use it?
您可以根据 link.
构建一个可观察对象,而不是在 create
方法中进行迭代
public Observable<Content> getContent(permalink) {
return Observable.create(subscriber -> {
ContentManager cm = new ContentManager();
if (!subscriber.isUnsubscribed()) {
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}
@Override
public void onFailure(int code, String message) {
subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message, permalink));
}
}
});
}
然后将其与 flatMap
运算符一起使用
Observable.from(permalinks)
.flatMap(link -> getContent(link))
.subscribe();
I am checking isUnsubscribed before I send the REST request, and also in both result handlers (success/failure). Is this the way it is supposed to be used?
在发送请求之前检查 isUnsubscribed 是正确的做法。不确定这个 success/failure 回调(我认为它没用,但有人会说我错了)
I implement some logic to call onComplete when all requests have returned, no matter if successful or failed (see question 5). But, as they are guarded by the isUnsubscribed call, it may happen that they onComplete is never called.
如果您退订直播,您将停止观看直播,因此您可能不会收到直播结束的通知。
For example, what would happen if the subscriber unsubscribed in the middle of emitting the Content results?
您将能够使用取消订阅前发出的内容。
In the onFailure method, I emit null with onNext(null) to mark a failure. I do it because in the end I will have a zip of 2 streams, and will emit an instance of a custom class only when both zipped values are non null. Is this the correct way to do it?
不。而不是 null,发出错误(在订阅者上使用 onError
)。
使用它,您不必在 zip lambda 中检查 null。压缩!
As I mentioned, I have some custom logic to check if the stream is finished. What I do here is I count the REST results and when as many were processed as there are permalinks in the list, it is done. Is this necessary? Is this the right way?
如果要获取perma的个数link,当流完成后,可以使用count
运算符。
Observable.from(permalinks)
.flatMap(link -> getContent(link))
.count()
.subscribe(System.out::println);
如果你想发出link的数量,在一个成功的link结束时,你可以尝试scan
operator
Observable.from(permalinks)
.flatMap(link -> getContent(link))
.map(content -> 1) // map content to 1 in order to count it.
.scan((seed, acu) -> acu + 1)
.subscribe(System.out::println); // will print 2, 3...
我正在使用一个实现自己的 REST 请求的库。它需要一个 'permalink' 并发出一个 REST 请求来获取内容(我在源代码中提供了用于测试的模拟)。我有一个永久链接列表,想为每个链接安排一个请求并生成内容结果流。所有这些都应该是异步的,一旦完成所有结果,我想发出它们的列表。
我正在尝试使用 RxJava 来实现这一点。这是我现在拥有的(抱歉没有使用 lambda,我只是想习惯 RxJava class 名称):
public class Main {
public static void main(String[] args) {
int count = 10;
List<String> permalinks = new ArrayList<>(count);
for (int i = 1; i <= count; ++i) {
permalinks.add("permalink_" + i);
}
ContentManager cm = new ContentManager();
Observable
.create(new Observable.OnSubscribe<Content>() {
int gotCount = 0;
@Override
public void call(Subscriber<? super Content> subscriber) {
for (String permalink : permalinks) { // 1. is iterating here the correct way?
if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(content);
completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
}
}
@Override
public void onFailure(int code, String message) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
completeIfFinished();
}
}
private void completeIfFinished() {
++gotCount;
if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
subscriber.onCompleted();
}
}
});
}
}
}
})
.toList()
.subscribe(new Action1<List<Content>>() {
@Override
public void call(List<Content> contents) {
System.out.println("list count: " + contents.size());
System.out.println("results: ");
contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
}
});
System.out.println("finishing main");
}
}
// library mocks
class Content {
String basic;
String extended;
public Content(String basic, String extended) {
this.basic = basic;
this.extended = extended;
}
}
interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public void getBasicContentByPermalink(String permalink, RestCallback callback) {
// just to simulate network latency and unordered results
new Thread() {
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
// 95% of the time we succeed
callback.onSuccess(new Content(permalink + "_basic", null));
} else {
callback.onFailure(-1, permalink + "_basic_failure");
}
}
}.start();
}
}
这有点工作,但我不确定我是否以正确的方式做事。请查看第 1-5 行:
- 从列表创建 Observable 时,我是否应该迭代 我自己在清单上,还是有其他更好的方法?为了 例如,有 Observable.from(Iterable),但我不认为我 可以用吗?
- 我在发送 REST 请求之前正在检查 isUnsubscribed,并且 也在两个结果处理程序中 (success/failure)。是这样吗 应该使用?
- 我实现了一些逻辑来在所有请求都完成时调用 onComplete 返回,无论成功还是失败(见问题 5)。但是,就像他们一样 由 isUnsubscribed 调用守卫,它们可能会发生 onComplete 永远不会被调用。流怎么知道它 应该完成?当订阅者取消订阅并且我不必考虑它时,它会过早完成吗?有什么注意事项吗?例如,如果订阅者在发送内容结果的过程中取消订阅会发生什么?在我的测试中,从未发出所有结果的列表,但我希望有一个包含到那时为止的所有结果的列表。
- 在 onFailure 方法中,我用 onNext(null) 发出 null 来标记一个 失败。我这样做是因为最后我会得到 2 个流的 zip, 并且仅当两者都压缩时才会发出自定义 class 的实例 值不为空。这是正确的做法吗?
- 正如我提到的,我有一些自定义逻辑来检查流是否是 完成的。我在这里做的是计算 REST 结果以及何时 许多已处理,因为列表中有永久链接,已完成。 这是必要的吗?这是正确的方法吗?
When creating an Observable from a list, am I supposed to iterate over the list myself, or is there some other, better way? For instance, there is Observable.from(Iterable), but I don't think I can use it?
您可以根据 link.
构建一个可观察对象,而不是在create
方法中进行迭代
public Observable<Content> getContent(permalink) {
return Observable.create(subscriber -> {
ContentManager cm = new ContentManager();
if (!subscriber.isUnsubscribed()) {
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}
@Override
public void onFailure(int code, String message) {
subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message, permalink));
}
}
});
}
然后将其与 flatMap
运算符一起使用
Observable.from(permalinks)
.flatMap(link -> getContent(link))
.subscribe();
I am checking isUnsubscribed before I send the REST request, and also in both result handlers (success/failure). Is this the way it is supposed to be used?
在发送请求之前检查 isUnsubscribed 是正确的做法。不确定这个 success/failure 回调(我认为它没用,但有人会说我错了)
I implement some logic to call onComplete when all requests have returned, no matter if successful or failed (see question 5). But, as they are guarded by the isUnsubscribed call, it may happen that they onComplete is never called.
如果您退订直播,您将停止观看直播,因此您可能不会收到直播结束的通知。
For example, what would happen if the subscriber unsubscribed in the middle of emitting the Content results?
您将能够使用取消订阅前发出的内容。
In the onFailure method, I emit null with onNext(null) to mark a failure. I do it because in the end I will have a zip of 2 streams, and will emit an instance of a custom class only when both zipped values are non null. Is this the correct way to do it?
不。而不是 null,发出错误(在订阅者上使用 onError
)。
使用它,您不必在 zip lambda 中检查 null。压缩!
As I mentioned, I have some custom logic to check if the stream is finished. What I do here is I count the REST results and when as many were processed as there are permalinks in the list, it is done. Is this necessary? Is this the right way?
如果要获取perma的个数link,当流完成后,可以使用count
运算符。
Observable.from(permalinks)
.flatMap(link -> getContent(link))
.count()
.subscribe(System.out::println);
如果你想发出link的数量,在一个成功的link结束时,你可以尝试scan
operator
Observable.from(permalinks)
.flatMap(link -> getContent(link))
.map(content -> 1) // map content to 1 in order to count it.
.scan((seed, acu) -> acu + 1)
.subscribe(System.out::println); // will print 2, 3...