数组 returns 相同数组上的 Observable concat 方法
Observable concat method on array returns the same array
在我的服务中,我尝试解决下一个问题。我有一个 json 文件,其中包含其他 json 个文件(卡片)的名称:
{
"filename1" : ... ,
"filename2" : ... ,
...
"filenameN" : ...
}
我需要加载所有文件。 "filenameX" 个文件有一些卡片数据:
{
dataX
}
我需要合并对象中加载的数据:
{
"filename1" : { data1 },
"filename2" : { data2 },
...
"filenameN" : { dataN }
}
我为任何文件加载创建观察者,并尝试将它们组合成一个高级的单一观察者,当所有相应的观察者都被解析时。这是我的代码:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/concat";
import "rxjs/add/operator/map";
...
_loadCards(dir, cardsListFile) {
var http = ...
var url = ...
return Observable.create(function (observer) {
http.get(url + dir + "/" + cardsListFile + ".json").map(res => { return res.json(); }).subscribe(list => {
let data = {};
let observers = [];
for(var card in list) {
if(list.hasOwnProperty(card)) {
let obs = http.get(url + dir + "/cards/" + card + ".json").map(res => { return res.json(); });
observers.push(obs);
let getData = function(card) {
obs.subscribe(cardData => {
data[card] = cardData;
});
};
getData(card);
}
}
let concatResult = Observable.concat(observers);
console.log(concatResult);
concatResult.subscribe(result => {
observer.onNext(data);
observer.onCompleted();
});
});
});
};
但是,Observer 的 concat 运算符并不像描述的那样工作 - 它 returns 与其输入相同的 Observers 数组。问题出在哪里,我可以使用哪些其他运算符来使我的解决方案更直接(因为现在它确实很丑陋)?
规则 #1:从不 在其他订阅中使用自定义订阅 - 可能只有 0.1% 的情况确实需要这样做。
我已经对您的流进行了一些重写:
_loadCards(dir, cardsListFile) {
var http = ...
var url = ...
return http.get(url + dir + "/" + cardsListFile + ".json")
.map(res => res.json())
.switchMap(fileMap => Rx.Observable.from(Object.keys(fileMap)))
.concatMap(card => http.get(url + dir + "/cards/" + card + ".json")
.map(res => ({[card]: res.json()})))
.map(arrayOfCardResultObjects => Object.assign.apply({}, arrayOfCardResultObjects));
};
此调整后的版本 运行 作为片段发布:
getMockedCardMap()
.switchMap(fileMap => Rx.Observable.from(Object.keys(fileMap)))
.concatMap(card => getMockedCardFromFile(card).map(res => ({[card]: res})))
.toArray()
.map(arrayOfCardResultObjects => Object.assign.apply({}, arrayOfCardResultObjects))
.subscribe(console.log);
// Mocked helper-methods
function getMockedCardMap() {
return Rx.Observable.of({
"file1": "foo",
"file2": "bar",
"file3": "baz"
});
}
function getMockedCardFromFile(file) {
return Rx.Observable.of("Result from: " + file);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
我不完全确定我明白你想做什么,但 concat()
运算符将其参数解压缩而不是作为数组。
这意味着 Observable.concat(observables)
将重新发送 observables
数组中的 Observables。
你想要的是传递解压缩的数组Observable.concat(...observables)
,这与:
Observable.concat(observable1, observable2, observable3 ,...)
我认为这模拟了您正在尝试做的事情。它使用 forkJoin
而不是 concat
所以它会等到所有 Observables 完成。
var input = {
"filename1": 'data1',
"filename2": 'data2',
"filenameN": 'dataN'
};
let keys = Object.keys(input);
let observables = keys.map(key => {
return Observable.of(input[key]).map(s => s.split("").reverse().join(""));
});
Observable.forkJoin(observables, (...results) => {
let combined = {};
keys.forEach((key, i) => {
combined[key] = results[i];
});
return combined;
})
.subscribe(val => console.log(val));
这将打印到控制台:
{ filename1: '1atad', filename2: '2atad', filenameN: 'Natad' }
JSBIN 与解决方案:http://jsbin.com/dahaqif/edit?js,console
我假设您有两个可观察对象,一个用于获取文件名列表,另一个用于获取特定文件名的数据:
const filenamesObs = Observable.of({
"filename1" : null,
"filename2" : null,
"filename3" : null
});
// This is actually a function that returns an observable.
const filedataObs = (filename) => {
return Observable.of(`This is the data for ${filename}`);
}
现在,您可以通过以下方式组合这两个可观察量以获得您描述的数据结构:
const source = filenamesObs
// Extract the list of filenames as an array.
.map(obj => Object.keys(obj))
// Flatten the array, i.e. emit each filename individually
// vs a SINGLE array containing ALL filenames.
.mergeMap(val => val)
// Get the data for each filename.
.mergeMap(filename =>
filedataObs(filename).map(data => Object.assign({}, { filename: filename, data: data }))
)
// At this point, you have a stream of { filename, data } objects.
// Reduce everything back to a single object.
.reduce((acc, curr) => {
acc[curr.filename] = curr.data;
return acc;
}, {});
如果您订阅并将结果记录到控制台,您将看到:
[Object] {
filename1: "This is the data for filename1",
filename2: "This is the data for filename2",
filename3: "This is the data for filename3"
}
Arseniy 评论后的补充说明
- 第一个
mergeMap()
是一个 "trick" 将我们在开始时拥有的单个值数组转换为多个独立值。由于您希望能够单独处理每个文件名(以获取相应的数据),因此一个接一个地接收文件名比作为包含所有文件名的大数组更方便。为了crystal清楚,你应该去我的 JSBIN 并添加行 .do(console.log)
before 和 after 第一个mergeMap()
。您会立即了解其中的区别。
- 第二个
mergeMap()
"projects" 将源可观察值转换为目标可观察值。这听起来很花哨,但它只是意味着我们正在将文件名(第一个可观察到的)转换为 HTTP 请求以获取文件的数据(第二个可观察到的)。
- 最后,
Object.assign()
让 met 把所有东西放回原处。由于我们有来自两个可观察对象的数据——一个发出文件名,另一个发出文件数据——我们需要将所有内容合并到一个容器中,然后才能使用它(或进一步转换它)。为此,我使用 Object.assign()
创建一个具有两个属性的临时对象,filename
和 data
。请注意,这是 vanilla JavaScript,它与 observables 无关。同样,您可以在第二个 mergeMap()
之后添加 .do(console.log)
行,以将此时流包含的内容打印到控制台。
在我的服务中,我尝试解决下一个问题。我有一个 json 文件,其中包含其他 json 个文件(卡片)的名称:
{
"filename1" : ... ,
"filename2" : ... ,
...
"filenameN" : ...
}
我需要加载所有文件。 "filenameX" 个文件有一些卡片数据:
{
dataX
}
我需要合并对象中加载的数据:
{
"filename1" : { data1 },
"filename2" : { data2 },
...
"filenameN" : { dataN }
}
我为任何文件加载创建观察者,并尝试将它们组合成一个高级的单一观察者,当所有相应的观察者都被解析时。这是我的代码:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/concat";
import "rxjs/add/operator/map";
...
_loadCards(dir, cardsListFile) {
var http = ...
var url = ...
return Observable.create(function (observer) {
http.get(url + dir + "/" + cardsListFile + ".json").map(res => { return res.json(); }).subscribe(list => {
let data = {};
let observers = [];
for(var card in list) {
if(list.hasOwnProperty(card)) {
let obs = http.get(url + dir + "/cards/" + card + ".json").map(res => { return res.json(); });
observers.push(obs);
let getData = function(card) {
obs.subscribe(cardData => {
data[card] = cardData;
});
};
getData(card);
}
}
let concatResult = Observable.concat(observers);
console.log(concatResult);
concatResult.subscribe(result => {
observer.onNext(data);
observer.onCompleted();
});
});
});
};
但是,Observer 的 concat 运算符并不像描述的那样工作 - 它 returns 与其输入相同的 Observers 数组。问题出在哪里,我可以使用哪些其他运算符来使我的解决方案更直接(因为现在它确实很丑陋)?
规则 #1:从不 在其他订阅中使用自定义订阅 - 可能只有 0.1% 的情况确实需要这样做。
我已经对您的流进行了一些重写:
_loadCards(dir, cardsListFile) {
var http = ...
var url = ...
return http.get(url + dir + "/" + cardsListFile + ".json")
.map(res => res.json())
.switchMap(fileMap => Rx.Observable.from(Object.keys(fileMap)))
.concatMap(card => http.get(url + dir + "/cards/" + card + ".json")
.map(res => ({[card]: res.json()})))
.map(arrayOfCardResultObjects => Object.assign.apply({}, arrayOfCardResultObjects));
};
此调整后的版本 运行 作为片段发布:
getMockedCardMap()
.switchMap(fileMap => Rx.Observable.from(Object.keys(fileMap)))
.concatMap(card => getMockedCardFromFile(card).map(res => ({[card]: res})))
.toArray()
.map(arrayOfCardResultObjects => Object.assign.apply({}, arrayOfCardResultObjects))
.subscribe(console.log);
// Mocked helper-methods
function getMockedCardMap() {
return Rx.Observable.of({
"file1": "foo",
"file2": "bar",
"file3": "baz"
});
}
function getMockedCardFromFile(file) {
return Rx.Observable.of("Result from: " + file);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
我不完全确定我明白你想做什么,但 concat()
运算符将其参数解压缩而不是作为数组。
这意味着 Observable.concat(observables)
将重新发送 observables
数组中的 Observables。
你想要的是传递解压缩的数组Observable.concat(...observables)
,这与:
Observable.concat(observable1, observable2, observable3 ,...)
我认为这模拟了您正在尝试做的事情。它使用 forkJoin
而不是 concat
所以它会等到所有 Observables 完成。
var input = {
"filename1": 'data1',
"filename2": 'data2',
"filenameN": 'dataN'
};
let keys = Object.keys(input);
let observables = keys.map(key => {
return Observable.of(input[key]).map(s => s.split("").reverse().join(""));
});
Observable.forkJoin(observables, (...results) => {
let combined = {};
keys.forEach((key, i) => {
combined[key] = results[i];
});
return combined;
})
.subscribe(val => console.log(val));
这将打印到控制台:
{ filename1: '1atad', filename2: '2atad', filenameN: 'Natad' }
JSBIN 与解决方案:http://jsbin.com/dahaqif/edit?js,console
我假设您有两个可观察对象,一个用于获取文件名列表,另一个用于获取特定文件名的数据:
const filenamesObs = Observable.of({
"filename1" : null,
"filename2" : null,
"filename3" : null
});
// This is actually a function that returns an observable.
const filedataObs = (filename) => {
return Observable.of(`This is the data for ${filename}`);
}
现在,您可以通过以下方式组合这两个可观察量以获得您描述的数据结构:
const source = filenamesObs
// Extract the list of filenames as an array.
.map(obj => Object.keys(obj))
// Flatten the array, i.e. emit each filename individually
// vs a SINGLE array containing ALL filenames.
.mergeMap(val => val)
// Get the data for each filename.
.mergeMap(filename =>
filedataObs(filename).map(data => Object.assign({}, { filename: filename, data: data }))
)
// At this point, you have a stream of { filename, data } objects.
// Reduce everything back to a single object.
.reduce((acc, curr) => {
acc[curr.filename] = curr.data;
return acc;
}, {});
如果您订阅并将结果记录到控制台,您将看到:
[Object] {
filename1: "This is the data for filename1",
filename2: "This is the data for filename2",
filename3: "This is the data for filename3"
}
Arseniy 评论后的补充说明
- 第一个
mergeMap()
是一个 "trick" 将我们在开始时拥有的单个值数组转换为多个独立值。由于您希望能够单独处理每个文件名(以获取相应的数据),因此一个接一个地接收文件名比作为包含所有文件名的大数组更方便。为了crystal清楚,你应该去我的 JSBIN 并添加行.do(console.log)
before 和 after 第一个mergeMap()
。您会立即了解其中的区别。 - 第二个
mergeMap()
"projects" 将源可观察值转换为目标可观察值。这听起来很花哨,但它只是意味着我们正在将文件名(第一个可观察到的)转换为 HTTP 请求以获取文件的数据(第二个可观察到的)。 - 最后,
Object.assign()
让 met 把所有东西放回原处。由于我们有来自两个可观察对象的数据——一个发出文件名,另一个发出文件数据——我们需要将所有内容合并到一个容器中,然后才能使用它(或进一步转换它)。为此,我使用Object.assign()
创建一个具有两个属性的临时对象,filename
和data
。请注意,这是 vanilla JavaScript,它与 observables 无关。同样,您可以在第二个mergeMap()
之后添加.do(console.log)
行,以将此时流包含的内容打印到控制台。