是否有像 Scan 这样工作的运算符,但让我 return 一个 IObservable<TResult> 而不是 IObservable<TSource>?
Is there an operator that works like Scan but lets me return an IObservable<TResult> instead of an IObservable<TSource>?
在这个纯粹为了练习而编造的例子中,我想要返回的是:
如果两个学生在指定的时间段内(比如 2 秒)先后加入学校,那么我想返回一个数据结构,其中包含这两个学生、他们加入的学校以及他们加入的时间间隔。
我一直在思考这些问题:
// Console entry point for the question's attempt at pairing up admissions with Scan.
// NOTE(review): this is the question's non-working sketch; the Scan lambda below has no return statement.
class Program
{
static void Main(string[] args)
{
// Look for students admitted within 2 seconds of each other.
ObserveStudentsJoiningWithin(TimeSpan.FromSeconds(2));
}
// Wires the school's StudentAdmitted event into an observable pipeline, starts two
// fill loops, and keeps the subscription alive until a key is pressed.
static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
{
var school = new School("School 1");
// Turn the .NET event into an observable sequence of event patterns.
var admissionObservable =
Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");
// TimeInterval() wraps each notification with an Interval value.
// NOTE(review): presumably Interval is the time elapsed since the previous notification,
// in which case subtracting two intervals below does not measure the gap between two admissions - confirm.
var observable = admissionObservable.TimeInterval()
.Scan((current, next) =>
{
if (next.Interval - current.Interval <= timeSpan)
{
// But this won't work for me because
// this requires me to return a TSource
// and not a TResult
}
});
// NOTE(review): TimeIntervalValueHandler is not defined in the visible code.
var subscription = observable.Subscribe(TimeIntervalValueHandler);
// Neither call is awaited, so both fill loops are started back to back.
school.FillWithStudentsAsync(10, TimeSpan.FromSeconds(3));
school.FillWithStudentsAsync(8, TimeSpan.FromSeconds(1));
Console.WriteLine("Press any key to exit the program");
Console.ReadKey();
subscription.Dispose();
}
}
这是领域模型:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace SchoolManagementSystem
{
/// <summary>A student identified only by a display name.</summary>
public class Student
{
    // Last sequence number handed out by CreateRandom; shared by every caller.
    private static int _studentNumber;

    /// <summary>Creates a student with the given name.</summary>
    /// <param name="name">Display name of the student.</param>
    public Student(string name)
    {
        Name = name;
    }

    /// <summary>Display name; also the value returned by <see cref="ToString"/>.</summary>
    public string Name { get; set; }

    /// <summary>
    /// Creates a student named "Student N", where N is the next value of a counter
    /// shared by all callers (the first student created is "Student 1").
    /// </summary>
    /// <returns>A new student with a sequentially numbered name.</returns>
    public static Student CreateRandom()
    {
        // Interlocked keeps the numbers unique when CreateRandom is called from several threads.
        var number = System.Threading.Interlocked.Increment(ref _studentNumber);
        return new Student($"Student {number}");
    }

    /// <summary>Returns the student's name.</summary>
    public override string ToString()
    {
        return Name;
    }
}
/// <summary>
/// A named school that keeps a list of admitted students and raises
/// <see cref="StudentAdmitted"/> each time a new student is added.
/// </summary>
public class School : IEnumerable<Student>
{
    // Only assigned in the constructor; the list itself is mutated by AdmitStudent.
    private readonly List<Student> _students;

    /// <summary>Raised after a student has been added to the school.</summary>
    public event StudentAdmitted StudentAdmitted;

    /// <summary>Display name of the school.</summary>
    public string Name { get; set; }

    /// <summary>Creates an empty school with the given name.</summary>
    public School(string name)
    {
        Name = name;
        _students = new List<Student>();
    }

    /// <summary>
    /// Adds <paramref name="student"/> to the school and raises <see cref="StudentAdmitted"/>.
    /// A student that is already in the list is ignored and no event is raised.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="student"/> is null.</exception>
    public void AdmitStudent(Student student)
    {
        // Reject null up front so subscribers never receive an event without a student.
        if (student == null)
            throw new ArgumentNullException(nameof(student));

        if (_students.Contains(student))
            return;

        _students.Add(student);
        OnStudentAdmitted(this, student);
    }

    /// <summary>Builds the event args and invokes the event if anyone is subscribed.</summary>
    protected virtual void OnStudentAdmitted(School school, Student student)
    {
        var args = new StudentAdmittedEventArgs(school, student);
        StudentAdmitted?.Invoke(this, args);
    }

    /// <summary>Enumerates the admitted students in admission order.</summary>
    public IEnumerator<Student> GetEnumerator()
    {
        return _students.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
/// <summary>Handler signature for the StudentAdmitted event declared on School.</summary>
/// <param name="sender">The object raising the event.</param>
/// <param name="args">The admitted student and the school involved.</param>
public delegate void StudentAdmitted(object sender, StudentAdmittedEventArgs args);
/// <summary>Event data for a student admission: the admitting school and the admitted student.</summary>
public class StudentAdmittedEventArgs : EventArgs
{
    /// <summary>The school that admitted the student.</summary>
    public School School { get; protected set; }

    /// <summary>The student that was admitted.</summary>
    public Student Student { get; protected set; }

    /// <summary>Captures the school and the student for one admission.</summary>
    public StudentAdmittedEventArgs(School school, Student student)
    {
        Student = student;
        School = school;
    }
}
/// <summary>Demo helpers that populate a <see cref="School"/> with generated students.</summary>
public static class Extensions
{
    /// <summary>
    /// Admits <paramref name="howMany"/> newly created students, waiting
    /// <paramref name="gapBetweenEachAdmission"/> before each admission.
    /// When <paramref name="howMany"/> is exactly 1 the student is admitted immediately, without a delay.
    /// </summary>
    /// <returns>
    /// A task that completes once every student has been admitted. Callers that do not
    /// care about completion can still ignore the result and call this as a statement.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static Task FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        // Validate outside the async state machine so bad arguments throw at the call site
        // instead of surfacing later (or, with async void, escaping unobserved).
        if (school == null)
            throw new ArgumentNullException(nameof(school));
        if (howMany < 0)
            throw new ArgumentOutOfRangeException(nameof(howMany));

        return FillWithStudentsCoreAsync(school, howMany, gapBetweenEachAdmission);
    }

    // Performs the actual admissions; arguments are already validated by the public entry point.
    private static async Task FillWithStudentsCoreAsync(School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            await Task.Delay(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }
}
}
但是,Scan 运算符只允许我返回与源相同的 TSource 类型的可观察对象。Select 在这里也不适用:尽管 Select 允许我将 TSource 转换为 TResult,但我无法像使用 Scan 那样“向前看”,把当前项与下一项一起投影。
我正在寻找介于两者之间的东西。
- 对于成对比较(原始 - 将当前项目与下一个项目一起投影)您可以使用 Buffer 方法来构建成对序列。
- 为了找出学生加入之间的间隔,使用 Timestamp 方法而不是 TimeInterval 方法可能更合适,因为问题中有下面这一行:
next.Interval - current.Interval <= timeSpan
你真正想要的是 pair[1].Timestamp - pair[0].Timestamp <= timeSpan
以下 4 对结果(学生 11、学生 12)、(学生 13、学生 14)、(学生 15、学生 16)、(学生 17、学生 18):
// Groups admissions two at a time (1st+2nd, 3rd+4th, ...) and keeps the pairs
// whose members joined within timeSpan of each other.
var admissionObservable = Observable
    .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
    .Timestamp()
    .Buffer(2)
    // Buffer can emit a final buffer with a single element when the source completes,
    // so check the size before indexing pair[1].
    .Where(pair => pair.Count == 2 && pair[1].Timestamp - pair[0].Timestamp <= timeSpan)
    .Select(pair => new JoiningData
    {
        Students = Tuple.Create(pair[0].Value.EventArgs.Student, pair[1].Value.EventArgs.Student),
        School = pair[0].Value.EventArgs.School,
        Interval = pair[1].Timestamp - pair[0].Timestamp
    });
- 如@Enigmativity 所述,最好将每个元素与下一个元素进行比较。因此,为此我们可以使用 Zip 方法:
以下 8 对结果(学生 10、学生 11)(学生 11、学生 12)、(学生 12、学生 13)、(学生 13、学生 14)、(学生 14、学生 15)、( 15 号学生,16 号学生),(16 号学生,17 号学生),(17 号学生,18 号学生):
// Timestamped stream of admissions raised by the school.
var admissionObservable = Observable
    .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
    .Timestamp();
// Pair every admission with the one that follows it and keep the pairs that joined
// within timeSpan. Publish shares a single subscription between the two Zip inputs,
// so the event is hooked once and both sides see the same timestamp for an admission.
admissionObservable
    .Publish(shared => shared.Zip(shared.Skip(1), (a, b) => Tuple.Create(a, b)))
    .Where(pair => pair.Item2.Timestamp - pair.Item1.Timestamp <= timeSpan)
    .Select(pair => new JoiningData
    {
        Students = Tuple.Create(pair.Item1.Value.EventArgs.Student, pair.Item2.Value.EventArgs.Student),
        School = pair.Item1.Value.EventArgs.School,
        Interval = pair.Item2.Timestamp - pair.Item1.Timestamp
    });
这是我所做的:
/// <summary>
/// Subscribes to a school's StudentAdmitted event and reports every pair of consecutive
/// admissions whose gap is at most <paramref name="timeSpan"/>, then runs a series of
/// blocking fill calls and waits for a key press.
/// </summary>
/// <param name="timeSpan">Maximum gap between two consecutive admissions for them to be reported as a pair.</param>
static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
{
    var school = new School("School 1");
    var admissionObservable =
        Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");

    var observable = admissionObservable.TimeInterval()
        .Scan<TimeInterval<EventPattern<StudentAdmittedEventArgs>>, StudentPair>(null, (previousPair, current) =>
        {
            Debug.Print($"Student joined after {current.Interval.TotalSeconds} seconds, timeSpan = {timeSpan.TotalSeconds} seconds");

            // Always carry the current student forward as SecondStudent, even when the gap is
            // too long. Returning null here would erase the state, and the next admission
            // would be treated as the very first one, dropping a pair that may be valid.
            var joinedSoonAfterPrevious = previousPair != null && current.Interval <= timeSpan;
            return new StudentPair
            {
                FirstStudent = joinedSoonAfterPrevious ? previousPair.SecondStudent : null,
                SecondStudent = current.Value.EventArgs.Student,
                IntervalBetweenJoining = current.Interval,
                School = current.Value.EventArgs.School
            };
        })
        // A missing FirstStudent marks "no qualifying predecessor"; only complete pairs are reported.
        .Where(p => p.FirstStudent != null);

    // using guarantees the subscription is released even if one of the fill calls throws.
    using (observable.Subscribe(StudentPairValueHandler))
    {
        school.FillWithStudents(4, TimeSpan.FromSeconds(1));
        school.FillWithStudents(2, TimeSpan.FromSeconds(10));
        school.FillWithStudents(3, TimeSpan.FromSeconds(2));
        school.FillWithStudents(2, TimeSpan.FromSeconds(5));
        school.FillWithStudents(5, TimeSpan.FromSeconds(0.6));
        Console.WriteLine("Press any key to exit the program");
        Console.ReadKey();
    }
}
/// <summary>
/// Writes one line to the console describing who joined which school how many seconds
/// (rounded to two decimals) after whom. Incomplete pairs are ignored.
/// </summary>
static void StudentPairValueHandler(StudentPair pair)
{
    // Nothing to report without both members of the pair.
    if (pair == null || pair.FirstStudent == null)
    {
        return;
    }

    Console.WriteLine($"{pair.SecondStudent.Name} joined {pair.School.Name} {Math.Round(pair.IntervalBetweenJoining.TotalSeconds, 2)} seconds after {pair.FirstStudent.Name}.");
}
...
/// <summary>Two students together with the school involved and the time between their admissions.</summary>
public class StudentPair
{
    /// <summary>The school recorded for this pair.</summary>
    public School School;

    /// <summary>The earlier of the two students; may be null when there is no predecessor.</summary>
    public Student FirstStudent;

    /// <summary>The later of the two students.</summary>
    public Student SecondStudent;

    /// <summary>Time between the two admissions.</summary>
    public TimeSpan IntervalBetweenJoining;
}
/// <summary>Demo helpers that populate a <see cref="School"/> with generated students.</summary>
public static class Extensions
{
    /// <summary>Admits <paramref name="howMany"/> newly created students with no gap between admissions.</summary>
    public static void FillWithStudents(this School school, int howMany)
    {
        FillWithStudents(school, howMany, TimeSpan.Zero);
    }

    /// <summary>
    /// Admits <paramref name="howMany"/> newly created students, blocking the calling thread for
    /// <paramref name="gapBetweenEachAdmission"/> before each admission.
    /// When <paramref name="howMany"/> is exactly 1 the student is admitted immediately, without a delay.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static void FillWithStudents(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (school == null)
            throw new ArgumentNullException(nameof(school));
        if (howMany < 0)
            throw new ArgumentOutOfRangeException(nameof(howMany));

        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            // The TimeSpan overload avoids the narrowing (int) cast of TotalMilliseconds,
            // which gives a meaningless value for very large gaps.
            Thread.Sleep(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }

    /// <summary>
    /// Asynchronous variant of FillWithStudents: waits with Task.Delay instead of blocking the thread.
    /// When <paramref name="howMany"/> is exactly 1 the student is admitted immediately, without a delay.
    /// </summary>
    /// <returns>
    /// A task that completes once every student has been admitted. Callers that do not
    /// care about completion can still ignore the result and call this as a statement.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static Task FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        // Validate outside the async state machine so bad arguments throw at the call site
        // instead of surfacing later (or, with async void, escaping unobserved).
        if (school == null)
            throw new ArgumentNullException(nameof(school));
        if (howMany < 0)
            throw new ArgumentOutOfRangeException(nameof(howMany));

        return FillWithStudentsCoreAsync(school, howMany, gapBetweenEachAdmission);
    }

    // Performs the actual asynchronous admissions; arguments are already validated.
    private static async Task FillWithStudentsCoreAsync(School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            await Task.Delay(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }
}
你能试试看它是否能满足你的需求吗?
// Candidate pipeline: emits arrays of admission events that fall into the same timeSpan-long window,
// keeping only arrays with at least two events.
IObservable<EventPattern<StudentAdmittedEventArgs>[]> observable =
admissionObservable
// Publish exposes the source as pxs inside the lambda so it can serve both as the
// windowed sequence and as the sequence of window openings.
.Publish(pxs =>
pxs
// A window is opened for each element of pxs and closed when Observable.Timer(timeSpan) fires.
// NOTE(review): whether the opening element itself lands in its own window depends on
// subscription order inside Window - confirm before relying on it.
.Window(pxs, x => Observable.Timer(timeSpan))
// Keep at most the first two events of each window.
.Select(ys => ys.Take(2)))
// Collect each window into an array once it completes.
.SelectMany(ys => ys.ToArray())
// Drop arrays that contain fewer than two events.
.Where(ys => ys.Skip(1).Any());
在这个纯粹为了练习而编造的例子中,我想要返回的是:
如果两个学生在指定的时间段内(比如 2 秒)先后加入学校,那么我想返回一个数据结构,其中包含这两个学生、他们加入的学校以及他们加入的时间间隔。
我一直在思考这些问题:
// Console entry point for the question's attempt at pairing up admissions with Scan.
// NOTE(review): this is the question's non-working sketch; the Scan lambda below has no return statement.
class Program
{
static void Main(string[] args)
{
// Look for students admitted within 2 seconds of each other.
ObserveStudentsJoiningWithin(TimeSpan.FromSeconds(2));
}
// Wires the school's StudentAdmitted event into an observable pipeline, starts two
// fill loops, and keeps the subscription alive until a key is pressed.
static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
{
var school = new School("School 1");
// Turn the .NET event into an observable sequence of event patterns.
var admissionObservable =
Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");
// TimeInterval() wraps each notification with an Interval value.
// NOTE(review): presumably Interval is the time elapsed since the previous notification,
// in which case subtracting two intervals below does not measure the gap between two admissions - confirm.
var observable = admissionObservable.TimeInterval()
.Scan((current, next) =>
{
if (next.Interval - current.Interval <= timeSpan)
{
// But this won't work for me because
// this requires me to return a TSource
// and not a TResult
}
});
// NOTE(review): TimeIntervalValueHandler is not defined in the visible code.
var subscription = observable.Subscribe(TimeIntervalValueHandler);
// Neither call is awaited, so both fill loops are started back to back.
school.FillWithStudentsAsync(10, TimeSpan.FromSeconds(3));
school.FillWithStudentsAsync(8, TimeSpan.FromSeconds(1));
Console.WriteLine("Press any key to exit the program");
Console.ReadKey();
subscription.Dispose();
}
}
这是领域模型:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace SchoolManagementSystem
{
/// <summary>A student identified only by a display name.</summary>
public class Student
{
    // Last sequence number handed out by CreateRandom; shared by every caller.
    private static int _studentNumber;

    /// <summary>Creates a student with the given name.</summary>
    /// <param name="name">Display name of the student.</param>
    public Student(string name)
    {
        Name = name;
    }

    /// <summary>Display name; also the value returned by <see cref="ToString"/>.</summary>
    public string Name { get; set; }

    /// <summary>
    /// Creates a student named "Student N", where N is the next value of a counter
    /// shared by all callers (the first student created is "Student 1").
    /// </summary>
    /// <returns>A new student with a sequentially numbered name.</returns>
    public static Student CreateRandom()
    {
        // Interlocked keeps the numbers unique when CreateRandom is called from several threads.
        var number = System.Threading.Interlocked.Increment(ref _studentNumber);
        return new Student($"Student {number}");
    }

    /// <summary>Returns the student's name.</summary>
    public override string ToString()
    {
        return Name;
    }
}
/// <summary>
/// A named school that keeps a list of admitted students and raises
/// <see cref="StudentAdmitted"/> each time a new student is added.
/// </summary>
public class School : IEnumerable<Student>
{
    // Only assigned in the constructor; the list itself is mutated by AdmitStudent.
    private readonly List<Student> _students;

    /// <summary>Raised after a student has been added to the school.</summary>
    public event StudentAdmitted StudentAdmitted;

    /// <summary>Display name of the school.</summary>
    public string Name { get; set; }

    /// <summary>Creates an empty school with the given name.</summary>
    public School(string name)
    {
        Name = name;
        _students = new List<Student>();
    }

    /// <summary>
    /// Adds <paramref name="student"/> to the school and raises <see cref="StudentAdmitted"/>.
    /// A student that is already in the list is ignored and no event is raised.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="student"/> is null.</exception>
    public void AdmitStudent(Student student)
    {
        // Reject null up front so subscribers never receive an event without a student.
        if (student == null)
            throw new ArgumentNullException(nameof(student));

        if (_students.Contains(student))
            return;

        _students.Add(student);
        OnStudentAdmitted(this, student);
    }

    /// <summary>Builds the event args and invokes the event if anyone is subscribed.</summary>
    protected virtual void OnStudentAdmitted(School school, Student student)
    {
        var args = new StudentAdmittedEventArgs(school, student);
        StudentAdmitted?.Invoke(this, args);
    }

    /// <summary>Enumerates the admitted students in admission order.</summary>
    public IEnumerator<Student> GetEnumerator()
    {
        return _students.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
/// <summary>Handler signature for the StudentAdmitted event declared on School.</summary>
/// <param name="sender">The object raising the event.</param>
/// <param name="args">The admitted student and the school involved.</param>
public delegate void StudentAdmitted(object sender, StudentAdmittedEventArgs args);
/// <summary>Event data for a student admission: the admitting school and the admitted student.</summary>
public class StudentAdmittedEventArgs : EventArgs
{
    /// <summary>The school that admitted the student.</summary>
    public School School { get; protected set; }

    /// <summary>The student that was admitted.</summary>
    public Student Student { get; protected set; }

    /// <summary>Captures the school and the student for one admission.</summary>
    public StudentAdmittedEventArgs(School school, Student student)
    {
        Student = student;
        School = school;
    }
}
/// <summary>Demo helpers that populate a <see cref="School"/> with generated students.</summary>
public static class Extensions
{
    /// <summary>
    /// Admits <paramref name="howMany"/> newly created students, waiting
    /// <paramref name="gapBetweenEachAdmission"/> before each admission.
    /// When <paramref name="howMany"/> is exactly 1 the student is admitted immediately, without a delay.
    /// </summary>
    /// <returns>
    /// A task that completes once every student has been admitted. Callers that do not
    /// care about completion can still ignore the result and call this as a statement.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static Task FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        // Validate outside the async state machine so bad arguments throw at the call site
        // instead of surfacing later (or, with async void, escaping unobserved).
        if (school == null)
            throw new ArgumentNullException(nameof(school));
        if (howMany < 0)
            throw new ArgumentOutOfRangeException(nameof(howMany));

        return FillWithStudentsCoreAsync(school, howMany, gapBetweenEachAdmission);
    }

    // Performs the actual admissions; arguments are already validated by the public entry point.
    private static async Task FillWithStudentsCoreAsync(School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            await Task.Delay(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }
}
}
但是,Scan 运算符只允许我返回与源相同的 TSource 类型的可观察对象。Select 在这里也不适用:尽管 Select 允许我将 TSource 转换为 TResult,但我无法像使用 Scan 那样“向前看”,把当前项与下一项一起投影。
我正在寻找介于两者之间的东西。
- 对于成对比较(原始 - 将当前项目与下一个项目一起投影)您可以使用 Buffer 方法来构建成对序列。
- 为了找出学生加入之间的间隔,使用 Timestamp 方法而不是 TimeInterval 方法可能更合适,因为问题中有下面这一行:
next.Interval - current.Interval <= timeSpan
你真正想要的是 pair[1].Timestamp - pair[0].Timestamp <= timeSpan
以下 4 对结果(学生 11、学生 12)、(学生 13、学生 14)、(学生 15、学生 16)、(学生 17、学生 18):
// Groups admissions two at a time (1st+2nd, 3rd+4th, ...) and keeps the pairs
// whose members joined within timeSpan of each other.
var admissionObservable = Observable
    .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
    .Timestamp()
    .Buffer(2)
    // Buffer can emit a final buffer with a single element when the source completes,
    // so check the size before indexing pair[1].
    .Where(pair => pair.Count == 2 && pair[1].Timestamp - pair[0].Timestamp <= timeSpan)
    .Select(pair => new JoiningData
    {
        Students = Tuple.Create(pair[0].Value.EventArgs.Student, pair[1].Value.EventArgs.Student),
        School = pair[0].Value.EventArgs.School,
        Interval = pair[1].Timestamp - pair[0].Timestamp
    });
- 如@Enigmativity 所述,最好将每个元素与下一个元素进行比较。因此,为此我们可以使用 Zip 方法:
以下 8 对结果(学生 10、学生 11)(学生 11、学生 12)、(学生 12、学生 13)、(学生 13、学生 14)、(学生 14、学生 15)、( 15 号学生,16 号学生),(16 号学生,17 号学生),(17 号学生,18 号学生):
// Timestamped stream of admissions raised by the school.
var admissionObservable = Observable
    .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
    .Timestamp();
// Pair every admission with the one that follows it and keep the pairs that joined
// within timeSpan. Publish shares a single subscription between the two Zip inputs,
// so the event is hooked once and both sides see the same timestamp for an admission.
admissionObservable
    .Publish(shared => shared.Zip(shared.Skip(1), (a, b) => Tuple.Create(a, b)))
    .Where(pair => pair.Item2.Timestamp - pair.Item1.Timestamp <= timeSpan)
    .Select(pair => new JoiningData
    {
        Students = Tuple.Create(pair.Item1.Value.EventArgs.Student, pair.Item2.Value.EventArgs.Student),
        School = pair.Item1.Value.EventArgs.School,
        Interval = pair.Item2.Timestamp - pair.Item1.Timestamp
    });
这是我所做的:
/// <summary>
/// Subscribes to a school's StudentAdmitted event and reports every pair of consecutive
/// admissions whose gap is at most <paramref name="timeSpan"/>, then runs a series of
/// blocking fill calls and waits for a key press.
/// </summary>
/// <param name="timeSpan">Maximum gap between two consecutive admissions for them to be reported as a pair.</param>
static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
{
    var school = new School("School 1");
    var admissionObservable =
        Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");

    var observable = admissionObservable.TimeInterval()
        .Scan<TimeInterval<EventPattern<StudentAdmittedEventArgs>>, StudentPair>(null, (previousPair, current) =>
        {
            Debug.Print($"Student joined after {current.Interval.TotalSeconds} seconds, timeSpan = {timeSpan.TotalSeconds} seconds");

            // Always carry the current student forward as SecondStudent, even when the gap is
            // too long. Returning null here would erase the state, and the next admission
            // would be treated as the very first one, dropping a pair that may be valid.
            var joinedSoonAfterPrevious = previousPair != null && current.Interval <= timeSpan;
            return new StudentPair
            {
                FirstStudent = joinedSoonAfterPrevious ? previousPair.SecondStudent : null,
                SecondStudent = current.Value.EventArgs.Student,
                IntervalBetweenJoining = current.Interval,
                School = current.Value.EventArgs.School
            };
        })
        // A missing FirstStudent marks "no qualifying predecessor"; only complete pairs are reported.
        .Where(p => p.FirstStudent != null);

    // using guarantees the subscription is released even if one of the fill calls throws.
    using (observable.Subscribe(StudentPairValueHandler))
    {
        school.FillWithStudents(4, TimeSpan.FromSeconds(1));
        school.FillWithStudents(2, TimeSpan.FromSeconds(10));
        school.FillWithStudents(3, TimeSpan.FromSeconds(2));
        school.FillWithStudents(2, TimeSpan.FromSeconds(5));
        school.FillWithStudents(5, TimeSpan.FromSeconds(0.6));
        Console.WriteLine("Press any key to exit the program");
        Console.ReadKey();
    }
}
/// <summary>
/// Writes one line to the console describing who joined which school how many seconds
/// (rounded to two decimals) after whom. Incomplete pairs are ignored.
/// </summary>
static void StudentPairValueHandler(StudentPair pair)
{
    // Nothing to report without both members of the pair.
    if (pair == null || pair.FirstStudent == null)
    {
        return;
    }

    Console.WriteLine($"{pair.SecondStudent.Name} joined {pair.School.Name} {Math.Round(pair.IntervalBetweenJoining.TotalSeconds, 2)} seconds after {pair.FirstStudent.Name}.");
}
...
/// <summary>Two students together with the school involved and the time between their admissions.</summary>
public class StudentPair
{
    /// <summary>The school recorded for this pair.</summary>
    public School School;

    /// <summary>The earlier of the two students; may be null when there is no predecessor.</summary>
    public Student FirstStudent;

    /// <summary>The later of the two students.</summary>
    public Student SecondStudent;

    /// <summary>Time between the two admissions.</summary>
    public TimeSpan IntervalBetweenJoining;
}
/// <summary>Demo helpers that populate a <see cref="School"/> with generated students.</summary>
public static class Extensions
{
    /// <summary>Admits <paramref name="howMany"/> newly created students with no gap between admissions.</summary>
    public static void FillWithStudents(this School school, int howMany)
    {
        FillWithStudents(school, howMany, TimeSpan.Zero);
    }

    /// <summary>
    /// Admits <paramref name="howMany"/> newly created students, blocking the calling thread for
    /// <paramref name="gapBetweenEachAdmission"/> before each admission.
    /// When <paramref name="howMany"/> is exactly 1 the student is admitted immediately, without a delay.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static void FillWithStudents(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (school == null)
            throw new ArgumentNullException(nameof(school));
        if (howMany < 0)
            throw new ArgumentOutOfRangeException(nameof(howMany));

        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            // The TimeSpan overload avoids the narrowing (int) cast of TotalMilliseconds,
            // which gives a meaningless value for very large gaps.
            Thread.Sleep(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }

    /// <summary>
    /// Asynchronous variant of FillWithStudents: waits with Task.Delay instead of blocking the thread.
    /// When <paramref name="howMany"/> is exactly 1 the student is admitted immediately, without a delay.
    /// </summary>
    /// <returns>
    /// A task that completes once every student has been admitted. Callers that do not
    /// care about completion can still ignore the result and call this as a statement.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static Task FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        // Validate outside the async state machine so bad arguments throw at the call site
        // instead of surfacing later (or, with async void, escaping unobserved).
        if (school == null)
            throw new ArgumentNullException(nameof(school));
        if (howMany < 0)
            throw new ArgumentOutOfRangeException(nameof(howMany));

        return FillWithStudentsCoreAsync(school, howMany, gapBetweenEachAdmission);
    }

    // Performs the actual asynchronous admissions; arguments are already validated.
    private static async Task FillWithStudentsCoreAsync(School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            await Task.Delay(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }
}
你能试试看它是否能满足你的需求吗?
// Candidate pipeline: emits arrays of admission events that fall into the same timeSpan-long window,
// keeping only arrays with at least two events.
IObservable<EventPattern<StudentAdmittedEventArgs>[]> observable =
admissionObservable
// Publish exposes the source as pxs inside the lambda so it can serve both as the
// windowed sequence and as the sequence of window openings.
.Publish(pxs =>
pxs
// A window is opened for each element of pxs and closed when Observable.Timer(timeSpan) fires.
// NOTE(review): whether the opening element itself lands in its own window depends on
// subscription order inside Window - confirm before relying on it.
.Window(pxs, x => Observable.Timer(timeSpan))
// Keep at most the first two events of each window.
.Select(ys => ys.Take(2)))
// Collect each window into an array once it completes.
.SelectMany(ys => ys.ToArray())
// Drop arrays that contain fewer than two events.
.Where(ys => ys.Skip(1).Any());