Rx DotNet - 使用线程链接可观察对象
Rx DotNet - Chaining observables with threading
我正在尝试在我的 Xamarin 移动应用程序中引入 Rx,我想在应用程序启动时的登录阶段链接一系列调用。
TL;DR; How do I run 2 / 3 observables one after the other to retrieve data and setup threading properly.
更详细的我执行以下操作:
- 登录用户并检索相关数据
LoginUser() : IObservable<User>
- 根据用户输入的 ID 为
RetrieveRemoteA() : IObservable<A>
或 RetrieveRemoteB() : IObservable<B>
- 一旦数据可用(
A
或 B
类型),我就会更新 UI。
这是一种描述流程的图表(上面已解释):
考虑到我想避免从前一个 Subscribe()
中调用新的可观察对象(源),这就是我检索数据并更新 UI (图像中的流程已在下面的代码中序列化。
IObservable<User> loginUserObservable = LogInUser(currentUser);
loginUserObservable
.SubscribeOn(ThreadPoolScheduler.Instance)
.SelectMany(
(user) =>
{
if (user.Type == UserType.A)
return RetrieveRemoteA(user.UserId); // outputs IObservable<A>
return Observable.Return(new A());
},
(user, a) =>
{
B b = null;
return new { user, a, b }; // Create anonymous type to keet track of 'user'
})
.SelectMany(
(xType) =>
{
if (xType.user.Type == UserType.B)
return RetrieveRemoteB(xType.user.UserId); // outputs IObservable<B>
return Observable.Return(new B());
},
(xType, bData) =>
{
var user = xType.user;
var a = xType.a;
var b = b;
return new { user, a, b };
})
.ObserveOn(ImmediateScheduler.Instance)
.Select((xType) =>
{
if (xType.user.Type == UserType.A)
{
A a = xType.a;
B b = null;
return new { a, b };
}
else {
A a = null;
B b = xType.b;
return new { a, b };
}
})
.Subscribe((result) =>
{
if (result.a != null)
{
Console.WriteLine($"ID: {result.a.Id}");
}
else {
Console.WriteLine($"ID: {result.b.Id}");
}
});
一旦 运行,流程似乎在 RetrieveRemoteA(user.UserId)
内堆叠,即使方法完成时没有错误。
public IObservable<A> RetrieveRemoteA(string userId)
{
return Observable.FromAsync<A>(async () =>
{
A a = await CustomAPI(userId)
return a;
}
}
当我通过调用前一个 Subscribe 中的每个新 observable 顺序实现流程时,它工作正常(但这不是正确的方法)。
我认为这是 Threads 的问题,或者是我错误的 Rx 实现。
请问有什么线索吗?
我认为您可以将其折叠成如下所示。 Reactive.Extensions 可以将异步方法视为单个 return 可观察对象,因此无需使用 RetrieveRemoteA
方法包装异步调用。我们可以在 SelectMany
中利用该行为,让它调用 API 并检索您的用户信息并等待响应,然后再通知链的更下方。
对于线程,我建议您查看添加 ReactiveUI into your app and using their schedulers。例如,ImmediateScheduler.Instance
不会保证您在订阅中执行的代码会在 UI 线程上得到通知。
IObservable<User> loginUserObservable = LogInUser(currentUser);
loginUserObservable
.SubscribeOn(ThreadPoolScheduler.Instance)
.SelectMany(
async user =>
{
A userA = null;
if (user.Type == UserType.A)
userA = await CustomAPI(user.UserId).ConfigureAwait(false);
B userB = null;
if (user.Type == UserType.B)
userB = await CustomAPI(user.UserId).ConfigureAwait(false);
return new { UserA = userA, UserB = userB }
})
.ObserveOn(ImmediateScheduler.Instance)
.Subscribe((result) =>
{
if (result.a != null)
{
Console.WriteLine($"ID: {result.UserA.Id}");
}
else {
Console.WriteLine($"ID: {result.UserB.Id}");
}
});
使用 Observable.Amb
运算符可以很容易地完成您想要做的事情。它的工作是允许两个 observables 运行,但是一旦一个 returns 一个值然后忽略另一个。
查询的编写方法如下:
var query =
from user in loginUserObservable
let a_observable = user.Type == UserType.A ? RetrieveRemoteA(user.UserId) : Observable.Never<A>()
let b_observable = user.Type == UserType.B ? RetrieveRemoteB(user.UserId) : Observable.Never<B>()
from ab in
Observable
.Amb(
a_observable.Select(x => new { a = x, b = (B)null }),
b_observable.Select(x => new { a = (A)null, b = x }))
select ab;
就是这样。
根据这个示例数据,您可以玩一玩:
var currentUser = "";
IObservable<User> LogInUser(string cu) => Observable.Start(() => new User() { Type = UserType.A, UserId = "Z1" });
IObservable<A> RetrieveRemoteA(string id) => Observable.Start(() => new A() { Id = "A.Z2" });
IObservable<B> RetrieveRemoteB(string id) => Observable.Start(() => new B() { Id = "B.Z3" });
IObservable<User> loginUserObservable = LogInUser(currentUser);
/* put `query` here */
query
.Subscribe(x =>
Console.WriteLine($@"ID: {(x.a != null ? x.a.Id : x.b.Id)}"));
或者,您可以通过执行以下简单查询完全避免 Amb
运算符:
var query =
from user in loginUserObservable
from ab in
user.Type == UserType.A
? RetrieveRemoteA(user.UserId).Select(x => new { a = x, b = (B)null })
: RetrieveRemoteB(user.UserId).Select(x => new { a = (A)null, b = x })
select ab;
我正在尝试在我的 Xamarin 移动应用程序中引入 Rx,我想在应用程序启动时的登录阶段链接一系列调用。
TL;DR; How do I run 2 / 3 observables one after the other to retrieve data and setup threading properly.
更详细的我执行以下操作:
- 登录用户并检索相关数据
LoginUser() : IObservable<User>
- 根据用户输入的 ID 为
RetrieveRemoteA() : IObservable<A>
或RetrieveRemoteB() : IObservable<B>
- 一旦数据可用(
A
或B
类型),我就会更新 UI。
这是一种描述流程的图表(上面已解释):
考虑到我想避免从前一个 Subscribe()
中调用新的可观察对象(源),这就是我检索数据并更新 UI (图像中的流程已在下面的代码中序列化。
IObservable<User> loginUserObservable = LogInUser(currentUser);
loginUserObservable
.SubscribeOn(ThreadPoolScheduler.Instance)
.SelectMany(
(user) =>
{
if (user.Type == UserType.A)
return RetrieveRemoteA(user.UserId); // outputs IObservable<A>
return Observable.Return(new A());
},
(user, a) =>
{
B b = null;
return new { user, a, b }; // Create anonymous type to keet track of 'user'
})
.SelectMany(
(xType) =>
{
if (xType.user.Type == UserType.B)
return RetrieveRemoteB(xType.user.UserId); // outputs IObservable<B>
return Observable.Return(new B());
},
(xType, bData) =>
{
var user = xType.user;
var a = xType.a;
var b = b;
return new { user, a, b };
})
.ObserveOn(ImmediateScheduler.Instance)
.Select((xType) =>
{
if (xType.user.Type == UserType.A)
{
A a = xType.a;
B b = null;
return new { a, b };
}
else {
A a = null;
B b = xType.b;
return new { a, b };
}
})
.Subscribe((result) =>
{
if (result.a != null)
{
Console.WriteLine($"ID: {result.a.Id}");
}
else {
Console.WriteLine($"ID: {result.b.Id}");
}
});
一旦 运行,流程似乎在 RetrieveRemoteA(user.UserId)
内堆叠,即使方法完成时没有错误。
public IObservable<A> RetrieveRemoteA(string userId)
{
return Observable.FromAsync<A>(async () =>
{
A a = await CustomAPI(userId)
return a;
}
}
当我通过调用前一个 Subscribe 中的每个新 observable 顺序实现流程时,它工作正常(但这不是正确的方法)。
我认为这是 Threads 的问题,或者是我错误的 Rx 实现。
请问有什么线索吗?
我认为您可以将其折叠成如下所示。 Reactive.Extensions 可以将异步方法视为单个 return 可观察对象,因此无需使用 RetrieveRemoteA
方法包装异步调用。我们可以在 SelectMany
中利用该行为,让它调用 API 并检索您的用户信息并等待响应,然后再通知链的更下方。
对于线程,我建议您查看添加 ReactiveUI into your app and using their schedulers。例如,ImmediateScheduler.Instance
不会保证您在订阅中执行的代码会在 UI 线程上得到通知。
IObservable<User> loginUserObservable = LogInUser(currentUser);
loginUserObservable
.SubscribeOn(ThreadPoolScheduler.Instance)
.SelectMany(
async user =>
{
A userA = null;
if (user.Type == UserType.A)
userA = await CustomAPI(user.UserId).ConfigureAwait(false);
B userB = null;
if (user.Type == UserType.B)
userB = await CustomAPI(user.UserId).ConfigureAwait(false);
return new { UserA = userA, UserB = userB }
})
.ObserveOn(ImmediateScheduler.Instance)
.Subscribe((result) =>
{
if (result.a != null)
{
Console.WriteLine($"ID: {result.UserA.Id}");
}
else {
Console.WriteLine($"ID: {result.UserB.Id}");
}
});
使用 Observable.Amb
运算符可以很容易地完成您想要做的事情。它的工作是允许两个 observables 运行,但是一旦一个 returns 一个值然后忽略另一个。
查询的编写方法如下:
var query =
from user in loginUserObservable
let a_observable = user.Type == UserType.A ? RetrieveRemoteA(user.UserId) : Observable.Never<A>()
let b_observable = user.Type == UserType.B ? RetrieveRemoteB(user.UserId) : Observable.Never<B>()
from ab in
Observable
.Amb(
a_observable.Select(x => new { a = x, b = (B)null }),
b_observable.Select(x => new { a = (A)null, b = x }))
select ab;
就是这样。
根据这个示例数据,您可以玩一玩:
var currentUser = "";
IObservable<User> LogInUser(string cu) => Observable.Start(() => new User() { Type = UserType.A, UserId = "Z1" });
IObservable<A> RetrieveRemoteA(string id) => Observable.Start(() => new A() { Id = "A.Z2" });
IObservable<B> RetrieveRemoteB(string id) => Observable.Start(() => new B() { Id = "B.Z3" });
IObservable<User> loginUserObservable = LogInUser(currentUser);
/* put `query` here */
query
.Subscribe(x =>
Console.WriteLine($@"ID: {(x.a != null ? x.a.Id : x.b.Id)}"));
或者,您可以通过执行以下简单查询完全避免 Amb
运算符:
var query =
from user in loginUserObservable
from ab in
user.Type == UserType.A
? RetrieveRemoteA(user.UserId).Select(x => new { a = x, b = (B)null })
: RetrieveRemoteB(user.UserId).Select(x => new { a = (A)null, b = x })
select ab;