使用 RxJava Observables 合并并行下载图像的地图

Combining to a Map of parallelly downloaded images with RxJava Observables

我正在尝试并行下载图像列表,将它们组合成地图。

起初我尝试像这样制作一个 Observable:

Observable<Map<Integer, Bitmap>> getImages(final List<Activity> activities) {
    return Observable.create(new Observable.OnSubscribe<Map<Integer, Bitmap>>() {
        @Override
        public void call(Subscriber<? super Map<Integer, Bitmap>> subscriber) {
            try {
                Map<Integer, Bitmap> result = new HashMap<Integer, Bitmap>();
                for (Activity act : activities) {
                    result.put(act.getId(), downloadImage(act.getImage()));
                }
                subscriber.onNext(result);
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    });
}

这可行,但不是我想要的。因为图像是按顺序下载的。而且我认为 for 循环在 rxjava 中并不好。所以我创建了这些 Observables:

    Observable<Bitmap> getImage(final Activity activity) {
    return Observable.create(new Observable.OnSubscribe<Bitmap>() {
        @Override
        public void call(Subscriber<? super Bitmap> subscriber) {
            try {
                subscriber.onNext(downloadImage(activity.getImage()));
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    });
}

Observable<Map<Integer, Bitmap>> getImages(final List<Activity> activities) {
    return Observable
        .from(activities)
        .flatMap(new Func1<Activity, Observable<Bitmap>>() {
            @Override
            public Observable<Bitmap> call(Activity activity) {
                return getImage(activity);
            }
        })
        .toMap(new Func1<Bitmap, Integer>() {
            @Override
            public Integer call(Bitmap bitmap) {
                return 1; // How am I supposed to get the activity.getId()?
            }
        });
}

所以我制作了一个 Observable 来获取单个图像,并尝试使用 flatMap 将它们组合到第二个图像中。这可行,但仍然存在 2 个问题:

  1. 当我执行 toMap() 时,如何检索正确的 ID 以用作地图的键?我想为此使用 Activity 对象。
  2. 不幸的是,下载仍在按顺序而非并行处理。我该如何解决这个问题?

创建一个包含位图和 Activity 的包装器 class。说 ActivityBitmap。将 getImage 替换为 getActivityBitmap

Observable<ActivityBitmap> getActivityBitmap(final Activity activity) {
    return Observable.create(new Observable.OnSubscribe<ActivityBitmap>() {
        @Override
        public void call(Subscriber<? super ActivityBitmap> subscriber) {
            try {
                subscriber.onNext(new ActivityBitmap(activity, downloadImage(activity.getImage())));
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    });
}

并像下面这样称呼它。请注意,要获得异步下载,您可以在 flatMap 中使用 subscribeOn。要在最后构建 Map<Integer,Bitmap>,您可以使用 toMap 的不同重载,它允许您指定键和值。

Observable<Map<Integer, Bitmap>> getImages(final List<Activity> activities) {
    return Observable
        .from(activities)
        .flatMap(new Func1<Activity, Observable<ActivityBitmap>>() {
            @Override
            public Observable<ActivityBitmap> call(Activity activity) {
                return getActivityBitmap(activity).subscribeOn(Schedulers.io());
            }
        })
        .toMap(new Func1<ActivityBitmap, Integer>() {
            @Override
            public Integer call(ActivityBitmap activityBitmap) {
                return activityBitmap.getActivity().getId();
            }
        },new Func1<ActivityBitmap, Bitmap>() {
            @Override
            public Integer call(ActivityBitmap activityBitmap) {
                return activityBitmap.getBitmap();
            }
        });
}

我有一个可能的解决方案。它使用 reduce 运算符转换为地图。不过,我不确定在一个 Observable 中订阅一个 Observable 是不是一个好习惯。

Observable<Bitmap> getImage(final Activity activity) {
    return Observable.create(new Observable.OnSubscribe<Bitmap>() {
        @Override
        public void call(Subscriber<? super Bitmap> subscriber) {
            try {
                subscriber.onNext(downloadImage(activity.getImage()));
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    });
}

Observable<HashMap<Integer, Bitmap>> getImages(final List<Activity> activities) {
    return Observable
        .from(activities)
        .reduce(new HashMap<Integer, Bitmap>(), new Func2<HashMap<Integer, Bitmap>, Activity, HashMap<Integer, Bitmap>>() {
            @Override
            public HashMap<Integer, Bitmap> call(final HashMap<Integer, Bitmap> bitmaps, final Activity activity) {
                getImage(activity)
                    .observeOn(Schedulers.io())
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Action1<Bitmap>() {
                        @Override
                        public void call(Bitmap bitmap) {
                            bitmaps.put(activity.getId(), bitmap);
                        }
                    });
                return bitmaps;
            }
        });
}

非常感谢对此解决方案的反馈。

我会这样:

Observable<Map<Integer, Bitmap>> getImages(List<Activity> activities) {
    return Observable.from(activities)
          .map(activity -> new Pair(activity.getId(), downloadImage(activity.getImage())))
          .toMap(pair -> pair.first, pair -> pair.second);
}

(注意:我使用 retrolambda,因此使用 lambda)

显然,类似的东西应该是平行的:

Observable.from(activities)
          .flatMap(activity ->
                Observable.zip(Observable.just(activity.getId(),
                               downloadImage(activity.getImage())),
                (k, v) -> new Pair(k, v)))
          .toMap(pair -> pair.first, pair -> pair.second);

(前提是downloadImage returns一个异步的Observable