如何拆分和合并 RxJS 发出的数组值?
How to split and merge RxJS emitted array values?
我有一个 RxJS Observable,它发出 Uint8Array
值类型的二进制数据。但并非每个发出的值都包含一个完整的数据对象,可以自行处理。
完整数据对象的数据格式由一个起始字节(0xAA
)、中间的一些可变长度数据和一个结束字节(0xFF
)组成。中间的数据是 BCD 编码的,这意味着它主要不包含开始或结束字节,而只包含从 0x00
到 0x99
.
的二进制值
这是一个例子:
// This is a mock of the source observable which emits values:
const source$ = from([
// Case 1: One complete data object with start (0xAA) and end byte (0xFF)
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2: Two complete data objects in a single value emit
new Uint8Array([0xAA, 0x12, 0x76, 0xFF, 0xAA, 0x83, 0x43, 0xFF,]),
// Case 3: Two uncomplete value emits which form a single data object
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4: A combination of Cases 2 and 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x55, 0x81, 0xFF, 0xAA, 0x73, 0x96]),
new Uint8Array([0x72, 0x23, 0x11, 0x95, 0xFF]),
])
source$.subscribe((x) => {
console.log('Emitted value as Hexdump:')
console.log(hexdump(x.buffer))
})
目标是只接收完整的数据对象。也许作为一个转换后的新可观察对象?
上面的例子应该是这样的:
const transformedSource$ = from([
// Case 1
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2
new Uint8Array([0xAA, 0x12, 0x76, 0xFF,]),
new Uint8Array([0xAA, 0x83, 0x43, 0xFF,]),
// Case 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x55, 0x81, 0xFF]),
new Uint8Array([0xAA, 0x73, 0x96, 0x72, 0x23, 0x11, 0x95, 0xFF]),
])
- 哪些 RxJS 方法或运算符适用于此?
- 我想先在
0xFF
处进行拆分,然后再进行合并。这该怎么做?非常感谢具有 RxJS 经验的人的想法。
您可以尝试拆分块以便一个一个地处理字节,将它们添加到缓冲区直到 0xff
字节出现在流中并返回所有缓冲的元素并重置缓冲区下一块:
let buffer = new Uint8Array();
transformedSource$.pipe(
mergeAll(), // this splits your array and emits the single bytes into the stream
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) { (
const result = buffer;
buffer = new Uint8Array(); // resets the buffer
return of(result); // will emit a completed chunk
}
return EMPTY; // won't emit anything
})
)
.subscribe(console.log);
如果您不喜欢全局变量,或者您还想为其他流重用该代码,这里有一个带有自定义运算符的替代解决方案:
const mergeChunks = () => {
let buffer = new Uint8Array();
return (source$) =>
source$.pipe(
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) {
const result = buffer;
buffer = new Uint8Array();
return of(result);
}
return EMPTY;
})
);
}
transformedSource$.pipe(
mergeAll(),
mergeChunks(),
)
.subscribe(console.log);
我决定尝试仅使用内置运算符而不使用外部变量来实现相同的目的。这更像是一个练习,可以回答问题的第一部分:
Which RxJS methods or operators are suitable for this
使用 buffer
运算符的变体是可行的方法。 的回答是第一位的,有效的,如果我必须选择的话,我可能会选择自己的答案。
source$
.pipe(
mergeMap(t => t), // this transforms an array of bytes to single byte emissions
multicast(
() => new Subject<number>(),
s =>
s.pipe(
bufferToggle(
s.pipe(filter(v => v === 0xaa)),
() => s.pipe(filter(v => v === 0xff))
)
)
),
map(v => new Uint8Array(v))
)
.subscribe(x => {
console.log(hexdump(x.buffer));
});
Stackblitz here.
我有一个 RxJS Observable,它发出 Uint8Array
值类型的二进制数据。但并非每个发出的值都包含一个完整的数据对象,可以自行处理。
完整数据对象的数据格式由一个起始字节(0xAA
)、中间的一些可变长度数据和一个结束字节(0xFF
)组成。中间的数据是 BCD 编码的,这意味着它主要不包含开始或结束字节,而只包含从 0x00
到 0x99
.
这是一个例子:
// This is a mock of the source observable which emits values:
const source$ = from([
// Case 1: One complete data object with start (0xAA) and end byte (0xFF)
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2: Two complete data objects in a single value emit
new Uint8Array([0xAA, 0x12, 0x76, 0xFF, 0xAA, 0x83, 0x43, 0xFF,]),
// Case 3: Two uncomplete value emits which form a single data object
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4: A combination of Cases 2 and 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x55, 0x81, 0xFF, 0xAA, 0x73, 0x96]),
new Uint8Array([0x72, 0x23, 0x11, 0x95, 0xFF]),
])
source$.subscribe((x) => {
console.log('Emitted value as Hexdump:')
console.log(hexdump(x.buffer))
})
目标是只接收完整的数据对象。也许作为一个转换后的新可观察对象?
上面的例子应该是这样的:
const transformedSource$ = from([
// Case 1
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2
new Uint8Array([0xAA, 0x12, 0x76, 0xFF,]),
new Uint8Array([0xAA, 0x83, 0x43, 0xFF,]),
// Case 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x55, 0x81, 0xFF]),
new Uint8Array([0xAA, 0x73, 0x96, 0x72, 0x23, 0x11, 0x95, 0xFF]),
])
- 哪些 RxJS 方法或运算符适用于此?
- 我想先在
0xFF
处进行拆分,然后再进行合并。这该怎么做?非常感谢具有 RxJS 经验的人的想法。
您可以尝试拆分块以便一个一个地处理字节,将它们添加到缓冲区直到 0xff
字节出现在流中并返回所有缓冲的元素并重置缓冲区下一块:
let buffer = new Uint8Array();
transformedSource$.pipe(
mergeAll(), // this splits your array and emits the single bytes into the stream
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) { (
const result = buffer;
buffer = new Uint8Array(); // resets the buffer
return of(result); // will emit a completed chunk
}
return EMPTY; // won't emit anything
})
)
.subscribe(console.log);
如果您不喜欢全局变量,或者您还想为其他流重用该代码,这里有一个带有自定义运算符的替代解决方案:
const mergeChunks = () => {
let buffer = new Uint8Array();
return (source$) =>
source$.pipe(
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) {
const result = buffer;
buffer = new Uint8Array();
return of(result);
}
return EMPTY;
})
);
}
transformedSource$.pipe(
mergeAll(),
mergeChunks(),
)
.subscribe(console.log);
我决定尝试仅使用内置运算符而不使用外部变量来实现相同的目的。这更像是一个练习,可以回答问题的第一部分:
Which RxJS methods or operators are suitable for this
使用 buffer
运算符的变体是可行的方法。
source$
.pipe(
mergeMap(t => t), // this transforms an array of bytes to single byte emissions
multicast(
() => new Subject<number>(),
s =>
s.pipe(
bufferToggle(
s.pipe(filter(v => v === 0xaa)),
() => s.pipe(filter(v => v === 0xff))
)
)
),
map(v => new Uint8Array(v))
)
.subscribe(x => {
console.log(hexdump(x.buffer));
});
Stackblitz here.