RxJava OutOfMemory 运行-time crash 无法创建线程

RxJava OutOfMemory run-time crash unable to create thread

我正在使用 Retrofit2+RxJava+RxAndroid 进行网络调用。

我试过不同的 Schedulers 同样 io()newThread()。我知道使用 newThread() 的后果,但应用程序仍然是 OOM 的关闭原因。

场景:

以上网络调用出现65次。

代码:

/*form version download method*/
public void startFormDownload(List<Form> form, ApiClient apiClient) {
    Observable.fromIterable(form)
            .concatMapIterable(Form::getFormVersions) // get form version list from single form object
            .doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
            .filter(this::checkFormVersionToDownloadOrNot) // only get those versions who needed to downloaded
            .doOnNext(formVersion -> { // get formVersion object

                AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
                AppLogger.i(tag, formVersion.getFormUrl());
                String constructURL = formVersion.getFormUrl();

                /* download form gzip process starts from here */
                apiClient.getJsonByFormURL(constructURL)
                        .subscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new SingleObserver<Response<ResponseBody>>() {
                            @Override
                            public void onSubscribe(Disposable disposable) {
                                AppLogger.i(tag, "gzip download ->" + "subscribed");
                            }

                            @Override
                            public void onSuccess(Response<ResponseBody> responseBodyResponse) {
                                AppLogger.i(tag, "gzip downloaded " + " success");
                                AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());

                                if (responseBodyResponse.code() == 401) {
                                    //not authorized
                                    return;
                                }

                                ByteArrayInputStream bais = null;
                                if (responseBodyResponse.body() != null)
                                    try {
                                        bais = new ByteArrayInputStream(responseBodyResponse.body().bytes());

                                        GZIPInputStream gzis = new GZIPInputStream(bais);
                                        InputStreamReader reader = new InputStreamReader(gzis);
                                        BufferedReader in = new BufferedReader(reader);

                                        String readed;
                                        StringBuilder stringBuilder = new StringBuilder();
                                        while ((readed = in.readLine()) != null) {
                                            stringBuilder.append(readed);
                                        }

                                        baseRealm = Realm.getDefaultInstance();
                                        formVersion.setJsonString(stringBuilder.toString());
                                        baseRealm.executeTransaction(realm ->
                                                baseRealm.copyToRealmOrUpdate(formVersion));



                                    } catch (IOException e) {
                                        AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
                                    }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
                            }
                        });


            })
            .doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
            .doOnError(Throwable::printStackTrace)
            .subscribe(
                    formVersion -> AppLogger.i(tag, "Form Versions download completed"),
                    throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
            );
}

可能会有InputStream的情况导致OOM,但我不确定这里可以做什么。

PS。 OOM只是有时发生,而不是总是发生。

Crash report

java.lang.OutOfMemoryError: Could not allocate JNI Env
at java.lang.Thread.nativeCreate(Native Method)
at java.lang.Thread.start(Thread.java:730)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:941)
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1582)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:313)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:550)
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:651)
at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:146)
at io.reactivex.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:51)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:135)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:111)
at io.reactivex.internal.operators.single.SingleSubscribeOn.subscribeActual(SingleSubscribeOn.java:37)
at io.reactivex.Single.subscribe(Single.java:2779)
at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
at io.reactivex.Single.subscribe(Single.java:2779)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$startFormDownload$VisualogyxBaseFragment(VisualogyxBaseFragment.java:623)
at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda.accept(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:111)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable.subscribeActual(ObservableFlattenIterable.java:44)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.Observable.subscribe(Observable.java:10896)
at io.reactivex.Observable.subscribe(Observable.java:10825)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.startFormDownload(VisualogyxBaseFragment.java:683)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.updateJobAndDownloadCheckListForm(VisualogyxBaseFragment.java:239)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$null$VisualogyxBaseFragment(VisualogyxBaseFragment.java:221)
at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda.test(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableAllSingle$AllObserver.onNext(ObservableAllSingle.java:69)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableAllSingle.subscribeActual(ObservableAllSingle.java:34)
at io.reactivex.Single.subscribe(Single.java:2779)
at io.reactivex.Single.subscribe(Single.java:2765)
at io.reactivex.Single.subscribe(Single.java:2686)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$storeAndRetrieveListFromDB$VisualogyxBaseFragment(VisualogyxBaseFragment.java:226)
at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda[=12=].execute(Unknown Source)
at io.realm.Realm.executeTransaction(Realm.java:1394)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.storeAndRetrieveListFromDB(VisualogyxBaseFragment.java:200)
at com.visualogyx.app.fragments.HomeFragment.onSuccess(HomeFragment.java:59)
at com.visualogyx.app.fragments.HomeFragment.onSuccess(HomeFragment.java:54)
at com.visualogyx.app.controller.ApiCallback.onResponse(ApiCallback.java:36)
at com.visualogyx.app.controller.ApiClient.onResponse(ApiClient.java:264)
at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall.run(ExecutorCallAdapterFactory.java:70)
at android.os.Handler.handleCallback(Handler.java:836)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:203)
at android.app.ActivityThread.main(ActivityThread.java:6293)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1065)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:926)

