拆分 IObservable<byte[]> 到字符然后到行
Split IObservable<byte[]> to characters then to line
Rx 很棒,但有时很难找到优雅的方式来做某事。这个想法很简单。我收到带有字节 [] 的事件,此数组可能包含一行的一部分、多行或一行。我想要的是找到一种方法让 IObservable of Line 所以 IObservable<String>
,其中序列的每个元素都是一行。
几个小时后,我找到的最接近的解决方案非常难看,当然也行不通,因为 scan 在每个字符上触发 OnNext:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work it trigger OnNext on every char
//Aggregate doesn't work neither as it doesn't return intermediate result
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
注意:_reactiveSubcription
应该是 IObservable<String>
。
如果不考虑字符编码问题,我缺少什么来完成这项工作?
这对我有用。
首先,将 byte[] 转换为字符串并在 \r
上拆分字符串(正则表达式拆分保留分隔符)。
现在有一个字符串流,其中一些以 \r
结尾。
然后 Concat,以保持它们的顺序。此外,由于 strings
需要成为下一步的热点,因此发布它们。
var strings = bytes.
Select(arr => (Regex.Split(Encoding.Default.GetString(arr, 0, arr.Length - 1), "(\r)")).
Where(s=> s.Length != 0).
ToObservable()).
Concat().
Publish().
RefCount();
创建一个 Window 字符串,当字符串以 \r
结尾时结束。 strings
需要热,因为它同时用于 Window 内容和结束-window 触发器。
var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));
将每个 Window 聚合成一个字符串。
var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));
lines
是 IObservable<String>
并且每个字符串包含一行。
为了测试这一点,我使用了以下生成器来生成 IObservable<byte[]>
var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
Return((byte)('A' + i)).
Repeat(24).
Concat(Observable.
Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());
Rx 很棒,但有时很难找到优雅的方式来做某事。这个想法很简单。我收到带有字节 [] 的事件,此数组可能包含一行的一部分、多行或一行。我想要的是找到一种方法让 IObservable of Line 所以 IObservable<String>
,其中序列的每个元素都是一行。
几个小时后,我找到的最接近的解决方案非常难看,当然也行不通,因为 scan 在每个字符上触发 OnNext:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work it trigger OnNext on every char
//Aggregate doesn't work neither as it doesn't return intermediate result
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
注意:_reactiveSubcription
应该是 IObservable<String>
。
如果不考虑字符编码问题,我缺少什么来完成这项工作?
这对我有用。
首先,将 byte[] 转换为字符串并在 \r
上拆分字符串(正则表达式拆分保留分隔符)。
现在有一个字符串流,其中一些以 \r
结尾。
然后 Concat,以保持它们的顺序。此外,由于 strings
需要成为下一步的热点,因此发布它们。
var strings = bytes.
Select(arr => (Regex.Split(Encoding.Default.GetString(arr, 0, arr.Length - 1), "(\r)")).
Where(s=> s.Length != 0).
ToObservable()).
Concat().
Publish().
RefCount();
创建一个 Window 字符串,当字符串以 \r
结尾时结束。 strings
需要热,因为它同时用于 Window 内容和结束-window 触发器。
var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));
将每个 Window 聚合成一个字符串。
var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));
lines
是 IObservable<String>
并且每个字符串包含一行。
为了测试这一点,我使用了以下生成器来生成 IObservable<byte[]>
var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
Return((byte)('A' + i)).
Repeat(24).
Concat(Observable.
Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());