可观察事件

Observable events

我有这段代码,原始代码 here,我需要修改以在字符串列表中插入输出字符串。

我该怎么做?

    private static List<string> list_of_proc = new List<string>();

    static void Run<T>(Func<IObservable<T>> f, Action a)
    {
        Action<string> messageTarget = (x) => list_of_proc.Add(x);
        using (f().Subscribe(t => Console.WriteLine(t), p => messageTarget(p)))
        {
            Console.ReadLine();
            Console.WriteLine();
        }
    }

我的动作 messageTarget 没有编译。

输出错误:

1>Program.cs(20,34,20,59): error CS1660: Cannot convert lambda expression to type 'System.IObserver' because it is not a delegate type
1>Program.cs(20,61,20,82): error CS1593: Delegate 'System.Action' does not take 1 arguments
1>Program.cs(20,61,20,82): error CS1660: Cannot convert lambda expression to type 'System.Threading.CancellationToken' because it is not a delegate type
1>Program.cs(20,66,20,82): error CS1594: Delegate 'System.Action' has some invalid arguments
1>Program.cs(20,80,20,81): error CS1503: Argument 1: cannot convert from 'System.Exception' to 'string'

此代码编译正常,没有错误:

        static void Run<T>(Func<IObservable<T>> f, Action a)
    {
        Action<string> messageTarget = (x) => list_of_proc.Add(x);
        using (f().Subscribe(t => Console.WriteLine(t)/*, p => messageTarget(p)*/))
        {
            Console.ReadLine();
            Console.WriteLine();
        }
    }

订阅具有这种形式(System.Reactive.Core.dll,v2.2.5.0):

public static IDisposable Subscribe(this IObservable source, Action onNext);

这个正确的代码:

    static void Run<T>(Func<IObservable<T>> f, Action a)
    {
        Action<string> messageTarget = (x) => list_of_proc.Add(x);
        using (f().Subscribe( t => messageTarget(t.ToString())))
        {
            Console.ReadLine();
            Console.WriteLine();
        }
    }

您将 IObserver 用作函数的第一个参数,但您实际上给了它一个委托,而编译器告诉您它不是委托。 就这么简单。

您的Subscribe方法是扩展方法this 参数隐式是调用该方法的对象。这意味着您应该只将 Action 委托作为唯一参数传递。

f().Subscribe(() => Console.WriteLine("Hello"))

IObservable<T>的扩展方法有很多.Subscribe(...)种。编译器会尝试找到最合适的重载。在您的情况下,它按以下顺序尝试这些重载:

IDisposable Subscribe<T>(this IObservable<T> source)
IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
void Subscribe<T>(this IObservable<T> source, Action<T> onNext, CancellationToken token)
IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)

然而,你的电话是这样的:

.Subscribe(this IObservable<T> source, Action<T> onNext, Action<string> ???)

None 匹配,但正在尝试为您提供有用的提示:

Cannot convert lambda expression to type 'System.IObserver<T>' because it is not a delegate type
Delegate 'System.Action' does not take 1 arguments
Cannot convert lambda expression to type 'System.Threading.CancellationToken' because it is not a delegate type
Delegate 'System.Action<string>' has some invalid arguments

每个有效地都在尝试匹配上述重载之一。

您的代码中不清楚的是,当您 运行 代码时 T 的实际类型是什么。为 T 类型的 t 调用 Console.WriteLine(t) 意味着您可能会强制 t.ToString() 输出结果 - 这并不总是产生有意义的东西。

然后用 Action<string> 跟进 Action<T> 也没有意义,因为 string 不是 Exception 并且 [= 没有过载16=] 需要明确的 Action<string>.

我想你需要的是这样的:

static IObservable<IList<string>> Run<T>(Func<IObservable<T>> f)
{
    return Observable
        .Defer(f)
        .Select(t => t.ToString())
        .ToArray()
        .Select(ss => ss.ToList());
}

或者,也许是这样:

static IObservable<IList<string>> Run<T>(Func<IObservable<T>> f, Func<T, string> toString)
{
    return Observable
        .Defer(f)
        .Select(t => toString(t))
        .ToArray()
        .Select(ss => ss.ToList());
}

然后你可以这样称呼它:

Run<int>(() => Observable.Range(0, 3), n => n.ToString())
    .Subscribe(results => /* handle list of results */);