SqlBrite/SqlDelight(离线数据库)和 Retrofit(Http 请求)的存储库模式
Repository pattern with SqlBrite/SqlDelight(Offline database) and Retrofit(Http request)
我正在 RxJava 中实现存储库模式,使用 SqlBrite/SqlDelight 进行离线数据存储和 Http 请求改造
这是一个示例:
protected Observable<List<Item>> getItemsFromDb() {
return database.createQuery(tableName(), selectAllStatement())
.mapToList(cursor -> selectAllMapper().map(cursor));
}
public Observable<List<Item>>getItems(){
Observable<List<Item>> server = getRequest()
.doOnNext(items -> {
BriteDatabase.Transaction transaction = database.newTransaction();
for (Item item : items){
database.insert(tableName(), contentValues(item));
}
transaction.markSuccessful();
transaction.end();
})
.flatMap(items -> getItemsFromDbById())
.delaySubscription(200, TimeUnit.MILLISECONDS);
Observable<List<Item>> db = getItemsFromDbById(id)
.filter(items -> items != null && items.size() > 0);
return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
}
当前实现使用 Observable.amb
获取 2 个流中的最新流,并使用 returns db
流以防 db
有数据或服务器。为了防止在没有互联网的情况下早期失败,server
上有一个 delaySubscription
和 200ms
.
我尝试使用 Observable.concat
但 SqlBrite 流从不调用 onComplete
所以 server
observable 永远不会被触发。
我也试过 Observable.combineLatest
没有用,因为它一直在等待 server
可观察到 return 数据,然后再发出任何东西, Observable.switchOnNext
也没有用.
我正在寻找的是一个存储库:
- 保持对 SqlBrite (DB) 的订阅打开,以防数据库更新
- 始终从服务器获取数据并将其写入数据库
- 如果数据库中没有任何内容并且网络请求仍在继续,则不应发出空结果。这是因为用户应该在第一次加载的情况下看到进度条。
你的代码直接与你想做的事情相矛盾。这一行:
Observable<List<Item>> db = getItemsFromDbById(id)
.filter(items -> items != null && items.size() > 0);
自相矛盾,因为您 return 单个数据库查询的项目并将其命名为 db - 就好像数据库(或其引用)本身一样。从这一点可以看出,您提供的代码无能为力。
存储库模式有许多 java 个模板可用。例如:
https://www.bignerdranch.com/blog/the-rxjava-repository-pattern/
如果这没有足够的帮助,请尝试提供至少与您所描述的内容相差无几的代码。
这就是解决上述问题的方法,即从 2 个来源(本地和远程)获取数据并仅在需要时向 UI 发送更新。
数据class包装了你的数据,也存储了数据的来源
class Data<T> {
static final int STATE_LOCAL = 0;
static final int STATE_SERVER = 1;
private T data;
private int state;
Data(T data, int state) {
this.data = data;
this.state = state;
}
public int getState() { return state; }
public T getData() { return data; }
}
...
public Observable<Model> getData(long id) {
// Used to cache data and compare it with server data, so we can avoid unnecessary UI updates
Subject<Data<Model>> publishSubject = BehaviorSubject.create();
publishSubject.onNext(new Data<>(null, Data.STATE_LOCAL));
Observable<Data<Model>> server = getRequest()
.map(items -> new Data<>(items, Data.STATE_SERVER))
// Here we are combining data from server and our `BehaviorSubject`
// If any one has ideas how to do this without the subject, I'll be glad to hear it.
.flatMap(items -> Observable.zip(publishSubject.take(1), Observable.just(items), Pair::new))
.flatMap(oldNewPair -> {
// Here we are comparing old and new data to see if there was any new data returned from server
Data<Model> prevData = oldNewPair.first;
Data<Model> newData = oldNewPair.second;
//Could be any condition to compare the old and new data
if (prevData.data != null && prevData.data.updated_at() == newData.data.updated_at())
return Observable.just(prevData);
else
return database.insert(tableName(), contentValues(newData));
return getFromDb(id)
.map(item -> new Data<>(item, Data.STATE_LOCAL))
.onErrorResumeNext(server)
.doOnNext(item -> {
publishSubject.onNext(item);
if (item.getState() == Data.STATE_LOCAL)
server.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe();
})
.map(item -> item.data);
}
此解决方案未使用 amb
,而是使用解决了以下问题的 BehaviorSubject:
没有使用delaySubscription
(之前用于防止在没有网络的情况下早期失败。)
之前每次调用两次服务器,本例解决
我正在 RxJava 中实现存储库模式,使用 SqlBrite/SqlDelight 进行离线数据存储和 Http 请求改造
这是一个示例:
protected Observable<List<Item>> getItemsFromDb() {
return database.createQuery(tableName(), selectAllStatement())
.mapToList(cursor -> selectAllMapper().map(cursor));
}
public Observable<List<Item>>getItems(){
Observable<List<Item>> server = getRequest()
.doOnNext(items -> {
BriteDatabase.Transaction transaction = database.newTransaction();
for (Item item : items){
database.insert(tableName(), contentValues(item));
}
transaction.markSuccessful();
transaction.end();
})
.flatMap(items -> getItemsFromDbById())
.delaySubscription(200, TimeUnit.MILLISECONDS);
Observable<List<Item>> db = getItemsFromDbById(id)
.filter(items -> items != null && items.size() > 0);
return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
}
当前实现使用 Observable.amb
获取 2 个流中的最新流,并使用 returns db
流以防 db
有数据或服务器。为了防止在没有互联网的情况下早期失败,server
上有一个 delaySubscription
和 200ms
.
我尝试使用 Observable.concat
但 SqlBrite 流从不调用 onComplete
所以 server
observable 永远不会被触发。
我也试过 Observable.combineLatest
没有用,因为它一直在等待 server
可观察到 return 数据,然后再发出任何东西, Observable.switchOnNext
也没有用.
我正在寻找的是一个存储库:
- 保持对 SqlBrite (DB) 的订阅打开,以防数据库更新
- 始终从服务器获取数据并将其写入数据库
- 如果数据库中没有任何内容并且网络请求仍在继续,则不应发出空结果。这是因为用户应该在第一次加载的情况下看到进度条。
你的代码直接与你想做的事情相矛盾。这一行:
Observable<List<Item>> db = getItemsFromDbById(id)
.filter(items -> items != null && items.size() > 0);
自相矛盾,因为您 return 单个数据库查询的项目并将其命名为 db - 就好像数据库(或其引用)本身一样。从这一点可以看出,您提供的代码无能为力。
存储库模式有许多 java 个模板可用。例如: https://www.bignerdranch.com/blog/the-rxjava-repository-pattern/
如果这没有足够的帮助,请尝试提供至少与您所描述的内容相差无几的代码。
这就是解决上述问题的方法,即从 2 个来源(本地和远程)获取数据并仅在需要时向 UI 发送更新。
数据class包装了你的数据,也存储了数据的来源
class Data<T> {
static final int STATE_LOCAL = 0;
static final int STATE_SERVER = 1;
private T data;
private int state;
Data(T data, int state) {
this.data = data;
this.state = state;
}
public int getState() { return state; }
public T getData() { return data; }
}
...
public Observable<Model> getData(long id) {
// Used to cache data and compare it with server data, so we can avoid unnecessary UI updates
Subject<Data<Model>> publishSubject = BehaviorSubject.create();
publishSubject.onNext(new Data<>(null, Data.STATE_LOCAL));
Observable<Data<Model>> server = getRequest()
.map(items -> new Data<>(items, Data.STATE_SERVER))
// Here we are combining data from server and our `BehaviorSubject`
// If any one has ideas how to do this without the subject, I'll be glad to hear it.
.flatMap(items -> Observable.zip(publishSubject.take(1), Observable.just(items), Pair::new))
.flatMap(oldNewPair -> {
// Here we are comparing old and new data to see if there was any new data returned from server
Data<Model> prevData = oldNewPair.first;
Data<Model> newData = oldNewPair.second;
//Could be any condition to compare the old and new data
if (prevData.data != null && prevData.data.updated_at() == newData.data.updated_at())
return Observable.just(prevData);
else
return database.insert(tableName(), contentValues(newData));
return getFromDb(id)
.map(item -> new Data<>(item, Data.STATE_LOCAL))
.onErrorResumeNext(server)
.doOnNext(item -> {
publishSubject.onNext(item);
if (item.getState() == Data.STATE_LOCAL)
server.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe();
})
.map(item -> item.data);
}
此解决方案未使用 amb
,而是使用解决了以下问题的 BehaviorSubject:
没有使用
delaySubscription
(之前用于防止在没有网络的情况下早期失败。)之前每次调用两次服务器,本例解决