具有 Observable 的复杂流程

Complex flow with Observables

我有一系列图像 (IObservable<ImageSource>) 通过此 "pipeline"。

  1. 每张图片都使用 OCR 识别
    • 如果结果具有 有效 值,则将它们上传到可以在给定时间(不是同时)注册一组结果的服务。
    • 如果结果有任何无效值,它们将呈现给用户以修复它们。修复后,该过程继续。
  2. 在此过程中,UI 应保持响应。

问题是我不知道如何处理用户必须交互的情况。我就是做不到这个

        subscription = images                
            .Do(source => source.Freeze())
            .Select(image => OcrService.Recognize(image))                
            .Subscribe(ocrResults => Upload(ocrResults));

...因为 当 ocrResults 必须由用户修复时,流程应保持暂停,直到接受有效值(即用户可以执行命令单击按钮)

怎么说:如果结果无效,请等到用户修复它们?

我假设你的 UploadAsync 方法 returns 一个 Task 允许你等待它完成?如果是这样,则有处理任务的 SelectMany 重载。

images.Select(originalImage => ImageOperations.Resize(originalImage))
    .SelectMany(resizedImg => imageUploader.UploadAsync(resizedImg))
    .Subscribe();

假设您有一个实现 "user fix process":

的异步方法
/* show the image to the user, which fixes it, returns true if fixed, false if should be skipped */
async Task UserFixesTheOcrResults(ocrResults);

然后您的可观察值变为:

subscription = images                
        .Do(source => source.Freeze())
        .Select(image => OcrService.Recognize(image))
        .Select(ocrResults=> {
            if (ocrResults.IsValid)
                return Observable.Return(ocrResults);
            else
                return UserFixesTheOcrResults(ocrResults).ToObservable().Select(_ => ocrResults)
        })
        .Concat()             
        .Subscribe(ocrResults => Upload(ocrResults));

这似乎是 UX、WPF 和 Rx 的混合体,所有这些都包含在一个问题中。试图只用 Rx 来解决它可能会让你陷入困境。我相信你 可以 只用 Rx 解决它,而不用再考虑它,但你愿意吗?它是否可测试、松散耦合且易于维护?

根据我对问题的理解,您必须执行以下步骤

  1. 用户Uploads/Selects一些图片
  2. 系统对每张图片进行OCR
  3. 如果OCR工具认为图片来源有效,则上传处理结果
  4. 如果OCR工具认为图片来源无效,用户"fixes"结果和上传结果

但这可能更好地描述为

  1. 用户Uploads/Selects一些图片
  2. 系统对每张图片进行OCR
  3. OCR 的结果放入验证队列
  4. 虽然结果无效,但用户需要手动将其更新为有效状态。
  5. 上传有效结果

所以在我看来,您需要一个基于 task/queue 的 UI,以便用户可以看到他们需要处理的无效 OCR 结果。这也告诉我,如果涉及到一个人,那么它可能应该在 Rx 查询之外。

第 1 步 - 执行 ORC

subscription = images                
        .Subscribe(image=>
        {
          //image.Freeze() --probably should be done by the source sequence
          var result = _ocrService.Recognize(image);
          _validator.Enqueue(result);
        });

第 2 步 - 验证结果

//In the Enqueue method, if queue is empty, ProcessHead();
//Else add to queue.
//When Head item is updated, ProcessHead();
//ProcessHead method checks if the head item is valid, and if it is uploads it and remove from queue. Once removed from queue, if(!IsEmpty) {ProcessHead();}


//Display Head of the Queue (and probably queue depth) to user so they can interact with it.

第 3 步 - 上传结果

Upload(ocrResults)

所以这里的Rx只是我们武器库中的一个工具,而不是需要解决所有问题的一把锤子。我发现大多数 "Rx" 问题的规模都在增长,Rx 只是充当各种 Queue 结构的入口点和出口点。这允许我们使系统中的排队显式而不是隐式(即隐藏在 Rx 运算符内部)。