Repeat A Retrofit Observable Request
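I'm trying to make a request that keeps pulling data from the server until everything has been fetched. The catch is that each response page holds 21 items. However, the response has a meta tag that tells me whether there is a next page, so I can keep pulling until nextPage == totalPage.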
public static Observable<LgaListResponse> getPages(Context acontext) {
    String token = PrefUtils.getToken(acontext);
    BehaviorSubject<Integer> pageControl = BehaviorSubject.<Integer>create(1);
    Observable<LgaListResponse> ret2 = pageControl.asObservable().concatMap(integer -> {
        if (integer > 0) {
            Log.e(TAG, "Integer: " + integer);
            return ServiceGenerator.createService(ApiService.class, token)
                    .getLgas(String.valueOf(integer), String.valueOf(21))
                    .doOnNext(lgaListResponse -> {
                        if (lgaListResponse.getMeta().getPage() != lgaListResponse.getMeta().getPageCount()) {
                            pageControl.onNext(initialPage + 1);
                        } else {
                            pageControl.onNext(-1);
                        }
                    });
        } else {
            return Observable.<LgaListResponse>empty().doOnCompleted(pageControl::onCompleted);
        }
    });
    return Observable.defer(() -> ret2);
}
And my ServiceGenerator:
public class ServiceGenerator {
    private static final String TAG = "ServiceGen";
    private static OkHttpClient.Builder builder = new OkHttpClient.Builder();
    private static Retrofit.Builder retrofitBuilder =
            new Retrofit.Builder()
                    .baseUrl(BuildConfig.HOST)
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io()))
                    .addConverterFactory(GsonConverterFactory.create(CustomGsonParser.returnCustomParser()));

    public static <S> S createService(Class<S> serviceClass, String token) {
        builder.addInterceptor(new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY));
        /*builder.addNetworkInterceptor(new StethoInterceptor());*/
        builder.connectTimeout(30000, TimeUnit.SECONDS);
        builder.readTimeout(30000, TimeUnit.SECONDS);
        if (token != null) {
            Interceptor interceptor = chain -> {
                Request newRequest = chain.request().newBuilder()
                        .addHeader("x-mobile", "true")
                        .addHeader("Authorization", "Bearer " + token).build();
                return chain.proceed(newRequest);
            };
            builder.addInterceptor(interceptor);
        }
        OkHttpClient client = builder.build();
        Retrofit retrofit = retrofitBuilder.client(client).build();
        Log.e(TAG, retrofit.baseUrl().toString());
        return retrofit.create(serviceClass);
    }

    public static Retrofit retrofit() {
        OkHttpClient client = builder.build();
        return retrofitBuilder.client(client).build();
    }

    public static class CustomGsonParser {
        public static Gson returnCustomParser() {
            return new GsonBuilder()
                    .setExclusionStrategies(new ExclusionStrategy() {
                        @Override
                        public boolean shouldSkipField(FieldAttributes f) {
                            return f.getDeclaringClass().equals(RealmObject.class);
                        }

                        @Override
                        public boolean shouldSkipClass(Class<?> clazz) {
                            return false;
                        }
                    })
                    .create();
        }
    }
}
My request log:
E/ServiceGen: http://theUrl.net/
D/OkHttp: --> GET http://theUrl.net/lga?page=1&per_page=21 http/1.1
D/OkHttp: x-mobile: true
D/OkHttp: --> END GET
D/OkHttp: --> GET http://theUrl.net/lga?page=1&per_page=21 http/1.1
D/OkHttp: x-mobile: true
D/OkHttp: --> END GET
D/OkHttp: <-- 200 OK http://theUrl.net/lga?page=1&per_page=21 (929ms)
D/OkHttp: Date: Wed, 10 Aug 2016 09:01:00 GMT
D/OkHttp: Content-Type: application/json; charset=utf-8
D/OkHttp: <-- 200 OK http://theUrl.net/lga?page=1&per_page=21 (933ms)
D/OkHttp: Date: Wed, 10 Aug 2016 09:01:00 GMT
D/OkHttp: Content-Type: application/json; charset=utf-8
D/OkHttp: --> GET http://theUrl.net/lga?page=2&per_page=21 http/1.1
D/OkHttp: --> END GET
D/OkHttp: --> GET http://theUrl.net/lga?page=2&per_page=21 http/1.1
D/OkHttp: --> END GET
D/OkHttp: --> GET http://theUrl.net/lga?page=2&per_page=21 http/1.1
D/OkHttp: --> END GET
D/OkHttp: <-- 400 Bad Request http://theUrl.net/lga?page=2&per_page=21 (695ms)
D/OkHttp: <-- END HTTP (177-byte body)
D/OkHttp: <-- 400 Bad Request http://theUrl.net/lga?page=2&per_page=21 (696ms)
D/OkHttp: <-- END HTTP (177-byte body)
D/OkHttp: <-- 400 Bad Request http://theUrl.net/lga?page=2&per_page=21 (696ms)
As you can see, http://theUrl.net/lga?page=1&per_page=21 was called twice, and http://theUrl.net/lga?page=2&per_page=21 was called 3 times.
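So I decided to go back to my old RestClient class file, and it works fine: nothing goes wrong, and it runs the whole sequence of requests through to the end. I still can't figure out what is wrong with my ServiceGenerator class.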
The RestClient class file:
public class RestClient {
    private static final String TAG = "RestClient";
    private static ApiService apiEndpointInterface;
    private static Context context;

    /*static {
        setupRestClient();
    }*/

    public static ApiService get(Context cont) {
        context = cont;
        if (apiEndpointInterface != null)
            return apiEndpointInterface;
        setupRestClient();
        return apiEndpointInterface;
    }

    private static void setupRestClient() {
        // Define the interceptor, add authentication headers
        Interceptor interceptor = chain -> {
            Request newRequest = chain.request().newBuilder()
                    /*.addHeader("x-mobile", "true")*/
                    .addHeader("Authorization", "Bearer " + PrefUtils.getToken(context)).build();
            return chain.proceed(newRequest);
        };
        // Add the interceptor to OkHttpClient
        OkHttpClient.Builder builder = new OkHttpClient.Builder();
        builder.interceptors().add(interceptor);
        builder.addInterceptor(new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY));
        builder.addNetworkInterceptor(new StethoInterceptor());
        builder.connectTimeout(30000, TimeUnit.SECONDS);
        builder.readTimeout(30000, TimeUnit.SECONDS);
        OkHttpClient client = builder.build();
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(BuildConfig.HOST)
                .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io()))
                .addConverterFactory(GsonConverterFactory.create(CustomGsonParser.returnCustomParser()))
                .client(client)
                .build();
        apiEndpointInterface = retrofit.create(ApiService.class);
    }

    public static class CustomGsonParser {
        public static Gson returnCustomParser() {
            return new GsonBuilder()
                    .setExclusionStrategies(new ExclusionStrategy() {
                        @Override
                        public boolean shouldSkipField(FieldAttributes f) {
                            return f.getDeclaringClass().equals(RealmObject.class);
                        }

                        @Override
                        public boolean shouldSkipClass(Class<?> clazz) {
                            return false;
                        }
                    })
                    .create();
        }
    }
}
First, why are you creating a new client and service for every call? Create them once, store them, and reuse them.
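One possible reading of the log, which the question does not confirm: `builder` in ServiceGenerator is a static field, and OkHttp's `addInterceptor` appends to the builder's interceptor list, so every `createService` call adds another logging interceptor and another header interceptor to the same builder. A client built on the Nth call would then log each request N times. The sketch below models that with plain Java collections instead of real OkHttp classes; `SharedBuilder`, `createLeaky`, `createOnce` and `count` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the problem; no real OkHttp or Retrofit classes are used.
final class InterceptorAccumulationSketch {

    /** Hypothetical stand-in for a builder that keeps a growing interceptor list. */
    static final class SharedBuilder {
        private final List<String> interceptors = new ArrayList<>();

        void addInterceptor(String name) {
            interceptors.add(name);
        }

        /** Snapshot of the interceptors a client built right now would run. */
        List<String> build() {
            return new ArrayList<>(interceptors);
        }
    }

    // Mirrors the question: one static builder shared by every call.
    private static final SharedBuilder SHARED = new SharedBuilder();

    // Holds the single client built by createOnce.
    private static List<String> cachedClient;

    /** Mirrors createService: every call appends to the shared builder. */
    static synchronized List<String> createLeaky(String token) {
        SHARED.addInterceptor("logging");
        if (token != null) {
            SHARED.addInterceptor("auth");
        }
        return SHARED.build();
    }

    /** Builds the client on first use, then returns the same one. */
    static synchronized List<String> createOnce(String token) {
        if (cachedClient == null) {
            SharedBuilder fresh = new SharedBuilder();
            fresh.addInterceptor("logging");
            if (token != null) {
                fresh.addInterceptor("auth");
            }
            cachedClient = fresh.build();
        }
        return cachedClient;
    }

    /** Counts how many times an interceptor name appears in a client. */
    static long count(List<String> client, String name) {
        return client.stream().filter(name::equals).count();
    }
}
```

Calling `createLeaky` repeatedly yields clients with one more "logging" entry each time, while `createOnce` always returns the same client with a single entry. Note that a client built once captures the token it was built with; the RestClient in the question avoids that by reading the token inside the interceptor on each request.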
Second, I'd say you should use a BehaviorSubject (sorry for the Java 8, but it makes the logic more obvious):
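// RxJava 1.x. getPage(int) and hasNextPage(result) are placeholders for your own code.
BehaviorSubject<Integer> subject = BehaviorSubject.create(1); // seeded with the first page
Observable<T> obs =
    subject
        .flatMap(page ->
            getPage(page)
                .doOnNext(result -> {
                    // Feed the next page number back into the subject, or finish.
                    if (hasNextPage(result)) subject.onNext(page + 1);
                    else subject.onCompleted();
                }), 1); // maxConcurrent = 1: fetch one page at a time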
Now you can subscribe to obs to extract the objects and do what you want with them.
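The feedback loop behind this idea can be seen without RxJava. The following is a minimal synchronous sketch, assuming a hypothetical `fetchPage` function and a hypothetical `Page` type that carries the same page and page-count metadata as the response in the question. A queue plays the role of the subject: handling one page either enqueues the next page number or leaves the queue empty, which ends the loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntFunction;

// Synchronous model of the subject-driven paging loop; not RxJava code.
final class PageFeedbackSketch {

    /** Hypothetical response type: current page number and total page count. */
    static final class Page {
        final int page;
        final int pageCount;

        Page(int page, int pageCount) {
            this.page = page;
            this.pageCount = pageCount;
        }
    }

    /** Fetches pages starting at 1 until the response says it is the last one. */
    static List<Page> fetchAll(IntFunction<Page> fetchPage) {
        Deque<Integer> pageControl = new ArrayDeque<>();
        pageControl.add(1); // seed value, like BehaviorSubject.create(1)
        List<Page> results = new ArrayList<>();
        while (!pageControl.isEmpty()) {
            int current = pageControl.poll();
            Page response = fetchPage.apply(current);
            results.add(response);
            // "<" rather than "!=" so a page number past the end cannot loop forever.
            if (response.page < response.pageCount) {
                pageControl.add(current + 1); // like subject.onNext(page + 1)
            }
            // Otherwise nothing is queued and the loop ends, like onCompleted().
        }
        return results;
    }
}
```

For example, `PageFeedbackSketch.fetchAll(p -> new PageFeedbackSketch.Page(p, 3))` requests pages 1, 2 and 3 and then stops.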
Edit: following the comments, I would try something like this:
public static Observable<LgaListResponse> getPages(Context acontext) {
    String token = PrefUtils.getToken(acontext);
    // Create the service once instead of once per page.
    ApiService service = ServiceGenerator.createService(ApiService.class, token);
    BehaviorSubject<Integer> pageControl = BehaviorSubject.<Integer>create(1);
    return pageControl.concatMap(integer -> {
        Log.e(TAG, "Integer: " + integer);
        return service
                .getLgas(String.valueOf(integer), String.valueOf(21))
                .doOnNext(lgaListResponse -> {
                    if (lgaListResponse.getMeta().getPage() != lgaListResponse.getMeta().getPageCount()) {
                        // Request the page after the one just received.
                        pageControl.onNext(integer + 1);
                    } else {
                        // Last page: complete the subject (RxJava 1.x spells it onCompleted()).
                        pageControl.onCompleted();
                    }
                });
    }).cache();
}
Remember that you should call getPages() once per context and return the same Observable every time; cache() should handle multiple subscribers and unsubscriptions.
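One way to hand back the same object per context is a keyed cache. This is a minimal sketch, assuming a hypothetical `PerContextCacheSketch` helper; the key type is left generic, and using a stable identifier as the key rather than an Android Context itself is an assumption made here, since a map that holds Context instances keeps them alive.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper: builds one value per key and returns it on later calls.
final class PerContextCacheSketch<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    PerContextCacheSketch(Function<K, V> factory) {
        this.factory = factory;
    }

    /** Returns the cached value for the key, creating it on first use. */
    V get(K key) {
        return cache.computeIfAbsent(key, factory);
    }
}
```

With the factory set to something that calls getPages(), repeated `get` calls for the same key would return the same cached Observable instead of starting a new paging sequence.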