也许你应该将日志添加到代码的不同部分并找出内存足迹,哪一部分占用了内存。或者你可以使用 AndroidStuido 内存分配工具来监控内存分配。

您可以使用字节流代替字节[]

String body = null;
String charset = "UTF-8"; // You should determine it based on response header.

try (
    InputStream gzippedResponse = responseBodyResponse.body().byteStream();
    InputStream ungzippedResponse = new GZIPInputStream(gzippedResponse);
    Reader reader = new InputStreamReader(ungzippedResponse, charset);
    Writer writer = new StringWriter();
) {
    char[] buffer = new char[10240];
    for (int length = 0; (length = reader.read(buffer)) > 0;) {
        writer.write(buffer, 0, length);
    }
    body = writer.toString();
}

我不得不在 doFinally() 块中移动与领域交易相关的操作。 到目前为止一切顺利。

Observable.fromIterable(form)
            .concatMapIterable(Form::getFormVersions) // get form version list from single form object
            .doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
            .filter(formVersion -> formVersion.getJsonString() == null) // only get those versions who is not downloaded yet
            .doOnNext(formVersion -> { // get formVersion object

                AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
                AppLogger.i(tag, formVersion.getFormUrl());
                String constructURL = formVersion.getFormUrl();

                /* download form gzip process starts from here */
                apiClient.getJsonByFormURL(constructURL)
                        .subscribeOn(Schedulers.io())
                        .subscribe(new SingleObserver<Response<ResponseBody>>() {
                            @Override
                            public void onSubscribe(Disposable disposable) {
                                AppLogger.i(tag, "gzip download ->" + "subscribed");

                            }

                            @Override
                            public void onSuccess(Response<ResponseBody> responseBodyResponse) {
                                AppLogger.i(tag, "gzip downloaded " + " success");
                                AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());

                                if (responseBodyResponse.code() == 401) {
                                    //not authorized
                                    return;
                                }
                                // You should determine it based on response header.

                                if (responseBodyResponse.body() != null)
                                    try {
                                        gzipBody = null;

                                        gzis = responseBodyResponse.body().byteStream();
                                        ungzippedResponse = new GZIPInputStream(gzis);
                                        reader = new InputStreamReader(ungzippedResponse, charset);
                                        writer = new java.io.StringWriter();

                                        buffer = new char[10240];
                                        for (int length = 0; (length = reader.read(buffer)) > 0; ) {
                                            writer.write(buffer, 0, length);
                                        }
                                        gzipBody = writer.toString();

                                        formVersion.setJsonString(gzipBody);
                                        AppLogger.i(tag, "set json string ->" + gzipBody);

                                    } catch (IOException e) {
                                        AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
                                    }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
                            }
                        });


            })
            .doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
            .doOnError(Throwable::printStackTrace)
            .doFinally(() -> {
                baseRealm = Realm.getDefaultInstance();
                baseRealm.executeTransaction(realm ->
                        realm.copyToRealmOrUpdate(form));
            })
            .subscribe(
                    formVersion -> AppLogger.i(tag, "Form Versions download completed"),
                    throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
            );