RxJS 序列等同于 promise.then()?
RxJS sequence equivalent to promise.then()?
我曾经有很多开发承诺,现在我正在转向 RxJS。 RxJS 的文档没有提供一个非常清晰的例子来说明如何从 promise 链移动到观察者序列。
例如,我通常写多步的promise链,比如
// a function that returns a promise
getPromise()
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.catch(function(err) {
// handle error
});
我应该如何用 RxJS 风格重写这个 promise 链?
对于数据流(相当于then
):
Rx.Observable.fromPromise(...)
.flatMap(function(result) {
// do something
})
.flatMap(function(result) {
// do something
})
.subscribe(function onNext(result) {
// end of chain
}, function onError(error) {
// process the error
});
可以使用 Rx.Observable.fromPromise
.
将 promise 转换为 observable
一些 promise 运算符有直接翻译。例如RSVP.all
,或者jQuery.when
可以用Rx.Observable.forkJoin
代替。
请记住,您有一堆运算符可以异步转换数据,并执行您不能或很难用 promises 完成的任务。 Rxjs 通过异步数据序列(序列即多于 1 个异步值)显示其所有功能。
对于错误管理,这个主题有点复杂。
要获得精确的语义,请深入查看可在网络上找到的文档和示例,或在此处提出具体问题。
这绝对是深入使用 Rxjs 进行错误管理的良好起点:https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html
更现代的选择:
import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';
fromPromise(...).pipe(
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
catchError(error => {
// handle error
})
)
另请注意,要使所有这些工作正常,您需要 subscribe
到此管道 Observable
某处,但我假设它是在应用程序的其他部分处理的。
如果 getPromise
函数位于流管道的中间,您应该将其简单地包装到函数 mergeMap
、switchMap
或 concatMap
之一(通常是 mergeMap
):
stream$.pipe(
mergeMap(data => getPromise(data)),
filter(...),
map(...)
).subscribe(...);
如果你想用 getPromise()
开始你的流然后将它包装到 from
函数中:
import {from} from 'rxjs';
from(getPromise()).pipe(
filter(...)
map(...)
).subscribe(...);
据我刚刚发现,如果您 return flatMap 中的结果,它会将其转换为数组,即使您 return 编辑了一个字符串。
但是如果你return一个Observable,那个observable可以return一个字符串;
如果我没理解错的话,你的意思是消费这些值,在这种情况下你使用 sbuscribe 即
const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );
此外,您可以使用 toPromise() 将 observable 转换为 promise,如下所示:
arrObservable.toPromise().then()
2019 年 5 月更新,使用 RxJs 6
同意上面提供的答案,希望使用 RxJs v6 添加一个带有一些玩具数据和简单承诺(使用 setTimeout)的具体示例以增加清晰度。
只需将传递的 ID(当前硬编码为 1
)更新为不存在的内容即可执行错误处理逻辑。重要的是,还要注意使用 of
和 catchError
消息。
import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";
const posts = [
{ title: "I love JavaScript", author: "Wes Bos", id: 1 },
{ title: "CSS!", author: "Chris Coyier", id: 2 },
{ title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];
const authors = [
{ name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
{
name: "Chris Coyier",
twitter: "@chriscoyier",
bio: "CSS Tricks and CodePen"
},
{ name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];
function getPostById(id) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const post = posts.find(post => post.id === id);
if (post) {
console.log("ok, post found!");
resolve(post);
} else {
reject(Error("Post not found!"));
}
}, 200);
});
}
function hydrateAuthor(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const authorDetails = authors.find(person => person.name === post.author);
if (authorDetails) {
post.author = authorDetails;
console.log("ok, post hydrated with author info");
resolve(post);
} else {
reject(Error("Author not Found!"));
}
}, 200);
});
}
function dehydratePostTitle(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
delete post.title;
console.log("ok, applied transformation to remove title");
resolve(post);
}, 200);
});
}
// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
flatMap(post => {
return hydrateAuthor(post);
}),
flatMap(post => {
return dehydratePostTitle(post);
}),
catchError(error => of(`Caught error: ${error}`))
);
source$.subscribe(console.log);
输出数据:
ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
{ name: 'Wes Bos',
twitter: '@wesbos',
bio: 'Canadian Developer' },
id: 1 }
关键部分,等价于以下使用普通的promise控制流:
getPostById(1)
.then(post => {
return hydrateAuthor(post);
})
.then(post => {
return dehydratePostTitle(post);
})
.then(author => {
console.log(author);
})
.catch(err => {
console.error(err);
});
我就是这样做的。
以前
public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
const request = gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(response => {
onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
});
}
// caller:
this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp;
});
之后(是?)
public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
return from(
new Promise((resolve, reject) => {
gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(result => {
resolve(result);
});
})
).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
}));
}
// caller
this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp
}), (error) => {
// handle error
});
RxJS 序列等同于 promise.then()?
例如
function getdata1 (argument) {
return this.http.get(url)
.map((res: Response) => res.json());
}
function getdata2 (argument) {
return this.http.get(url)
.map((res: Response) => res.json());
}
getdata1.subscribe((data1: any) => {
console.log("got data one. get data 2 now");
getdata2.subscribe((data2: any) => {
console.log("got data one and two here");
});
});
我曾经有很多开发承诺,现在我正在转向 RxJS。 RxJS 的文档没有提供一个非常清晰的例子来说明如何从 promise 链移动到观察者序列。
例如,我通常写多步的promise链,比如
// a function that returns a promise
getPromise()
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.catch(function(err) {
// handle error
});
我应该如何用 RxJS 风格重写这个 promise 链?
对于数据流(相当于then
):
Rx.Observable.fromPromise(...)
.flatMap(function(result) {
// do something
})
.flatMap(function(result) {
// do something
})
.subscribe(function onNext(result) {
// end of chain
}, function onError(error) {
// process the error
});
可以使用 Rx.Observable.fromPromise
.
一些 promise 运算符有直接翻译。例如RSVP.all
,或者jQuery.when
可以用Rx.Observable.forkJoin
代替。
请记住,您有一堆运算符可以异步转换数据,并执行您不能或很难用 promises 完成的任务。 Rxjs 通过异步数据序列(序列即多于 1 个异步值)显示其所有功能。
对于错误管理,这个主题有点复杂。
要获得精确的语义,请深入查看可在网络上找到的文档和示例,或在此处提出具体问题。
这绝对是深入使用 Rxjs 进行错误管理的良好起点:https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html
更现代的选择:
import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';
fromPromise(...).pipe(
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
catchError(error => {
// handle error
})
)
另请注意,要使所有这些工作正常,您需要 subscribe
到此管道 Observable
某处,但我假设它是在应用程序的其他部分处理的。
如果 getPromise
函数位于流管道的中间,您应该将其简单地包装到函数 mergeMap
、switchMap
或 concatMap
之一(通常是 mergeMap
):
stream$.pipe(
mergeMap(data => getPromise(data)),
filter(...),
map(...)
).subscribe(...);
如果你想用 getPromise()
开始你的流然后将它包装到 from
函数中:
import {from} from 'rxjs';
from(getPromise()).pipe(
filter(...)
map(...)
).subscribe(...);
据我刚刚发现,如果您 return flatMap 中的结果,它会将其转换为数组,即使您 return 编辑了一个字符串。
但是如果你return一个Observable,那个observable可以return一个字符串;
如果我没理解错的话,你的意思是消费这些值,在这种情况下你使用 sbuscribe 即
const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );
此外,您可以使用 toPromise() 将 observable 转换为 promise,如下所示:
arrObservable.toPromise().then()
2019 年 5 月更新,使用 RxJs 6
同意上面提供的答案,希望使用 RxJs v6 添加一个带有一些玩具数据和简单承诺(使用 setTimeout)的具体示例以增加清晰度。
只需将传递的 ID(当前硬编码为 1
)更新为不存在的内容即可执行错误处理逻辑。重要的是,还要注意使用 of
和 catchError
消息。
import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";
const posts = [
{ title: "I love JavaScript", author: "Wes Bos", id: 1 },
{ title: "CSS!", author: "Chris Coyier", id: 2 },
{ title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];
const authors = [
{ name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
{
name: "Chris Coyier",
twitter: "@chriscoyier",
bio: "CSS Tricks and CodePen"
},
{ name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];
function getPostById(id) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const post = posts.find(post => post.id === id);
if (post) {
console.log("ok, post found!");
resolve(post);
} else {
reject(Error("Post not found!"));
}
}, 200);
});
}
function hydrateAuthor(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const authorDetails = authors.find(person => person.name === post.author);
if (authorDetails) {
post.author = authorDetails;
console.log("ok, post hydrated with author info");
resolve(post);
} else {
reject(Error("Author not Found!"));
}
}, 200);
});
}
function dehydratePostTitle(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
delete post.title;
console.log("ok, applied transformation to remove title");
resolve(post);
}, 200);
});
}
// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
flatMap(post => {
return hydrateAuthor(post);
}),
flatMap(post => {
return dehydratePostTitle(post);
}),
catchError(error => of(`Caught error: ${error}`))
);
source$.subscribe(console.log);
输出数据:
ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
{ name: 'Wes Bos',
twitter: '@wesbos',
bio: 'Canadian Developer' },
id: 1 }
关键部分,等价于以下使用普通的promise控制流:
getPostById(1)
.then(post => {
return hydrateAuthor(post);
})
.then(post => {
return dehydratePostTitle(post);
})
.then(author => {
console.log(author);
})
.catch(err => {
console.error(err);
});
我就是这样做的。
以前
public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
const request = gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(response => {
onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
});
}
// caller:
this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp;
});
之后(是?)
public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
return from(
new Promise((resolve, reject) => {
gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(result => {
resolve(result);
});
})
).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
}));
}
// caller
this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp
}), (error) => {
// handle error
});
RxJS 序列等同于 promise.then()?
例如
function getdata1 (argument) {
return this.http.get(url)
.map((res: Response) => res.json());
}
function getdata2 (argument) {
return this.http.get(url)
.map((res: Response) => res.json());
}
getdata1.subscribe((data1: any) => {
console.log("got data one. get data 2 now");
getdata2.subscribe((data2: any) => {
console.log("got data one and two here");
});
});