如何在仅使用单个线程的情况下在 RxJava 中递归?
How do I recurse in RxJava while only using a single thread?
我在基于 RxJava 的网络堆栈中有一些工作递归代码,用于将 HTTP 主体字符串文件列表平面映射到表示文件的自定义 POJO 的 Observable 中。
但是,我使用的是 .scheduleOn(Schedulers.newThread())
,它使用多个新线程来完成工作。我刚刚发现我需要连接的设备之一只允许每六十秒进行一次 HTTP 连接以进行锁定。这意味着我需要对与此设备的所有连接使用相同的线程和连接。
以下代码在我用 .scheduleOn(Schedulers.from(Executors.newSingleThreadExecutor()))
调用时发生死锁。我如何重写这个 RxJava 递归以使其在仅使用单个工作线程时更好地运行?我在 Rx.NET 中看到了使用嵌套操作的示例,这些示例让我相信这是可能的。
public Observable<? extends MyFile> getAllFiles(final String path) {
Observable<String> response = NetworkUtils.getRequestAsString(GET_FILE_LIST + path);
return response.flatMap(new Func1<String, Observable<MyFile>>() {
@Override
public Observable<MyFile> call(String s) {
List<MyFile> files = new ArrayList<MyFile>();
Observable<MyFile> fileObservable = Observable.empty();
try {
ObjectMapper mapper = new ObjectMapper();
JSONObject jo = new JSONObject(s);
JSONArray a = jo.names();
for (int i = 0; i < a.length(); i++) {
JSONObject o = jo.getJSONObject(a.getString(i));
MyFile file = mapper.readValue(o.toString(), MyFile.class);
file.setDirectory(path);
if (file.getType().equals("dir")) {
fileObservable = fileObservable.mergeWith(getAllFiles(file.getFullPath())); // add files from subdirectories
} else if (file.getType().equals("file")) {
files.add(file);
}
}
return fileObservable.mergeWith(Observable.from(files)); // add files in the current directory
} catch (Exception e) {
return Observable.error(e);
}
}
});
}
我正在调用这样的函数,其中 mExecutor
被初始化为 Executors.newSingleThreadExecutor()
:
mDevice.getAllFiles("/")
.toList()
.subscribeOn(Schedulers.from(mExecutor))
.observeOn(Schedulers.trampoline())
.subscribe(
//...
);
getRequestAsString 看起来像这样:
public static Observable<String> getRequestAsString(final String urlString) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
HttpURLConnection urlConnection = null;
try {
// Open connection
URL url = new URL(urlString);
urlConnection = (HttpURLConnection) url.openConnection();
// Set parameters
urlConnection.setDoOutput(false);
urlConnection.setRequestMethod(REQUEST_METHOD_GET);
urlConnection.setRequestProperty(ACCEPT_PROPERTY, ACCEPT_VALUE);
urlConnection.setRequestProperty(CONTENT_TYPE_PROPERTY, CONTENT_TYPE_VALUE);
urlConnection.setUseCaches(false);
urlConnection.connect();
// Get response code
int code = urlConnection.getResponseCode();
if (code == RESULT_OK) {
// Get and return string
BufferedInputStream stream = null;
stream = new BufferedInputStream(urlConnection.getInputStream());
String s = stringFromStream(stream);
subscriber.onNext(s); // Emit string network response to Observable
subscriber.onCompleted(); // Complete Observable
urlConnection.disconnect();
stream.close();
}
} catch (Exception e) {
subscriber.onError(e);
} finally {
if (urlConnection != null)
urlConnection.disconnect();
}
}
});
}
我认为您的代码可以更容易 understand/use,如果您重新制作 flatMap 部分,将更容易找到您的问题。
您可能尝试合并很多可观察对象并将其用于等等
为什么你不使用 flatMap 和 return 一个会发出值的新 Observable(而不是尝试通过合并 observable 来做同样的事情)
yourObs.flatMap(v -> Observable.create(new FromJson(v)).subscribe();
并且在您的 FromJson class 中,使用 :
发出每个文件
subscriber.onNext(yourFile);
我已尝试通过递归列出本地目录对您的代码进行 analogue,但我没有遇到任何挂起。
我的猜测是您的单一连接由服务器强制执行,递归调用会尝试在前一个连接终止之前建立新连接。我会将 urlConnection.disconnect();
移到 onNext
之前,以便可以立即建立新连接。
我在基于 RxJava 的网络堆栈中有一些工作递归代码,用于将 HTTP 主体字符串文件列表平面映射到表示文件的自定义 POJO 的 Observable 中。
但是,我使用的是 .scheduleOn(Schedulers.newThread())
,它使用多个新线程来完成工作。我刚刚发现我需要连接的设备之一只允许每六十秒进行一次 HTTP 连接以进行锁定。这意味着我需要对与此设备的所有连接使用相同的线程和连接。
以下代码在我用 .scheduleOn(Schedulers.from(Executors.newSingleThreadExecutor()))
调用时发生死锁。我如何重写这个 RxJava 递归以使其在仅使用单个工作线程时更好地运行?我在 Rx.NET 中看到了使用嵌套操作的示例,这些示例让我相信这是可能的。
public Observable<? extends MyFile> getAllFiles(final String path) {
Observable<String> response = NetworkUtils.getRequestAsString(GET_FILE_LIST + path);
return response.flatMap(new Func1<String, Observable<MyFile>>() {
@Override
public Observable<MyFile> call(String s) {
List<MyFile> files = new ArrayList<MyFile>();
Observable<MyFile> fileObservable = Observable.empty();
try {
ObjectMapper mapper = new ObjectMapper();
JSONObject jo = new JSONObject(s);
JSONArray a = jo.names();
for (int i = 0; i < a.length(); i++) {
JSONObject o = jo.getJSONObject(a.getString(i));
MyFile file = mapper.readValue(o.toString(), MyFile.class);
file.setDirectory(path);
if (file.getType().equals("dir")) {
fileObservable = fileObservable.mergeWith(getAllFiles(file.getFullPath())); // add files from subdirectories
} else if (file.getType().equals("file")) {
files.add(file);
}
}
return fileObservable.mergeWith(Observable.from(files)); // add files in the current directory
} catch (Exception e) {
return Observable.error(e);
}
}
});
}
我正在调用这样的函数,其中 mExecutor
被初始化为 Executors.newSingleThreadExecutor()
:
mDevice.getAllFiles("/")
.toList()
.subscribeOn(Schedulers.from(mExecutor))
.observeOn(Schedulers.trampoline())
.subscribe(
//...
);
getRequestAsString 看起来像这样:
public static Observable<String> getRequestAsString(final String urlString) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
HttpURLConnection urlConnection = null;
try {
// Open connection
URL url = new URL(urlString);
urlConnection = (HttpURLConnection) url.openConnection();
// Set parameters
urlConnection.setDoOutput(false);
urlConnection.setRequestMethod(REQUEST_METHOD_GET);
urlConnection.setRequestProperty(ACCEPT_PROPERTY, ACCEPT_VALUE);
urlConnection.setRequestProperty(CONTENT_TYPE_PROPERTY, CONTENT_TYPE_VALUE);
urlConnection.setUseCaches(false);
urlConnection.connect();
// Get response code
int code = urlConnection.getResponseCode();
if (code == RESULT_OK) {
// Get and return string
BufferedInputStream stream = null;
stream = new BufferedInputStream(urlConnection.getInputStream());
String s = stringFromStream(stream);
subscriber.onNext(s); // Emit string network response to Observable
subscriber.onCompleted(); // Complete Observable
urlConnection.disconnect();
stream.close();
}
} catch (Exception e) {
subscriber.onError(e);
} finally {
if (urlConnection != null)
urlConnection.disconnect();
}
}
});
}
我认为您的代码可以更容易 understand/use,如果您重新制作 flatMap 部分,将更容易找到您的问题。
您可能尝试合并很多可观察对象并将其用于等等
为什么你不使用 flatMap 和 return 一个会发出值的新 Observable(而不是尝试通过合并 observable 来做同样的事情)
yourObs.flatMap(v -> Observable.create(new FromJson(v)).subscribe();
并且在您的 FromJson class 中,使用 :
发出每个文件subscriber.onNext(yourFile);
我已尝试通过递归列出本地目录对您的代码进行 analogue,但我没有遇到任何挂起。
我的猜测是您的单一连接由服务器强制执行,递归调用会尝试在前一个连接终止之前建立新连接。我会将 urlConnection.disconnect();
移到 onNext
之前,以便可以立即建立新连接。