如何在仅使用单个线程的情况下在 RxJava 中递归?

How do I recurse in RxJava while only using a single thread?

我在基于 RxJava 的网络堆栈中有一些工作递归代码,用于将 HTTP 主体字符串文件列表平面映射到表示文件的自定义 POJO 的 Observable 中。

但是,我使用的是 .scheduleOn(Schedulers.newThread()),它使用多个新线程来完成工作。我刚刚发现我需要连接的设备之一只允许每六十秒进行一次 HTTP 连接以进行锁定。这意味着我需要对与此设备的所有连接使用相同的线程和连接。

以下代码在我用 .scheduleOn(Schedulers.from(Executors.newSingleThreadExecutor())) 调用时发生死锁。我如何重写这个 RxJava 递归以使其在仅使用单个工作线程时更好地运行?我在 Rx.NET 中看到了使用嵌套操作的示例,这些示例让我相信这是可能的。

public Observable<? extends MyFile> getAllFiles(final String path) {

    Observable<String> response = NetworkUtils.getRequestAsString(GET_FILE_LIST + path);

    return response.flatMap(new Func1<String, Observable<MyFile>>() {
        @Override
        public Observable<MyFile> call(String s) {
            List<MyFile> files = new ArrayList<MyFile>();
            Observable<MyFile> fileObservable = Observable.empty();

            try {
                ObjectMapper mapper = new ObjectMapper();
                JSONObject jo = new JSONObject(s);
                JSONArray a = jo.names();
                for (int i = 0; i < a.length(); i++) {
                    JSONObject o = jo.getJSONObject(a.getString(i));
                    MyFile file = mapper.readValue(o.toString(), MyFile.class);
                    file.setDirectory(path);
                    if (file.getType().equals("dir")) {
                        fileObservable = fileObservable.mergeWith(getAllFiles(file.getFullPath())); // add files from subdirectories
                    } else if (file.getType().equals("file")) {
                        files.add(file);
                    }
                }
                return fileObservable.mergeWith(Observable.from(files)); // add files in the current directory
            } catch (Exception e) {
                return Observable.error(e);
            }
        }
    });
}

我正在调用这样的函数,其中 mExecutor 被初始化为 Executors.newSingleThreadExecutor():

mDevice.getAllFiles("/")
 .toList()
 .subscribeOn(Schedulers.from(mExecutor))
 .observeOn(Schedulers.trampoline())
 .subscribe(
   //...
 );

getRequestAsString 看起来像这样:

public static Observable<String> getRequestAsString(final String urlString) {

    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            HttpURLConnection urlConnection = null;
            try {
                // Open connection
                URL url = new URL(urlString);
                urlConnection = (HttpURLConnection) url.openConnection();

                // Set parameters
                urlConnection.setDoOutput(false);
                urlConnection.setRequestMethod(REQUEST_METHOD_GET);
                urlConnection.setRequestProperty(ACCEPT_PROPERTY, ACCEPT_VALUE);
                urlConnection.setRequestProperty(CONTENT_TYPE_PROPERTY, CONTENT_TYPE_VALUE);
                urlConnection.setUseCaches(false);
                urlConnection.connect();

                // Get response code
                int code = urlConnection.getResponseCode();
                if (code == RESULT_OK) {

                    // Get and return string
                    BufferedInputStream stream = null;
                    stream = new BufferedInputStream(urlConnection.getInputStream());
                    String s = stringFromStream(stream);
                    subscriber.onNext(s); // Emit string network response to Observable
                    subscriber.onCompleted(); // Complete Observable
                    urlConnection.disconnect();
                    stream.close();
                }
            } catch (Exception e) {
                subscriber.onError(e);
            } finally {
                if (urlConnection != null)
                    urlConnection.disconnect();
            }
        }
    });
}

我认为您的代码可以更容易 understand/use,如果您重新制作 flatMap 部分,将更容易找到您的问题。

您可能尝试合并很多可观察对象并将其用于等等

为什么你不使用 flatMap 和 return 一个会发出值的新 Observable(而不是尝试通过合并 observable 来做同样的事情)

yourObs.flatMap(v -> Observable.create(new FromJson(v)).subscribe();

并且在您的 FromJson class 中,使用 :

发出每个文件
subscriber.onNext(yourFile);

我已尝试通过递归列出本地目录对您的代码进行 analogue,但我没有遇到任何挂起。

我的猜测是您的单一连接由服务器强制执行,递归调用会尝试在前一个连接终止之前建立新连接。我会将 urlConnection.disconnect(); 移到 onNext 之前,以便可以立即建立新连接。