RxNet 分页
RxNet Pagination
更新:签名和类型定义
GetChecklistStringParents
IObservable<ChecklistStringParents_Response> GetChecklistStringParents(int company_id, string auth_token, string q = null, int page = 1)
代表页面的顶级响应
/// <summary>
/// Top-level response representing one page of string parents.
/// </summary>
public class ChecklistStringParents_Response
{
// Content of the page: the list of string parents it carries.
public List<Resp_ChecklistStringParent> BODY { get; set; }
// Metadata attached to the response. Resp_Meta is declared elsewhere;
// presumably paging information — TODO confirm against its definition.
public Resp_Meta META { get; set; }
// Presumably errors reported alongside the response; elements are untyped
// (object) here, so their shape must be verified against the API.
public List<object> ERRORS { get; set; }
}
单个字符串 parent 的响应 class:
/// <summary>
/// Response shape for a single string parent.
/// </summary>
public class Resp_ChecklistStringParent
{
// Numeric type code of the string; the meaning of each value is not visible here.
public int checklist_string_type { get; set; }
// The three date members below are carried as strings; their format is not
// shown in this file — NOTE(review): confirm before parsing.
public string client_date_created { get; set; }
public string uploaded_date { get; set; }
public string last_modified { get; set; }
// The string content of this parent.
public string value { get; set; }
// Identifier of this string parent.
public int id { get; set; }
}
我正在尝试响应式 (RxNet) 访问我构建的 REST API,但是我对范例很陌生,有点卡住了。
以下函数逐页从端点请求数据。
使用 Observable.DoWhile
和 Observable.Defer
以便为每个页面创建一个新的可观察对象,并且我们不断创建它们直到我们收到一个空列表
作为页面响应的主体。
我订阅了 Observable.DoWhile 返回的可观察对象,以便更新页码和结果计数。这感觉不对,但我还没有看到替代方案。
我的问题是,这是在 RxNet 中对结果进行分页的最佳方式吗?此外,我真的很想获得一个结果流,即把每个页面的内容展平为单个可观察对象,并从这个函数中返回它,但我不知道如何实现这一点。
/// <summary>
/// Requests the string parents associated with a checklist from the endpoint,
/// page by page, until a page comes back with an empty body.
/// </summary>
/// <param name="checklist_id">Checklist whose string parents are requested.</param>
/// <remarks>
/// The subscription is not retained, so the paging sequence cannot be cancelled
/// by the caller once started. The company id is hard-coded to 6.
/// </remarks>
private void FetchStringParents(int checklist_id)
{
    /*Setup the filter to the endpoint such that string parents returned are those associated with the passed checklist*/
    Filter base_filter = new Filter("checklist_id", "eq", checklist_id.ToString());
    NestedFilter nested_filter = new NestedFilter("checklists", "any", base_filter);
    Filters filt = new Filters();
    filt.filters.Add(nested_filter);
    string sp_query_json = JsonConvert.SerializeObject(filt);

    int result_count = 0;
    int page = 1;

    Observable.DoWhile(
        //At least once create an observable on the api endpoint
        Observable.Defer(() =>
        {
            // Reset before every request. If a request completes without
            // emitting a page, a stale count from the previous page would
            // otherwise keep the loop re-requesting the same page forever.
            result_count = 0;
            return NetworkService_ChecklistStringParentService.GetInstance.GetChecklistStringParents(6,
                this.cached_auth.Auth_Token, sp_query_json, page);
        }),
        /*boolean function which determines if we should hit the api again (if we got back a non empty result the last time)*/
        () => result_count > 0)
    .Subscribe(
        st =>
        {
            //on successful receipt of a page
            Debug.WriteLine("Success");
            page++;//update page so that the next observable created is created on page++
            // A missing body is treated as an empty page. Dereferencing it
            // would throw inside the onNext handler, where the exception is
            // not routed to the error callback below.
            result_count = (st != null && st.BODY != null) ? st.BODY.Count : 0;
        },
        _e =>
        {
            Debug.WriteLine("Fail");
            // Keep the failure details instead of discarding the exception.
            Debug.WriteLine(_e);
        });
}
更新:解决方案
/// <summary>
/// Streams every string parent associated with a checklist as one flat
/// observable, fetching the underlying pages one after another.
/// </summary>
/// <param name="checklist_id">Checklist whose string parents are streamed.</param>
/// <returns>
/// An observable of individual string parents. It completes after the first
/// page that holds fewer items than the expected page size.
/// </returns>
private IObservable<Resp_ChecklistStringParent> StreamParents(int checklist_id)
{
    // Build the endpoint query: parents whose checklists include checklist_id.
    Filter checklistFilter = new Filter("checklist_id", "eq", checklist_id.ToString());
    NestedFilter anyChecklist = new NestedFilter("checklists", "any", checklistFilter);
    Filters query = new Filters();
    query.filters.Add(anyChecklist);
    string queryJson = JsonConvert.SerializeObject(query);

    IObservable<List<Resp_ChecklistStringParent>> pages =
        Observable.Create<List<Resp_ChecklistStringParent>>(async (observer, token) =>
        {
            //for testing page size is set to 1 on server
            const int expectedPageSize = 1;
            int nextPage = 1;
            bool lastPageSeen = false;

            while (!lastPageSeen && !token.IsCancellationRequested)
            {
                //Pass in cancellation token here?
                var response = await NetworkService_ChecklistStringParentService.GetInstance.GetChecklistStringParents(6,
                    this.cached_auth.Auth_Token, queryJson, nextPage);
                nextPage++;

                observer.OnNext(response.BODY);

                // A short page means the server has nothing further to send.
                lastPageSeen = response.BODY.Count < expectedPageSize;
                if (lastPageSeen)
                {
                    observer.OnCompleted();
                }
            }
        });

    // Flatten the sequence of pages into a sequence of items.
    return pages.SelectMany(items => items);
}
这是一个示例,展示了如何使用 IEnumerable<T>
和 IObservable<T>
流式传输数据。
IEnumerable<T>
示例用于上下文和比较。
我没有使用你的数据类型,因为我认为它与问题无关(希望没问题)。
Linqpad 脚本
// LINQPad entry point: prints every item via the synchronous enumerable first,
// then subscribes to the observable version and prints the same items again.
void Main()
{
    // Pull-based: pages are fetched lazily as the loop advances.
    foreach (string item in GetItemsSync())
    {
        Console.WriteLine(item);
    }
    Console.WriteLine("Received all synchronous items");

    // Push-based: items arrive through the subscription callbacks.
    IObservable<string> stream = StreamItems();
    stream.Subscribe(
        item => Console.WriteLine(item),
        () => Console.WriteLine("Received all asynchronous items"));
}
// Define other methods and classes here
/// <summary>
/// Streams all items as one flat observable by requesting pages of three
/// items from <c>GetPageAsync</c> until a short page is received.
/// </summary>
/// <returns>An observable of the individual items, in page order.</returns>
public IObservable<string> StreamItems()
{
    const int itemsPerPage = 3;

    IObservable<string[]> pages = Observable.Create<string[]>(async (observer, token) =>
    {
        int nextPage = 0;
        bool lastPageSeen = false;

        while (!lastPageSeen && !token.IsCancellationRequested)
        {
            //Pass in cancellation token here?
            string[] items = await GetPageAsync(nextPage, itemsPerPage);
            nextPage++;

            observer.OnNext(items);

            // Fewer items than requested marks the final page.
            lastPageSeen = items.Length < itemsPerPage;
            if (lastPageSeen)
            {
                observer.OnCompleted();
            }
        }
    });

    // Flatten pages into single items.
    return pages.SelectMany(items => items);
}
/// <summary>
/// Lazily enumerates every item by walking the pages from
/// <c>GetPagesSync</c> and yielding each page's elements in order.
/// </summary>
public IEnumerable<string> GetItemsSync()
{
    foreach (string[] page in GetPagesSync())
    {
        foreach (string item in page)
        {
            yield return item;
        }
    }
}
/// <summary>
/// Lazily yields pages of three items from <c>GetPageSync</c>, stopping after
/// the first page that holds fewer items than requested.
/// </summary>
public IEnumerable<string[]> GetPagesSync()
{
    const int itemsPerPage = 3;
    int pageIndex = 0;
    string[] current;

    do
    {
        current = GetPageSync(pageIndex, itemsPerPage);
        pageIndex++;
        yield return current;
    }
    while (current.Length >= itemsPerPage); // a short page is the last one
}
// Fixed in-memory data set that GetPageSync slices into pages.
private static string[] _fakeData = new string[]{
    "one",
    "two",
    "three",
    "four",
    "Five",
    "six",
    "Se7en",
    "Eight"
};

