在不使用 "parallel" 运算符的情况下使顺序处理异步
Make sequential processing asynchronous without using the "parallel" operator
我现在正在研究 Java 方法的一部分:
Observable<Map.Entry<String, ConstituentInfo>> obs = Observable.from(constituents.entrySet());
Subscriber<Map.Entry<String, ConstituentInfo>> sub = new Subscriber<Map.Entry<String, ConstituentInfo>>(){
String securityBySymbol, company, symbol, companyName, marketPlace, countryName, tier, tierId;
ConstituentInfo constituent;
Integer compId;
@Override
public void onNext(Map.Entry<String, ConstituentInfo> entry) {
logger.info("blah blah test");
}
@Override
public void onCompleted() {
logger.info("completed successfully");
}
@Override
public void onError(Throwable throwable) {
logger.error(throwable.getMessage());
throwable.printStackTrace();
}
};
obs.observeOn(Schedulers.io()).subscribe(sub);
该方法本质上是处理 Map.Entry
中的每个条目,但这似乎是按顺序处理(同一线程)。如果不使用 "parallel" 运算符(即同时处理条目),我将如何使这个过程异步?我尝试 运行 上面的代码,但我遗漏了一些结果(有些未正确处理)。
据报道,您的可观察对象在主线程上 运行 并且 Subscriber 方法将由 Schedulers.io()
指定的一名工作人员调用,这就是它出现在一个线程上的原因。您的代码不表示订阅者记录之外的任何实际工作,因此这里没有任何异步操作。
也许你打算这样做?
obs.subscribeOn(Schedulers.io()).subscribe(sub);
就并行处理而言,如果您的最后一行是这样的:
obs.doOnNext(entry -> doWork(entry)).observeOn(Schedulers.io()).subscribe(sub);
然后您可以像这样异步完成 doWork
位:
int numProcessors = Runtime.getRuntime().availableProcessors();
obs.buffer(Math.max(1, constituents.size()/numProcessors))
.flatMap(list -> Observable.from(list)
.doOnNext(entry -> doWork(entry)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe(sub);
您需要通过处理器缓冲工作,否则可能会发生大量线程上下文切换。
我不确定您为什么缺少结果。如果您有可重复的测试用例,请将代码粘贴到此处或在 github 上将其作为问题报告给 RxJava。
我现在正在研究 Java 方法的一部分:
Observable<Map.Entry<String, ConstituentInfo>> obs = Observable.from(constituents.entrySet());
Subscriber<Map.Entry<String, ConstituentInfo>> sub = new Subscriber<Map.Entry<String, ConstituentInfo>>(){
String securityBySymbol, company, symbol, companyName, marketPlace, countryName, tier, tierId;
ConstituentInfo constituent;
Integer compId;
@Override
public void onNext(Map.Entry<String, ConstituentInfo> entry) {
logger.info("blah blah test");
}
@Override
public void onCompleted() {
logger.info("completed successfully");
}
@Override
public void onError(Throwable throwable) {
logger.error(throwable.getMessage());
throwable.printStackTrace();
}
};
obs.observeOn(Schedulers.io()).subscribe(sub);
该方法本质上是处理 Map.Entry
中的每个条目,但这似乎是按顺序处理(同一线程)。如果不使用 "parallel" 运算符(即同时处理条目),我将如何使这个过程异步?我尝试 运行 上面的代码,但我遗漏了一些结果(有些未正确处理)。
据报道,您的可观察对象在主线程上 运行 并且 Subscriber 方法将由 Schedulers.io()
指定的一名工作人员调用,这就是它出现在一个线程上的原因。您的代码不表示订阅者记录之外的任何实际工作,因此这里没有任何异步操作。
也许你打算这样做?
obs.subscribeOn(Schedulers.io()).subscribe(sub);
就并行处理而言,如果您的最后一行是这样的:
obs.doOnNext(entry -> doWork(entry)).observeOn(Schedulers.io()).subscribe(sub);
然后您可以像这样异步完成 doWork
位:
int numProcessors = Runtime.getRuntime().availableProcessors(); obs.buffer(Math.max(1, constituents.size()/numProcessors)) .flatMap(list -> Observable.from(list) .doOnNext(entry -> doWork(entry) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.io()) .subscribe(sub);
您需要通过处理器缓冲工作,否则可能会发生大量线程上下文切换。
我不确定您为什么缺少结果。如果您有可重复的测试用例,请将代码粘贴到此处或在 github 上将其作为问题报告给 RxJava。