Cancelling a long-running task inside an Observable when a subscription is disposed
I am writing a reactive wrapper over the Volley library so that I can send HTTP requests easily in my app. Here is the class:
/**
 * Used to send a http GET/POST request.
 */
public class BasicRequest {
    public static final String LOG_TAG = "BasicRequest";
    public static final int GET_REQUEST = Request.Method.GET;
    public static final int POST_REQUEST = Request.Method.POST;

    private final int mRequestType;
    private final String mServiceLocation;
    private final Map<String, String> mParams;

    /**
     * Keeps track of all the request for this object. Will be helpful when we need to cancel
     * the request when someone disposes the subscription.
     */
    private List<StringRequest> mStringRequests = new ArrayList<>();
    private Context mContext;
    private int mRequestTimeout = BASIC_REQUEST_DEFAULT_TIMEOUT;

    public BasicRequest(Context context,
                        String serviceLocation,
                        int requestType,
                        final Map<String, String> params) {
        mContext = context;
        mRequestType = requestType;
        mServiceLocation = serviceLocation;
        mParams = params;
    }

    private void fireRequest(final SingleEmitter<String> e) {
        StringRequest stringRequest;
        if (mRequestType == GET_REQUEST) {
            stringRequest = new StringRequest(Request.Method.GET, mServiceLocation,
                    new Response.Listener<String>() {
                        @Override
                        public void onResponse(String response) {
                            e.onSuccess(response);
                        }
                    }, new Response.ErrorListener() {
                        @Override
                        public void onErrorResponse(VolleyError error) {
                            e.onError(error);
                        }
                    });
        } else {
            stringRequest = new StringRequest(Request.Method.POST, mServiceLocation,
                    new Response.Listener<String>() {
                        @Override
                        public void onResponse(String response) {
                            e.onSuccess(response);
                        }
                    }, new Response.ErrorListener() {
                        @Override
                        public void onErrorResponse(VolleyError error) {
                            e.onError(error);
                        }
                    }) {
                @Override
                protected Map<String, String> getParams() throws AuthFailureError {
                    return mParams;
                }
            };
        }
        mStringRequests.add(stringRequest);
        stringRequest.setRetryPolicy(new DefaultRetryPolicy(
                mRequestTimeout,
                ConnectionUtils.BASIC_REQUEST_DEFAULT_RETRIES,
                DefaultRetryPolicy.DEFAULT_BACKOFF_MULT));
        VolleyInstance.getInstance(mContext).addToRequestQueue(stringRequest);
    }

    /**
     * Returns a Single observable for results. Queues the request on Subscription. Must be
     * called only once during the lifetime of object. Calling multiple times will return null.
     * Expect to get VolleyException in case of error.
     * @return Single observable for String results. If it's is used for second time, it will
     * return null.
     */
    @Nullable
    public Single<String> get() {
        return Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {
                fireRequest(e);
            }
        }).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                for (StringRequest stringRequest: mStringRequests) {
                    stringRequest.cancel();
                }
            }
        });
    }

    /**
     * Set the request timeout for this request.
     * @param requestTimeout time in milliseconds.
     */
    public void setRequestTimeout(int requestTimeout) {
        mRequestTimeout = requestTimeout;
    }
}
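The problem now is that when someone disposes a subscription, the requests belonging to all subscriptions are cancelled. Is there a way to cancel only the request that belongs to the disposed subscription?
I know one way to achieve this is to enforce that only one subscription can be maintained, so that if someone calls get again, the cached observer is returned. Is there a better way to cancel the HTTP request when its subscription is disposed?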
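You don't need to manage this outside of fireRequest. SingleEmitter has a setCancellable method; do the cancellation there, and RxJava will make sure it is called when someone disposes the Observable.
Add the following in the fireRequest() method and remove doOnDispose:
e.setCancellable(() -> stringRequest.cancel());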
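To see why this cancels only the disposed subscription's request, here is a minimal sketch in plain Java that models the idea without RxJava or Volley on the classpath. Everything in it is a hypothetical stand-in: `FakeRequest` plays the role of Volley's `StringRequest`, `Emitter` plays the role of `SingleEmitter` (with a `Runnable` instead of RxJava's `Cancellable`), and `subscribe` stands for subscribing to the `Single` returned by `get()`. It is not how RxJava is implemented internally, only an illustration that each subscription gets its own emitter and therefore its own cancel hook.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK model of per-subscription cancellation; all types are hypothetical stand-ins. */
class CancellableSketch {

    /** Stand-in for Volley's StringRequest: only remembers whether it was cancelled. */
    static final class FakeRequest {
        private boolean canceled;
        void cancel() { canceled = true; }
        boolean isCanceled() { return canceled; }
    }

    /** Stand-in for SingleEmitter: holds the cleanup action of exactly one subscription. */
    static final class Emitter {
        private Runnable cancellable = () -> { };
        void setCancellable(Runnable action) { cancellable = action; }
        void dispose() { cancellable.run(); }
    }

    /** Mirrors fireRequest(): a new request per subscription, cancel hook set on that emitter. */
    static void fireRequest(Emitter e, List<FakeRequest> log) {
        FakeRequest request = new FakeRequest();
        e.setCancellable(request::cancel);
        log.add(request); // kept only so the demo can inspect the requests afterwards
    }

    /** Mirrors subscribing to get(): returns the handle that disposes this subscription only. */
    static Emitter subscribe(List<FakeRequest> log) {
        Emitter e = new Emitter();
        fireRequest(e, log);
        return e;
    }

    /** Subscribes twice, disposes the first, and reports the cancelled state of both requests. */
    static boolean[] demo() {
        List<FakeRequest> log = new ArrayList<>();
        Emitter first = subscribe(log);
        Emitter second = subscribe(log); // never disposed, so its request must stay active
        first.dispose();
        return new boolean[] { log.get(0).isCanceled(), log.get(1).isCanceled() };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("first cancelled: " + r[0] + ", second cancelled: " + r[1]);
    }
}
```

In the real class, the same effect should follow from calling `e.setCancellable(...)` inside `fireRequest()` as shown above. The shared `mStringRequests` list and the `doOnDispose` block then become unnecessary, because the cancel hook is tied to the emitter of a single subscription.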