/// <summary>
/// Returns the slice of <c>_fakeData</c> that makes up the requested page.
/// </summary>
/// <param name="pageIndex">Zero-based index of the page.</param>
/// <param name="pageSize">Maximum number of items in a page.</param>
/// <returns>
/// A new array with up to <paramref name="pageSize"/> items; shorter for the
/// final page and empty when the page starts at or past the end of the data.
/// </returns>
public string[] GetPageSync(int pageIndex, int pageSize)
{
    var idx = pageSize * pageIndex;

    // A page that starts at or beyond the end has no items. Without this
    // guard the computed length goes negative for pages past the end and the
    // array allocation throws instead of reporting an empty page.
    if (idx >= _fakeData.Length)
    {
        return new string[0];
    }

    var bufferSize = Math.Min(pageSize, _fakeData.Length - idx);
    var buffer = new string[bufferSize];
    Array.Copy(_fakeData, idx, buffer, 0, bufferSize);
    return buffer;
}
/// <summary>
/// Task-returning wrapper around <c>GetPageSync</c>; the page is computed
/// synchronously and handed back as an already-completed task.
/// </summary>
public Task<string[]> GetPageAsync(int pageIndex, int pageSize)
{
    //Just to emulate an Async method (like a web request).
    string[] page = GetPageSync(pageIndex, pageSize);
    return Task.FromResult(page);
}
更新:签名和类型定义
GetChecklistStringParents
IObservable<ChecklistStringParents_Response> GetChecklistStringParents(int company_id, string auth_token, string q = null, int page = 1)
代表页面的顶级响应
/// <summary>
/// Top-level response representing one page of string parents.
/// </summary>
public class ChecklistStringParents_Response
{
// Content of the page: the list of string parents it carries.
public List<Resp_ChecklistStringParent> BODY { get; set; }
// Metadata attached to the response. Resp_Meta is declared elsewhere;
// presumably paging information — TODO confirm against its definition.
public Resp_Meta META { get; set; }
// Presumably errors reported alongside the response; elements are untyped
// (object) here, so their shape must be verified against the API.
public List<object> ERRORS { get; set; }
}
单个字符串 parent 的响应 class:
public class Resp_ChecklistStringParent
{
public int checklist_string_type { get; set; }
public string client_date_created { get; set; }
public string uploaded_date { get; set; }
public string last_modified { get; set; }
public string value { get; set; }
public int id { get; set; }
}
我正在尝试响应式 (RxNet) 访问我构建的 REST API,但是我对范例很陌生,有点卡住了。
以下函数逐页从端点请求数据。
使用Observable.DoWhile
和 Observable.Defer
以便为每个页面创建一个新的可观察对象,并且我们不断创建它们直到我们收到一个空列表
作为页面响应的主体。
我订阅了 Observable.DoWhile 返回的可观察对象,以便更新页码和结果计数。这感觉不对,但我还没有看到替代方案。
我的问题是,这是在 RxNet 中对结果进行分页的最佳方式吗?此外,我真的很想获得一个结果流,即把每个页面的内容展平为单个可观察对象,并从这个函数中返回它,但我不知道如何实现这一点。
/// <summary>
/// Requests the string parents associated with a checklist from the endpoint,
/// page by page, until a page comes back with an empty body.
/// </summary>
/// <param name="checklist_id">Checklist whose string parents are requested.</param>
/// <remarks>
/// The subscription is not retained, so the paging sequence cannot be cancelled
/// by the caller once started. The company id is hard-coded to 6.
/// </remarks>
private void FetchStringParents(int checklist_id)
{
    /*Setup the filter to the endpoint such that string parents returned are those associated with the passed checklist*/
    Filter base_filter = new Filter("checklist_id", "eq", checklist_id.ToString());
    NestedFilter nested_filter = new NestedFilter("checklists", "any", base_filter);
    Filters filt = new Filters();
    filt.filters.Add(nested_filter);
    string sp_query_json = JsonConvert.SerializeObject(filt);

    int result_count = 0;
    int page = 1;

    Observable.DoWhile(
        //At least once create an observable on the api endpoint
        Observable.Defer(() =>
        {
            // Reset before every request. If a request completes without
            // emitting a page, a stale count from the previous page would
            // otherwise keep the loop re-requesting the same page forever.
            result_count = 0;
            return NetworkService_ChecklistStringParentService.GetInstance.GetChecklistStringParents(6,
                this.cached_auth.Auth_Token, sp_query_json, page);
        }),
        /*boolean function which determines if we should hit the api again (if we got back a non empty result the last time)*/
        () => result_count > 0)
    .Subscribe(
        st =>
        {
            //on successful receipt of a page
            Debug.WriteLine("Success");
            page++;//update page so that the next observable created is created on page++
            // A missing body is treated as an empty page. Dereferencing it
            // would throw inside the onNext handler, where the exception is
            // not routed to the error callback below.
            result_count = (st != null && st.BODY != null) ? st.BODY.Count : 0;
        },
        _e =>
        {
            Debug.WriteLine("Fail");
            // Keep the failure details instead of discarding the exception.
            Debug.WriteLine(_e);
        });
}
更新:解决方案
/// <summary>
/// Streams every string parent associated with a checklist as one flat
/// observable, fetching the underlying pages one after another.
/// </summary>
/// <param name="checklist_id">Checklist whose string parents are streamed.</param>
/// <returns>
/// An observable of individual string parents. It completes after the first
/// page that holds fewer items than the expected page size.
/// </returns>
private IObservable<Resp_ChecklistStringParent> StreamParents(int checklist_id)
{
    // Build the endpoint query: parents whose checklists include checklist_id.
    Filter checklistFilter = new Filter("checklist_id", "eq", checklist_id.ToString());
    NestedFilter anyChecklist = new NestedFilter("checklists", "any", checklistFilter);
    Filters query = new Filters();
    query.filters.Add(anyChecklist);
    string queryJson = JsonConvert.SerializeObject(query);

    IObservable<List<Resp_ChecklistStringParent>> pages =
        Observable.Create<List<Resp_ChecklistStringParent>>(async (observer, token) =>
        {
            //for testing page size is set to 1 on server
            const int expectedPageSize = 1;
            int nextPage = 1;
            bool lastPageSeen = false;

            while (!lastPageSeen && !token.IsCancellationRequested)
            {
                //Pass in cancellation token here?
                var response = await NetworkService_ChecklistStringParentService.GetInstance.GetChecklistStringParents(6,
                    this.cached_auth.Auth_Token, queryJson, nextPage);
                nextPage++;

                observer.OnNext(response.BODY);

                // A short page means the server has nothing further to send.
                lastPageSeen = response.BODY.Count < expectedPageSize;
                if (lastPageSeen)
                {
                    observer.OnCompleted();
                }
            }
        });

    // Flatten the sequence of pages into a sequence of items.
    return pages.SelectMany(items => items);
}
这是一个示例,展示了如何使用 IEnumerable<T>
和 IObservable<T>
流式传输数据。
IEnumerable<T>
示例用于上下文和比较。
我没有使用你的数据类型,因为我认为它与问题无关(希望没问题)。
Linqpad 脚本
// LINQPad entry point: prints every item via the synchronous enumerable first,
// then subscribes to the observable version and prints the same items again.
void Main()
{
    // Pull-based: pages are fetched lazily as the loop advances.
    foreach (string item in GetItemsSync())
    {
        Console.WriteLine(item);
    }
    Console.WriteLine("Received all synchronous items");

    // Push-based: items arrive through the subscription callbacks.
    IObservable<string> stream = StreamItems();
    stream.Subscribe(
        item => Console.WriteLine(item),
        () => Console.WriteLine("Received all asynchronous items"));
}
// Define other methods and classes here
/// <summary>
/// Streams all items as one flat observable by requesting pages of three
/// items from <c>GetPageAsync</c> until a short page is received.
/// </summary>
/// <returns>An observable of the individual items, in page order.</returns>
public IObservable<string> StreamItems()
{
    const int itemsPerPage = 3;

    IObservable<string[]> pages = Observable.Create<string[]>(async (observer, token) =>
    {
        int nextPage = 0;
        bool lastPageSeen = false;

        while (!lastPageSeen && !token.IsCancellationRequested)
        {
            //Pass in cancellation token here?
            string[] items = await GetPageAsync(nextPage, itemsPerPage);
            nextPage++;

            observer.OnNext(items);

            // Fewer items than requested marks the final page.
            lastPageSeen = items.Length < itemsPerPage;
            if (lastPageSeen)
            {
                observer.OnCompleted();
            }
        }
    });

    // Flatten pages into single items.
    return pages.SelectMany(items => items);
}
/// <summary>
/// Lazily enumerates every item by walking the pages from
/// <c>GetPagesSync</c> and yielding each page's elements in order.
/// </summary>
public IEnumerable<string> GetItemsSync()
{
    foreach (string[] page in GetPagesSync())
    {
        foreach (string item in page)
        {
            yield return item;
        }
    }
}
/// <summary>
/// Lazily yields pages of three items from <c>GetPageSync</c>, stopping after
/// the first page that holds fewer items than requested.
/// </summary>
public IEnumerable<string[]> GetPagesSync()
{
    const int itemsPerPage = 3;
    int pageIndex = 0;
    string[] current;

    do
    {
        current = GetPageSync(pageIndex, itemsPerPage);
        pageIndex++;
        yield return current;
    }
    while (current.Length >= itemsPerPage); // a short page is the last one
}
// Fixed in-memory data set that GetPageSync slices into pages.
private static string[] _fakeData = new string[]{
    "one",
    "two",
    "three",
    "four",
    "Five",
    "six",
    "Se7en",
    "Eight"
};

