EventHandler 到 Reactive Extensions,其中事件处理程序是短暂的生活方式

EventHandler to Reactive Extensions where event handler is transient lifestyle

我需要桥接我的消息传递框架,以便在事件发生时(包含名为 ReadingStream 的成员的 TimeAggregate)我可以传递到 Reactive Extensions 并做一些奇特的事情:

public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
    public TestHandler(/* singleton variables are injected here */)
    {
    }

    public async Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
    {
        // notification contains TimeAggregate.Reading (which is a decimal)
        // and TimeAggregate.Stream (which is a string, i.e. Office1, OfficeA etc)
        // I want to perform an average on TimeAggregate.Reading but split by TimeAggregate.Reading
    }
}

我将如何链接两者(事件处理程序和 Reactive Extension)并拆分,以便每个 TimeAggregate.Stream 键独立平均每个阅读名称(各种平均可观察值的字典)?

终身详细信息

还有一个技术方面,每次调用事件处理程序 (TestHandler) 时,它都是从头开始构建的,TestHandler 的生命周期是短暂的。

我可以通过制作一个静态注册管理器(IoC 将其注入到 TestHandler 构造函数中,或者我们在字面上将其称为静态)来解决这个问题。

一种方法是注入一个可以在事件处理程序中调用的单例生命周期服务。它可以使用 Subject 生成可观察对象并公开/利用 运行 平均消息的可观察对象:

public class MonitoringService : IMonitoringService
{
    private Subject<TimeAggregate> _subject;

    // Calculate tuples of (key, average)
    public IObservable<(string, decimal)> Averages => _subject
        // Group by key
        .GroupBy(s => s.Group)
        .SelectMany(g => g
            .Select(g => g.Reading)
            // Collect element count and running total
            .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements + 1, agg.total + v))
            // Calculate running average
            .Select(t => t.total / t.elements)
            // Associate key and average for SelectMany
            .Select(average => (g.Key, average)));

    public MonitoringService()
    {
        _subject = new Subject<TimeAggregate>();
    }
    public void PostNotification(TimeAggregate notification)
    {
        _subject.OnNext(notification);
    }
}

然后可以这样使用:

var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);

var dict = new Dictionary<string, decimal>();
monitoringService.Averages.Subscribe<(string group, decimal average)>(t =>
{
    // Do something with running average. In this case populate a dictionary
    dict[t.group] = t.average;
});

注意:如果你想在观察者被附加之前开始计算平均值,你可能想要查看 Connectables(Publish 等)并且你会想要避免像这样使用 get 只生成 属性 而是在构造函数中分配一次。但请记住,这种方法只会将更新推送到平均值,因此如果消息不多,则平均消息将被延迟。 例如

public class MonitoringService : IMonitoringService
{
    private Subject<TimeAggregate> _subject;

    // Calculate tuples of (key, average)
    public IObservable<(string, decimal)> Averages { get; }

    public MonitoringService()
    {
        _subject = new Subject<TimeAggregate>();
        Averages = _subject
        // Group by key
        .GroupBy(s => s.Group)
        .SelectMany(g => g
            .Select(g => g.Reading)
            // Collect element count and running total
            .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements + 1, agg.total + v))
            // Calculate running average
            .Select(t => t.total / t.elements)
            // Associate key and average for SelectMany
            .Select(average => (g.Key, average)))
        .Publish()
        // Connect immediately
        .AutoConnect(0);
    }
    public void PostNotification(TimeAggregate notification)
    {
        _subject.OnNext(notification);
    }
}

完整示例:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static async Task Main()
        {
            var monitoringService = new MonitoringService();
            var handler = new TestHandler(monitoringService);

            var dict = new Dictionary<string, decimal>();
            monitoringService.Averages.Subscribe<(string group, decimal average)>(t =>
            {
                // Do something with running average. In this case populate a dictionary
                dict[t.group] = t.average;
            });

            await handler.Handle(new TimeAggregate
            {
                Group = "Test1",
                Reading = 100
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test1",
                Reading = 200
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test2",
                Reading = 200
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test2",
                Reading = 300
            }, "Test", CancellationToken.None);
        }  
    }

    public class MonitoringService : IMonitoringService
    {
        private Subject<TimeAggregate> _subject;

        // Calculate tuples of (key, average)
        public IObservable<(string, decimal)> Averages => _subject
            // Group by key
            .GroupBy(s => s.Group)
            .SelectMany(g => g
                .Select(g => g.Reading)
                // Collect element count and running total
                .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements + 1, agg.total + v))
                // Calculate running average
                .Select(t => t.total / t.elements)
                // Associate key and average for SelectMany
                .Select(average => (g.Key, average)));

        public MonitoringService()
        {
            _subject = new Subject<TimeAggregate>();
        }
        public void PostNotification(TimeAggregate notification)
        {
            _subject.OnNext(notification);
        }
    }

    public class TestHandler : ITopicNotificationHandler<TimeAggregate>
    {
        private readonly IMonitoringService _monitoringService;

        public TestHandler(IMonitoringService monitoringService)
        {
            _monitoringService = monitoringService;
        }

        public Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
        {
            _monitoringService.PostNotification(notification);
            return Task.CompletedTask;
        }
    }

    public interface IMonitoringService
    {
        void PostNotification(TimeAggregate notification);
        IObservable<(string group, decimal average)> Averages { get; }
    }

    public class TimeAggregate
    {
        public string Group { get; set; }
        public decimal Reading { get; set; }
    }

    public interface ITopicNotificationHandler<T>
    {
    }
}