How to manually call observer.onNext in RxJava
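I am fairly new to RxJava/RxAndroid. Until now I have been using AsyncTask for my long-running tasks.
I have converted most of my AsyncTasks to RxJava, except this one.
The particular problem I am having is calling something like AsyncTask's publishProgress(params); from the background thread. I need this to update the progress of a ProgressBar.
First, this is the code in the AsyncTask: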
private static class AddBooksToDatabase extends AsyncTask<String, String, String> {
//dependencies removed
AddBooksToDatabase(AddBooksDbParams params) {
//Removed assignment codes
}
@Override
protected String doInBackground(String... strings) {
//Initializing custom SQLiteOpenHelper and SQLite database
File mFile = new File(mFolderPath);
int booksSize = getFilesInFolder(mFile).size();
String[] sizeList = {String.valueOf(booksSize)};
//The first publishProgress is used to set the max of the progressbar
publishProgress(sizeList);
for (int i = 0; i < booksSize; i++) {
//publishProgress with current item, current file
publishProgress(String.valueOf(i), getFilesInFolder(mFile).get(i).getName());
//Inserting current items in database. Code removed
}
return null;
}
@Override
protected void onPreExecute() {
//Show ProgressBar
}
@Override
protected void onPostExecute(String s) {
//Hide ProgressBar
}
@Override
protected void onProgressUpdate(String... values) {
super.onProgressUpdate(values);
if (values.length == 1) {
//The first call to publishProgress
mProgressBar.setMax(Integer.parseInt(values[0]));
} else {
//Subsequent calls to publish progress
Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
infoText.setText(values[1]);
mProgressBar.setProgress(Integer.parseInt(values[0]), true);
}
}
@Override
protected void onCancelled() {
cancel(true);
}
}
The code using RxJava:
final Observable<String[]> addBooksObserver = Observable.create(new Observable.OnSubscribe<String[]>() {
@Override
public void call(Subscriber<? super String[]> subscriber) {
subscriber.onNext(setAddSubscription());
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
private String[] setAddSubscription() {
//Initializing custom SQLiteOpenHelper and SQLite database
File mFile = new File(mFolderPath);
int booksSize = getFilesInFolder(mFile).size();
String[] sizeList = {String.valueOf(booksSize)};
//The first publishProgress is used to set the max of the progressbar
addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length), null, null));
for (int i = 0; i < booksSize; i++) {
EpubReader reader = new EpubReader();
//publishProgress with current item, current file*
addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length),
String.valueOf(i), getFilesInFolder(mFile).get(i).getName()));
//Inserting current item in database. Code removed
}
return null;
}
private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
return new String[]{totalItems, currentItem, currentFile};
}
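The problem is that the lines starting with addBooksObserver.doOnNext(addReturnParams( show this error:
doOnNext (rx.functions.Action1) cannot be applied to (java.lang.String[])
I don't know how to fix this, because I thought that since setAddSubscription() and addReturnParams(String totalItems, String currentItem, String currentFile) both return String arrays, this shouldn't be a problem. Can you help me?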
Your Observable should be created like Observable.create(new Observable.OnSubscribe<String>(), and in your call method you should loop through the String array and pass each item to onNext.
For example:
@Override
public void call(Subscriber<? super String> subscriber) {
for(String val : setAddSubscription()) {
subscriber.onNext(val);
}
subscriber.onCompleted();
}
Now onNext will emit your individual items, and onCompleted will be called once the loop has finished.
Edit:
myObserver.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
// handle completion.
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String value) {
// do whatever with each value passed to onNext
}
});
You just need to pass the values to the subscriber's onNext method, not to the Observable's doOnNext method!
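To make the distinction concrete, here is a minimal sketch in plain Java. It is not the real rx.Observable: MiniObservable, Source and DoOnNextDemo are hypothetical names, and java.util.function.Consumer stands in for both Subscriber and Action1. It only illustrates that doOnNext accepts a callback that peeks at passing items, while values are pushed by calling the subscriber inside the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for an RxJava 1 Observable; NOT the real rx.Observable. */
final class MiniObservable<T> {
    /** Plays the role of Observable.OnSubscribe: it is handed the downstream subscriber. */
    interface Source<T> {
        void call(Consumer<? super T> subscriber);
    }

    private final Source<T> source;

    MiniObservable(Source<T> source) {
        this.source = source;
    }

    /** Takes a callback (like Action1), never a value: it only observes items passing by. */
    MiniObservable<T> doOnNext(Consumer<? super T> action) {
        return new MiniObservable<>(downstream -> source.call(item -> {
            action.accept(item);      // side effect first
            downstream.accept(item);  // then forward the item unchanged
        }));
    }

    /** Nothing is emitted until someone subscribes. */
    void subscribe(Consumer<? super T> onNext) {
        source.call(onNext);
    }
}

final class DoOnNextDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new MiniObservable<String>(subscriber -> {
            subscriber.accept("a"); // values are pushed here, on the subscriber
            subscriber.accept("b");
        })
        .doOnNext(item -> log.add("peek " + item))
        .subscribe(item -> log.add("got " + item));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model, passing a String[] to doOnNext fails for the same reason as in the question: the parameter is a function type, not an item type.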
You also have to subscribe to it. Try something like this for your Observable:
Observable.create(new Observable.OnSubscribe<String[]>() {
@Override
public void call(Subscriber<? super String[]> subscriber) {
setAddSubscription(subscriber);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String[]>() {
@Override
public void onCompleted() {
// handle 'operation is done'
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String[] values) {
if (values.length == 1) {
//The first call to publishProgress
mProgressBar.setMax(Integer.parseInt(values[0]));
} else {
//Subsequent calls to publish progress
Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
infoText.setText(values[1]);
mProgressBar.setProgress(Integer.parseInt(values[0]), true);
}
}
});
You also need to modify your private method a bit:
private void setAddSubscription(Subscriber<? super String[]> subscriber) {
//Initializing custom SQLiteOpenHelper and SQLite database
File mFile = new File(mFolderPath);
int booksSize = getFilesInFolder(mFile).size();
String[] sizeList = {String.valueOf(booksSize)};
//The first emission carries only the total (length 1), so the subscriber can set the max of the progressbar
subscriber.onNext(sizeList);
for (int i = 0; i < booksSize; i++) {
EpubReader reader = new EpubReader();
//Emit the current item and current file (length 2), mirroring publishProgress
subscriber.onNext(new String[]{String.valueOf(i), getFilesInFolder(mFile).get(i).getName()});
//Inserting current item in database. Code removed
//Inserting current item in database. Code removed
}
}
private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
return new String[]{totalItems, currentItem, currentFile};
}
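As a possible refinement, the String[] convention (array length decides what an emission means) could be replaced by a small value class. The sketch below is plain Java under stated assumptions: Progress, ProgressDemo and importBooks are hypothetical names, a Consumer stands in for the subscriber, and the file names are made-up sample data. With RxJava the same object could be passed to subscriber.onNext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical immutable progress value; replaces the positional String[]. */
final class Progress {
    final int total;       // number of files to import
    final int current;     // 1-based index of the file just handled
    final String fileName; // name of that file

    Progress(int total, int current, String fileName) {
        this.total = total;
        this.current = current;
        this.fileName = fileName;
    }

    @Override
    public String toString() {
        return current + "/" + total + " " + fileName;
    }
}

final class ProgressDemo {
    /** Emits one Progress per file; the Consumer plays the role of subscriber.onNext. */
    static void importBooks(List<String> fileNames, Consumer<Progress> onNext) {
        int total = fileNames.size();
        for (int i = 0; i < total; i++) {
            // The real work (for example the database insert) would happen here.
            onNext.accept(new Progress(total, i + 1, fileNames.get(i)));
        }
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        importBooks(Arrays.asList("a.epub", "b.epub"), p -> seen.add(p.toString()));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because every emission carries the total, the receiver can set the ProgressBar maximum and the current value from the same object, without a special first emission.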
Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
int[] ii = {i, i * 2};
emitter.onNext(ii);
}
emitter.onComplete();
}).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io()).subscribe(o -> {
// update progress
int[] i = (int[]) o;
Toast.makeText(SearchActivity.this, "oftad " + i[0] + " - " + i[1], Toast.LENGTH_SHORT).show();
}, t -> {
// on error
Toast.makeText(SearchActivity.this, t.getMessage(), Toast.LENGTH_SHORT).show();
}, () -> {
// progress finished
Toast.makeText(SearchActivity.this, "completed", Toast.LENGTH_SHORT).show();
});
You can use a Subject to call onNext() manually, like this:
Subject<String> event = PublishSubject.create();
Now call onNext() to send an event, for example:
event.onNext("event");
Finally, you can return an Observable using this code:
event.toFlowable(BackpressureStrategy.LATEST)
.toObservable();
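The idea behind a subject can be sketched in plain Java: one object that accepts values through onNext and forwards them to whoever is currently subscribed. MiniSubject and SubjectDemo are hypothetical names, not RxJava classes, and this toy version is not thread-safe and has no completion or error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a publish-style subject; NOT an RxJava class and not thread-safe. */
final class MiniSubject<T> {
    private final List<Consumer<? super T>> subscribers = new ArrayList<>();

    /** Registers a listener for future items only. */
    void subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Pushes an item manually to every current subscriber. */
    void onNext(T item) {
        for (Consumer<? super T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}

final class SubjectDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniSubject<String> events = new MiniSubject<>();
        events.onNext("lost");           // emitted before anyone subscribed, so it is dropped
        events.subscribe(received::add);
        events.onNext("event");          // delivered to the subscriber
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The dropped first item mirrors how a PublishSubject only delivers items emitted after subscription, which matters if progress updates start before the UI subscribes.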