/// <summary>
/// Returns the slice of <c>_fakeData</c> that makes up the requested page.
/// </summary>
/// <param name="pageIndex">Zero-based index of the page.</param>
/// <param name="pageSize">Maximum number of items in a page.</param>
/// <returns>
/// A new array with up to <paramref name="pageSize"/> items; shorter for the
/// final page and empty when the page starts at or past the end of the data.
/// </returns>
public string[] GetPageSync(int pageIndex, int pageSize)
{
    var idx = pageSize * pageIndex;

    // A page that starts at or beyond the end has no items. Without this
    // guard the computed length goes negative for pages past the end and the
    // array allocation throws instead of reporting an empty page.
    if (idx >= _fakeData.Length)
    {
        return new string[0];
    }

    var bufferSize = Math.Min(pageSize, _fakeData.Length - idx);
    var buffer = new string[bufferSize];
    Array.Copy(_fakeData, idx, buffer, 0, bufferSize);
    return buffer;
}
/// <summary>
/// Task-returning wrapper around <c>GetPageSync</c>; the page is computed
/// synchronously and handed back as an already-completed task.
/// </summary>
public Task<string[]> GetPageAsync(int pageIndex, int pageSize)
{
    //Just to emulate an Async method (like a web request).
    string[] page = GetPageSync(pageIndex, pageSize);
    return Task.FromResult(page);
}