RxJava:PublishSubject 同步动作
RxJava: PublishSubject acts synchronously
我需要一种功能,允许将异步消息推送到我的 PublishSubject
并通过 ConnectableObservable
以一定的速度(实际上是一条一条地)处理它们。不幸的是,在底层 Subscriber
处理消息之前,似乎不会释放对 PublishSubject
的 onNext
的调用。
处理每条消息需要几秒钟,在调试模式下我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 - "After push..."
总是在内部之后出现在控制台中登录 Subscriber
...
所以我有这个 RestEndpoint:
@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Message metadata = processor.apply(extId);
log.info("Before push...");
dataImporter.pushData(metadata);
log.info("After push...");
} catch (Exception e) {
e.printStackTrace();
}
}
});
return Response.ok("Request received successfully").build();
}
这是 DataImporter 的构造函数:
public DataImporter(final String configFile) {
dataToImportSubject = PublishSubject.create();
dataToImportObservable = dataToImportSubject.publish();
dataToImportObservable.connect();
dataToImportObservable
.onBackpressureBuffer(1, new Action0() {
@Override
public void call() {
logger.debug("Buffer full...");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Message>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
logger.error("Error importing "+e.getMessage());
}
@Override
public void onNext(Message value) {
request(1);
importResult(configFile, value);
}
@Override
public void onStart() {
request(1);
}
});
}
然后 DataImporter 的 pushData
只是推送到 PublishSubject
的 onNext
方法..:[=26=]
public void pushData(Message metadata) {
dataToImportSubject.onNext(metadata);
}
这里是 PublishSubject
和 ConnectableObservable
的声明:
public class DataImporter implements ImporterProxy{
private final PublishSubject<Message> dataToImportSubject;
private final ConnectableObservable<Message> dataToImportObservable;
默认情况下,RxJava 是同步的。您需要将运算符引入您的观察者链中以在其他线程上执行操作。当您阅读 Observable
中每个运算符的文档时,您会看到类似“...不在特定调度程序上运算符”这样的语句——这表明数据同步流经该运算符。
要让观察者链在其他线程上执行操作,您可以将 subscribeOn()
之类的运算符与调度程序一起使用,以便在该调度程序上执行操作。在您的示例中,您可能希望使用 Schedulers.io()
来提供后台线程。
PublishSubject
s 在原始 onXXX
调用的线程上发送给他们的消费者:
Scheduler:
PublishSubject
does not operate by default on a particular Scheduler
and the Observer
s get notified on the thread the respective onXXX
methods were invoked.
您必须使用 observeOn
将处理移动到其他线程,因为 observeOn
可以将 onXXX
调用移动到另一个线程。
subscribeOn
通常对 Subject
s 没有任何实际影响,因为它只影响订阅线程,不会调节对这些主题的后续 onXXX
调用。
我需要一种功能,允许将异步消息推送到我的 PublishSubject
并通过 ConnectableObservable
以一定的速度(实际上是一条一条地)处理它们。不幸的是,在底层 Subscriber
处理消息之前,似乎不会释放对 PublishSubject
的 onNext
的调用。
处理每条消息需要几秒钟,在调试模式下我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 - "After push..."
总是在内部之后出现在控制台中登录 Subscriber
...
所以我有这个 RestEndpoint:
@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Message metadata = processor.apply(extId);
log.info("Before push...");
dataImporter.pushData(metadata);
log.info("After push...");
} catch (Exception e) {
e.printStackTrace();
}
}
});
return Response.ok("Request received successfully").build();
}
这是 DataImporter 的构造函数:
public DataImporter(final String configFile) {
dataToImportSubject = PublishSubject.create();
dataToImportObservable = dataToImportSubject.publish();
dataToImportObservable.connect();
dataToImportObservable
.onBackpressureBuffer(1, new Action0() {
@Override
public void call() {
logger.debug("Buffer full...");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Message>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
logger.error("Error importing "+e.getMessage());
}
@Override
public void onNext(Message value) {
request(1);
importResult(configFile, value);
}
@Override
public void onStart() {
request(1);
}
});
}
然后 DataImporter 的 pushData
只是推送到 PublishSubject
的 onNext
方法..:[=26=]
public void pushData(Message metadata) {
dataToImportSubject.onNext(metadata);
}
这里是 PublishSubject
和 ConnectableObservable
的声明:
public class DataImporter implements ImporterProxy{
private final PublishSubject<Message> dataToImportSubject;
private final ConnectableObservable<Message> dataToImportObservable;
默认情况下,RxJava 是同步的。您需要将运算符引入您的观察者链中以在其他线程上执行操作。当您阅读 Observable
中每个运算符的文档时,您会看到类似“...不在特定调度程序上运算符”这样的语句——这表明数据同步流经该运算符。
要让观察者链在其他线程上执行操作,您可以将 subscribeOn()
之类的运算符与调度程序一起使用,以便在该调度程序上执行操作。在您的示例中,您可能希望使用 Schedulers.io()
来提供后台线程。
PublishSubject
s 在原始 onXXX
调用的线程上发送给他们的消费者:
Scheduler:
PublishSubject
does not operate by default on a particularScheduler
and theObserver
s get notified on the thread the respectiveonXXX
methods were invoked.
您必须使用 observeOn
将处理移动到其他线程,因为 observeOn
可以将 onXXX
调用移动到另一个线程。
subscribeOn
通常对 Subject
s 没有任何实际影响,因为它只影响订阅线程,不会调节对这些主题的后续 onXXX
调用。