How can I send only the last request with Rx and Retrofit?

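I have an EditText view with a TextWatcher. In the onTextChanged method I have to ask the server for results matching the query typed into the EditText field. In my presenter I use Rx for this, but I need to delay the search until the user has finished typing. At the moment I have this: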

service.getData(query)
            .delaySubscription(REQUEST_DELAY_FROM_SERVER, TimeUnit.MILLISECONDS, Schedulers.io())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    data-> {
                        getViewState().showData(data);
                    },
                    error -> {
                        Log.e(this.getClass().getSimpleName(), error.getMessage(), error);
                    }
            );

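But delaySubscription doesn't work as expected. It collects all the calls and sends every one of them after the delay. I need the same behavior as with handler.postDelayed(), where only one request is sent.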

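Try using debounce instead. For example, the code below watches for changes in a TextView and does something when the text changes, but with a debounce of 100 ms: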

RxTextView
.textChanges(queryEditText)
.debounce(100, TimeUnit.MILLISECONDS)
.doOnNext(new Action1<CharSequence>() {
    @Override
    public void call(CharSequence charSequence) {
        // Handle the debounced text here, e.g. trigger the search
    }
})
.subscribe();

I have something similar for an address search. Combined with RxAndroid, it could look something like this:

 RxTextView.textChanges(searchEditText)
                        .debounce(100, TimeUnit.MILLISECONDS)
                        .subscribe(....);

In this case, the debounce operator waits until the observable has stopped emitting for 100 ms and only then emits the most recent value.
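
To make that behavior concrete, here is a minimal plain-Java sketch of the same idea, without RxJava. It is only an illustration: Debouncer and DebounceDemo are hypothetical names that do not exist in RxJava or RxBinding, and the real debounce operator is implemented differently. Each new value cancels the pending one, so only the last value of a burst gets through once the input has been quiet for the given time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): forwards a value only after
// no newer value has arrived for quietMillis.
final class Debouncer<T> {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long quietMillis;
    private final Consumer<T> downstream;
    private ScheduledFuture<?> pending; // guarded by "this"

    Debouncer(long quietMillis, Consumer<T> downstream) {
        this.quietMillis = quietMillis;
        this.downstream = downstream;
    }

    synchronized void onNext(T value) {
        // A newer value replaces the one that is still waiting.
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(
                () -> downstream.accept(value), quietMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}

final class DebounceDemo {
    // Simulates fast typing and returns the values that were actually forwarded.
    static List<String> typeAndCollect() {
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        Debouncer<String> debouncer = new Debouncer<>(200, sent::add);
        try {
            debouncer.onNext("r");
            debouncer.onNext("rx");
            debouncer.onNext("rxj");
            Thread.sleep(1000); // wait well past the 200 ms quiet period
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            debouncer.shutdown();
        }
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        // The three values arrive within the quiet period, so only "rxj" is forwarded.
        System.out.println(typeAndCollect());
    }
}
```

In the question's setup, the consumer passed to the debouncer would be the place that calls service.getData(query), so only the last query of a typing burst reaches the server.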

Edit 2:

Sample of a presenter in RxJava 2:

class Presenter {
    private PublishSubject<String> queryPublishSubject = PublishSubject.create();

    public Presenter() {
        queryPublishSubject
                .debounce(1000, TimeUnit.MILLISECONDS)
                // You might want to skip empty strings
                .filter(new Predicate<CharSequence>() {
                    @Override
                    public boolean test(CharSequence charSequence) {
                        return charSequence.length() > 0;
                    }
                })
                // Switch to IO thread for network call and flatMap text input to API request
                .observeOn(Schedulers.io())
                .flatMap(new Function<CharSequence, Observable<...>>() {
                    @Override
                    public Observable<...> apply(final CharSequence charSequence) {
                        return ...; // Call API
                    }
                })
                // Receive and process response on Main thread (if you need to update UI)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(...);
    }

    public void onSearchTextChanged(String query) {
        queryPublishSubject.onNext(query);
    }
}

Edit 1:

The same code in RxJava 1:

class Presenter {
    private PublishSubject<String> queryPublishSubject = PublishSubject.create();

    public Presenter() {
        queryPublishSubject
            .debounce(1000, TimeUnit.MILLISECONDS)
            // You might want to skip empty strings
            .filter(new Func1<CharSequence, Boolean>() {
                @Override
                public Boolean call(CharSequence charSequence) {
                    return charSequence.length() > 0;
                }
            })
            // Switch to IO thread for network call and flatMap text input to API request
            .observeOn(Schedulers.io())
            .flatMap(new Func1<CharSequence, Observable<...>>() {
                @Override
                public Observable<...> call(final CharSequence charSequence) {
                    return ...; // Call API
                }
            })
            // Receive and process response on Main thread (if you need to update UI)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);
    }

    public void onSearchTextChanged(String query) {
        queryPublishSubject.onNext(query);
    } 
}  

Initial answer (using RxBinding and RxJava 1)

The correct answer is to use debounce, but besides that there are some other tricks you might find useful:

textChangeListener = RxTextView
    .textChanges(queryEditText)
    // as far as I know, subscription to textChanges is allowed from Main thread only
    .subscribeOn(AndroidSchedulers.mainThread()) 
    // On subscription Observable emits current text field value. You might not need that
    .skip(1) 
    .debounce(1000, TimeUnit.MILLISECONDS)
    // You might want to skip empty strings
    .filter(new Func1<CharSequence, Boolean>() {
        @Override
        public Boolean call(CharSequence charSequence) {
            return charSequence.length() > 0;
        }
    })
    // Switch to IO thread for network call and flatMap text input to API request
    .observeOn(Schedulers.io())
    .flatMap(new Func1<CharSequence, Observable<...>>() {
        @Override
        public Observable<...> call(final CharSequence charSequence) {
            return ...; // Call API
        }
    })
    // Receive and process response on Main thread (if you need to update UI)
    .observeOn(AndroidSchedulers.mainThread())