RxJS,如何使用动态时间戳轮询 API 以持续检查更新的记录
RxJS, how to poll an API to continuously check for updated records using a dynamic timestamp
我是 RxJS 的新手,我正在尝试编写一个应用程序来完成以下事情:
- 加载时,发出 AJAX 请求(为简单起见伪装成
fetchItems()
)以获取项目列表。
- 在那之后的每一秒,发出一个 AJAX 请求来获取项目。
- 检查新项目时,仅应返回在最近时间戳之后更改的项目。
- 在可观察对象之外不应该有任何状态。
我的 first attempt 非常直截了当,实现了目标 1、2 和 4。
var data$ = Rx.Observable.interval(1000)
.startWith('run right away')
.map(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but
// I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned
return fetchItems();
});
现在我很兴奋,这很容易,实现目标 3 不会那么难...几个小时后,这是 where I am at:
var modifiedSince = null;
var data$ = Rx.Observable.interval(1000)
.startWith('run right away')
.flatMap(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
return fetchItems(modifiedSince);
})
.do(function(item) {
if(item.updatedAt > modifiedSince) {
modifiedSince = item.updatedAt;
}
})
.scan(function(previous, current) {
previous.push(current);
return previous;
}, []);
这解决了目标 3,但在目标 4 上倒退了。我现在将状态存储在 observable 之外。
我假设全局 modifiedSince
和 .do()
块不是实现此目的的最佳方式。任何指导将不胜感激。
编辑:希望通过这个问题澄清我在寻找什么。
这个怎么样:
var interval = 1000;
function fetchItems() {
return items;
}
var data$ = Rx.Observable.interval(interval)
.map(function() { return fetchItems(); })
.filter(function(x) {return x.lastModified > Date.now() - interval}
.skip(1)
.startWith(fetchItems());
那应该只过滤新项目的来源,再加上完整的 collection。只需编写适合您的数据源的过滤函数即可。
或者通过将参数传递给 fetchItems:
var interval = 1000;
function fetchItems(modifiedSince) {
var retVal = modifiedSince ? items.filter( function(x) {return x.lastModified > modifiedSince}) : items
return retVal;
}
var data$ = Rx.Observable.interval(interval)
.map(function() { return fetchItems(Date.now() - interval); })
.skip(1)
.startWith(fetchItems());
你的意思好像是modifiedSince
是你携带状态的一部分,所以它应该出现在scan
中。为什么 don-t 将 do
中的操作也移到扫描中?。你的种子将是 {modifiedSince: null, itemArray: []}
.
呃,我只是觉得这可能行不通,因为你需要将 modifiedSince
反馈给上游的 fetchItem
函数。你这里没有自行车吗?这意味着你将不得不使用一个主题来打破这个循环。或者,您可以尝试将 modifiedSince
封装在闭包中。像
function pollItems (fetchItems, polling_frequency) {
var modifiedSince = null;
var data$ = Rx.Observable.interval(polling_frequency)
.startWith('run right away')
.flatMap(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
return fetchItems(modifiedSince);
})
.do(function(item) {
if(item.updatedAt > modifiedSince) {
modifiedSince = item.updatedAt;
}
})
.scan(function(previous, current) {
previous.push(current);
return previous;
}, []);
return data$;
}
我要运行出去过年了,如果不行,我可以稍后再试一次(也许使用expand
运算符,其他版本的scan
).
这是另一个不使用闭包或 'external state' 的解决方案。
我做了以下假设:
fetchItems
returns Rx.Observable
个项目,即不是项目数组
它使用 expand
运算符,允许发出遵循类型 x_n+1 = f(x_n)
的递归关系的值。您通过返回一个发出该值的可观察对象来传递 x_n+1
,例如 Rx.Observable.return(x_n+1)
,您可以通过返回 Rx.Observable.empty()
来完成递归。这里似乎没有结束条件,所以这将 运行 永远。
scan
还允许按照递归关系 (x_n+1 = f(x_n, y_n)
) 发出值。不同之处在于 scan
强制您使用同步函数(因此 x_n+1
与 y_n
同步),而对于 expand
您可以使用以下形式的异步函数一个可观察的。
代码没有经过测试,所以不管是否有效,请及时通知我。
相关文档:expand, combineLatest
var modifiedSinceInitValue = // put your date here
var polling_frequency = // put your value here
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []}
function max(property) {
return function (acc, current) {
acc = current[property] > acc ? current[property] : acc;
}
}
var data$ = Rx.Observable.return(initial_state)
.expand (function(state){
return fetchItem(state.modifiedSince)
.toArray()
.combineLatest(Rx.Observable.interval(polling_frequency).take(1),
function (itemArray, _) {
return {
modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue),
itemArray : itemArray
}
}
})
我是 RxJS 的新手,我正在尝试编写一个应用程序来完成以下事情:
- 加载时,发出 AJAX 请求(为简单起见伪装成
fetchItems()
)以获取项目列表。 - 在那之后的每一秒,发出一个 AJAX 请求来获取项目。
- 检查新项目时,仅应返回在最近时间戳之后更改的项目。
- 在可观察对象之外不应该有任何状态。
我的 first attempt 非常直截了当,实现了目标 1、2 和 4。
var data$ = Rx.Observable.interval(1000)
.startWith('run right away')
.map(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but
// I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned
return fetchItems();
});
现在我很兴奋,这很容易,实现目标 3 不会那么难...几个小时后,这是 where I am at:
var modifiedSince = null;
var data$ = Rx.Observable.interval(1000)
.startWith('run right away')
.flatMap(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
return fetchItems(modifiedSince);
})
.do(function(item) {
if(item.updatedAt > modifiedSince) {
modifiedSince = item.updatedAt;
}
})
.scan(function(previous, current) {
previous.push(current);
return previous;
}, []);
这解决了目标 3,但在目标 4 上倒退了。我现在将状态存储在 observable 之外。
我假设全局 modifiedSince
和 .do()
块不是实现此目的的最佳方式。任何指导将不胜感激。
编辑:希望通过这个问题澄清我在寻找什么。
这个怎么样:
var interval = 1000;
function fetchItems() {
return items;
}
var data$ = Rx.Observable.interval(interval)
.map(function() { return fetchItems(); })
.filter(function(x) {return x.lastModified > Date.now() - interval}
.skip(1)
.startWith(fetchItems());
那应该只过滤新项目的来源,再加上完整的 collection。只需编写适合您的数据源的过滤函数即可。
或者通过将参数传递给 fetchItems:
var interval = 1000;
function fetchItems(modifiedSince) {
var retVal = modifiedSince ? items.filter( function(x) {return x.lastModified > modifiedSince}) : items
return retVal;
}
var data$ = Rx.Observable.interval(interval)
.map(function() { return fetchItems(Date.now() - interval); })
.skip(1)
.startWith(fetchItems());
你的意思好像是modifiedSince
是你携带状态的一部分,所以它应该出现在scan
中。为什么 don-t 将 do
中的操作也移到扫描中?。你的种子将是 {modifiedSince: null, itemArray: []}
.
呃,我只是觉得这可能行不通,因为你需要将 modifiedSince
反馈给上游的 fetchItem
函数。你这里没有自行车吗?这意味着你将不得不使用一个主题来打破这个循环。或者,您可以尝试将 modifiedSince
封装在闭包中。像
function pollItems (fetchItems, polling_frequency) {
var modifiedSince = null;
var data$ = Rx.Observable.interval(polling_frequency)
.startWith('run right away')
.flatMap(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
return fetchItems(modifiedSince);
})
.do(function(item) {
if(item.updatedAt > modifiedSince) {
modifiedSince = item.updatedAt;
}
})
.scan(function(previous, current) {
previous.push(current);
return previous;
}, []);
return data$;
}
我要运行出去过年了,如果不行,我可以稍后再试一次(也许使用expand
运算符,其他版本的scan
).
这是另一个不使用闭包或 'external state' 的解决方案。
我做了以下假设:
fetchItems
returnsRx.Observable
个项目,即不是项目数组
它使用 expand
运算符,允许发出遵循类型 x_n+1 = f(x_n)
的递归关系的值。您通过返回一个发出该值的可观察对象来传递 x_n+1
,例如 Rx.Observable.return(x_n+1)
,您可以通过返回 Rx.Observable.empty()
来完成递归。这里似乎没有结束条件,所以这将 运行 永远。
scan
还允许按照递归关系 (x_n+1 = f(x_n, y_n)
) 发出值。不同之处在于 scan
强制您使用同步函数(因此 x_n+1
与 y_n
同步),而对于 expand
您可以使用以下形式的异步函数一个可观察的。
代码没有经过测试,所以不管是否有效,请及时通知我。
相关文档:expand, combineLatest
var modifiedSinceInitValue = // put your date here
var polling_frequency = // put your value here
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []}
function max(property) {
return function (acc, current) {
acc = current[property] > acc ? current[property] : acc;
}
}
var data$ = Rx.Observable.return(initial_state)
.expand (function(state){
return fetchItem(state.modifiedSince)
.toArray()
.combineLatest(Rx.Observable.interval(polling_frequency).take(1),
function (itemArray, _) {
return {
modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue),
itemArray : itemArray
}
}
})