RxJava:PublishSubject 同步动作

RxJava: PublishSubject acts synchronously

我需要一种功能,允许将异步消息推送到我的 PublishSubject 并通过 ConnectableObservable 以一定的速度(实际上是一条一条地)处理它们。不幸的是,在底层 Subscriber 处理消息之前,似乎不会释放对 PublishSubjectonNext 的调用。

处理每条消息需要几秒钟,在调试模式下我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 - "After push..." 总是在内部之后出现在控制台中登录 Subscriber...

所以我有这个 RestEndpoint:

@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            try {
                Message metadata = processor.apply(extId);
                log.info("Before push...");
                dataImporter.pushData(metadata);
                log.info("After push...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    return Response.ok("Request received successfully").build();

}

这是 DataImporter 的构造函数:

public DataImporter(final String configFile) {
        dataToImportSubject = PublishSubject.create();
        dataToImportObservable = dataToImportSubject.publish();
        dataToImportObservable.connect();
        dataToImportObservable
            .onBackpressureBuffer(1, new Action0() {

                @Override
                public void call() {
                    logger.debug("Buffer full...");
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<Message>() {

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub

                }

                @Override
                public void onError(Throwable e) {
                    logger.error("Error importing "+e.getMessage());
                }

                @Override
                public void onNext(Message value) {
                    request(1);
                    importResult(configFile, value);
                }

                @Override
                public void onStart() {
                    request(1);
                }
            });
    }

然后 DataImporter 的 pushData 只是推送到 PublishSubjectonNext 方法..:[=​​26=]

public void pushData(Message metadata) {
    dataToImportSubject.onNext(metadata);       
}

这里是 PublishSubjectConnectableObservable 的声明:

public class DataImporter implements ImporterProxy{

    private final PublishSubject<Message> dataToImportSubject;
    private final ConnectableObservable<Message> dataToImportObservable;

默认情况下,RxJava 是同步的。您需要将运算符引入您的观察者链中以在其他线程上执行操作。当您阅读 Observable 中每个运算符的文档时,您会看到类似“...不在特定调度程序上运算符”这样的语句——这表明数据同步流经该运算符。

要让观察者链在其他线程上执行操作,您可以将 subscribeOn() 之类的运算符与调度程序一起使用,以便在该调度程序上执行操作。在您的示例中,您可能希望使用 Schedulers.io() 来提供后台线程。

PublishSubjects 在原始 onXXX 调用的线程上发送给他们的消费者:

JavaDocs

Scheduler:

PublishSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.

您必须使用 observeOn 将处理移动到其他线程,因为 observeOn 可以将 onXXX 调用移动到另一个线程。

subscribeOn 通常对 Subjects 没有任何实际影响,因为它只影响订阅线程,不会调节对这些主题的后续 onXXX 调用。