Flowable 来自 Cache 和其他 Flowable for DataSource 使用 RxJava
Flowable from Cache and other Flowable for DataSource using RxJava
我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。
我有 Dao,它在某个范围内向 DataSource class 提供 Flowable<Item>
。该数据源有本地缓存,可以随时失效。当存储库向 DataSource 询问某个范围时(可能超出 DataSourse 范围,边界在完全缓存之前是未知的)它必须产生错误(或以其他方式通知 Repository)。
我想为 DataSource 创建 Flowable<Item>
方法,它将从缓存中发出项目,如果需要,将它们与 Flowable<Item> dao.getRange(...)
连接起来,同时缓存来自 dao 的新项目。我还需要处理来自 dao 的错误,它们必须被处理或转换为更高级别的错误。
DataSource.class
:
List<Item> cache;
Flowable<Item> getRange(int start, int amount) {
final int cacheSize = cache.size();
final int canLoadFromCache = cacheSize - start;
final int loadFromDao = amount - canLoadFromCache;
if (isCorrupted) return Flowable.fromCallable(() -> {
throw new Exception("CorruptedDatasource");
});
Flowable<Item> cacheFlow = null;
Flowable<Item> daoFlow = null;
if (canLoadFromCache > 0) {
cacheFlow = Flowable.fromIterable(
cache.subList(start, canLoadFromCache)
);
daoFlow = dao.getRange(
uri,
cacheSize, //start
loadFromDao //amount
);
} else {
if (isFullyCached) return Flowable.fromCallable(() -> {
throw new Exception("OutOfBounds");
});
//To not deal with gaps load and cache data between;
//Or replace it with data structure which can handle for us;
daoFlow = dao.getRange(
uri,
cacheSize,
start - cacheSize + amount);
//all these items should be cached;
//other cached and put downstream;
//Dao errs should be converted to higher lever exceptions,
//Or set flags in DataSource;
}
// return concatenated flowable;
}
在更高级别的存储库连接来自多个数据源的数据,因此必须有一种方法来连接来自多个来源的范围,如果一个来源还不够,则应该添加下一个来源的范围。
请帮帮我!
尝试 concat
或 concatEager
连接两个可观察值。另外 doOnNext()
或 doOnError()
可以帮助您缓存和错误处理
List<Item> cache;
Flowable<Item> getRange(int start, int amount) {
...
if (isFullyCached) return Flowable.fromCallable(() -> {
throw new Exception("OutOfBounds");
});
//To not deal with gaps load and cache data between;
//Or replace it with data structure which can handle for us;
daoFlow = dao.getRange(
uri,
cacheSize,
start - cacheSize + amount);
//all these items should be cached;
//other cached and put downstream;
.doOnNext(result -> /* insert caching logic here */)
//Dao errs should be converted to higher lever exceptions,
//Or set flags in DataSource;
.doOnError(error -> /* handle error here */)
.onErrorReturn(/* and/or return some empty item */)
}
// return concatenated flowable;
return cacheFlow.concat(daoFlow);
}
我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。
我有 Dao,它在某个范围内向 DataSource class 提供 Flowable<Item>
。该数据源有本地缓存,可以随时失效。当存储库向 DataSource 询问某个范围时(可能超出 DataSourse 范围,边界在完全缓存之前是未知的)它必须产生错误(或以其他方式通知 Repository)。
我想为 DataSource 创建 Flowable<Item>
方法,它将从缓存中发出项目,如果需要,将它们与 Flowable<Item> dao.getRange(...)
连接起来,同时缓存来自 dao 的新项目。我还需要处理来自 dao 的错误,它们必须被处理或转换为更高级别的错误。
DataSource.class
:
List<Item> cache;
Flowable<Item> getRange(int start, int amount) {
final int cacheSize = cache.size();
final int canLoadFromCache = cacheSize - start;
final int loadFromDao = amount - canLoadFromCache;
if (isCorrupted) return Flowable.fromCallable(() -> {
throw new Exception("CorruptedDatasource");
});
Flowable<Item> cacheFlow = null;
Flowable<Item> daoFlow = null;
if (canLoadFromCache > 0) {
cacheFlow = Flowable.fromIterable(
cache.subList(start, canLoadFromCache)
);
daoFlow = dao.getRange(
uri,
cacheSize, //start
loadFromDao //amount
);
} else {
if (isFullyCached) return Flowable.fromCallable(() -> {
throw new Exception("OutOfBounds");
});
//To not deal with gaps load and cache data between;
//Or replace it with data structure which can handle for us;
daoFlow = dao.getRange(
uri,
cacheSize,
start - cacheSize + amount);
//all these items should be cached;
//other cached and put downstream;
//Dao errs should be converted to higher lever exceptions,
//Or set flags in DataSource;
}
// return concatenated flowable;
}
在更高级别的存储库连接来自多个数据源的数据,因此必须有一种方法来连接来自多个来源的范围,如果一个来源还不够,则应该添加下一个来源的范围。
请帮帮我!
尝试 concat
或 concatEager
连接两个可观察值。另外 doOnNext()
或 doOnError()
可以帮助您缓存和错误处理
List<Item> cache;
Flowable<Item> getRange(int start, int amount) {
...
if (isFullyCached) return Flowable.fromCallable(() -> {
throw new Exception("OutOfBounds");
});
//To not deal with gaps load and cache data between;
//Or replace it with data structure which can handle for us;
daoFlow = dao.getRange(
uri,
cacheSize,
start - cacheSize + amount);
//all these items should be cached;
//other cached and put downstream;
.doOnNext(result -> /* insert caching logic here */)
//Dao errs should be converted to higher lever exceptions,
//Or set flags in DataSource;
.doOnError(error -> /* handle error here */)
.onErrorReturn(/* and/or return some empty item */)
}
// return concatenated flowable;
return cacheFlow.concat(daoFlow);
}