可观察 |订阅以仅获取更改的对象

Observable | Subscribe to get only the changed objects

我有一个从 Realtime-Database (Firebase) with a wrapper in C# Firebase.Xamarin

获得的对象列表
private List<Job> jobList = null;

首先使用以下代码加载应用程序:

private async void PopulateList()
{
     IReadOnlyCollection<Firebase.Xamarin.Database.FirebaseObject<Job>>  items = await firebase
        .Child("jobs")
        .OnceAsync<Job>();
    jobList = new List<Job>();
    foreach (var item in items)
    {
         jobList.Add(
             new Job 
             { 
                  ID = item.Object.ID, 
                  StartDate = item.Object.StartDate, 
                  EndDate = item.Object.EndDate, 
                  Description = item.Object.Description 
              });
        }
    SubscribeDbChanges();
}

我想订阅 DB 以触发事件和 change/add 新对象到列表并触发事件后显示或通知用户一次,已发生更改。为此,我使用 Observable 即 Reactive Rx.Net with All 通过以下方式:

private void SubscribeDbChanges()
{
     Observable.All<FirebaseEvent<Job>>(
          firebase
              .Child("jobs")
              .AsObservable<Job>(), 
          job => !jobList
              .Contains(job.Object))
          .Subscribe(jobItem =>
              {
              });
}

代码有问题吗?此外,我应该在哪里创建更改已到达的事件?

首先我建议去掉 foreach/Add,这是 Select

的工作
private async void PopulateList()
{
    jobList = (await firebase
        .Child("jobs")
        .OnceAsync<Job>())
        .Select(item =>
             new Job 
             { 
                  ID = item.Object.ID, 
                  StartDate = item.Object.StartDate, 
                  EndDate = item.Object.EndDate, 
                  Description = item.Object.Description 
              });
    SubscribeDbChanges();
}

那我就用Where。你如何使用 All 很奇怪,它是一个扩展方法,你像通常的静态方法一样调用它。这是可能的,但现在应该如何使用它。这是 Where:

的代码
private void SubscribeDbChanges()
{
    firebase
        .Child("jobs")
        .AsObservable<Job>()
        .Where(job => !jobList.Contains(job.Object))
        .Subscribe(jobItem =>
            {
            });
}

感谢@Alexey Zimarev 让我快速点击,流畅的代码出现了: 首先将 ConcurrentDictionary 作为数据成员并初始化 Firebaseclient 然后填充列表 ;

FirebaseClient firebase;
private ConcurrentDictionary<string, Job> jobList ;
protected override void OnCreate(Bundle bundle)
{
    base.OnCreate(bundle);
    SetContentView(Resource.Layout.Main);
    firebase = new FirebaseClient("https://samplehosting-XXXX.firebaseio.com/");
    PopulateList();
}

在这个函数里面我初始化了字典并完成了UI的工作,最后分别设置监听器监听SubscribeToDbChanges

private async void PopulateList()
{
    IReadOnlyCollection<Firebase.Xamarin.Database.FirebaseObject<Job>> items = await firebase
        .Child("jobs")
        .OnceAsync<Job>();
    jobList = new ConcurrentDictionary<string, Job>();

    foreach (var job in items)
    {
        while (!jobList.TryAdd(job.Object.ID, job.Object)) ;
    }
    list = FindViewById<ListView>(Resource.Id.listJobs);
    list.ChoiceMode = ChoiceMode.Single;
    HomeScreenAdapter ListAdapter = new HomeScreenAdapter(this, jobList);
    list.Adapter = ListAdapter;
    SubscribeToDbChanges();


}

这里我为字典中不可用的键设置插入观察者,然后设置删除 字典中可用键 的观察者。

private void SubscribeToDbChanges()
{
    firebase
    .Child("jobs").AsObservable<Job>()
    .Where(job => !jobList.ContainsKey(job.Object.ID) && job.EventType == Firebase.Xamarin.Database.Streaming.FirebaseEventType.InsertOrUpdate)
    .Subscribe(job =>
    {
        if (job.EventType == Firebase.Xamarin.Database.Streaming.FirebaseEventType.InsertOrUpdate)
        {
            while (!jobList.TryAdd(job.Object.ID, job.Object)) ;
        }
    });
    firebase
    .Child("jobs").AsObservable<Job>()
    .Where(job => jobList.ContainsKey(job.Object.ID) && job.EventType == Firebase.Xamarin.Database.Streaming.FirebaseEventType.Delete)
    .Subscribe(job =>
    {
        Thread remove = new Thread(() =>
        {
            Job removed = null;
            jobList.TryRemove(job.Object.ID, out removed);
        });
        remove.Start();
    });

}

PS:假设在这里,我假设我们在上面的问题和答案中添加的对象由于存在内存而没有得到比较。就像一个新对象被实例化一样,它可能与列表中已经存在的克隆不同。如果我在 Linq 行为方面有误,请纠正我。但是,上面给出的代码适用于我使用字典的场景。