RxJava 2 - 调度程序抛出致命异常

RxJava 2 - Fatal Exception thrown on Scheduler

我试图将我的异步任务转换为 JavaRx 2。我使用 google 工作表 api 从电子表格下载数据。 (here is a link how this happens)

这是我的部分代码:

创建时:

/**
 * JavaRx
 */

//Observable
Observable<String> observable
        = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override

            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //Use onNext to emit each item in the stream//
                e.onNext("https://docs.google.com/spreadsheets/d/1W5S5W2QH6WHjUcL1VMwqIqOdFYVleTopJNryQJGw568/gviz/tq?tqx=out:QUERY&tq=select+B,X,Y,Z");

                    //Once the Observable has emitted all items in the sequence, call onComplete//
                e.onComplete();
            }
        }
    ).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

//Create our subscription
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(String value) {
        try {
            String data = getLeagueData(value);
            mLeagues.add(autoProcessJsonLeague("Argentina Primera Division", returnJSON(data)));
        } catch (IOException e) {
            e.printStackTrace();
        }

        Log.e(TAG, "onNext: " + value + Thread.currentThread().getName());

    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError: ");
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete: All Done! " + Thread.currentThread().getName());
    }
};
observable.subscribe(observer);

其他方法:

private String getLeagueData(String urlString) throws IOException {
//Download JSON file
    InputStream is = null;

    try {
        URL url = new URL(urlString);
        HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(true);  //you still need to handle redirect manually.
        HttpsURLConnection.setFollowRedirects(true);
        conn.setReadTimeout(10000 /* milliseconds */);
        conn.setConnectTimeout(15000 /* milliseconds */);
        conn.setInstanceFollowRedirects(true);
        conn.setRequestMethod("GET");
        conn.setDoInput(true);
        // Starts the query
        conn.connect(); //ERROR HAPPENS HERE!
        int responseCode = conn.getResponseCode();
        is = conn.getInputStream();

        String contentAsString = convertStreamToString(is);
        //Log.d("contentAsString", contentAsString);
        return contentAsString;
    } catch (ProtocolException e) {
        e.printStackTrace();
    } catch (MalformedURLException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (is != null) {
            is.close();
        }
    }

    return null;
}

private String convertStreamToString(InputStream is) {
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    StringBuilder sb = new StringBuilder();

    String line = null;
    try {
        while ((line = reader.readLine()) != null) {
            sb.append(line + "\n");
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return sb.toString();
}


private League autoProcessJsonLeague(String leagueName, JSONObject object) {
    //Get the data from the JSON string
    ArrayList<Team> teams = new ArrayList<>();
    try {
        JSONArray rows = object.getJSONArray("rows");
        for (int r = 0; r < rows.length(); ++r) {
            JSONObject row = rows.getJSONObject(r);
            JSONArray columns = row.getJSONArray("c");
            String name = columns.getJSONObject(0).getString("v");
            int points = columns.getJSONObject(1).getInt("v");
            double hGoalAv = columns.getJSONObject(2).getDouble("v");
            double aGoalAv = columns.getJSONObject(3).getDouble("v");
            hGoalAv = Utilities.round(hGoalAv, 2);
            aGoalAv = Utilities.round(aGoalAv, 2);
            teams.add(new Team(name, points, hGoalAv, aGoalAv));
            //Log.d("Team", name + " " + hGoalAv + " " + aGoalAv);
        }
    } catch (JSONException e) {
        e.printStackTrace();
        e.printStackTrace();
    }
    return new League(leagueName, teams);
}

所以我创建了一个可观察对象,我在 IO 线程上订阅并在主线程上观察。使用 onNext 我将 url link 发送给观察者,然后我尝试连接到服务器以下载 json 字符串文件。

conn.connect() 行的方法 getLeagueData() 发生错误; 它说 java.lang.IllegalStateException:调度程序抛出致命异常。

完整堆栈跟踪错误:

08-16 08:53:09.934 29841-29841/com.aresproductions.bettingtools E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.aresproductions.bettingtools, PID: 29841
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.
  at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111)
  at android.os.Handler.handleCallback(Handler.java:751)
  at android.os.Handler.dispatchMessage(Handler.java:95)
  at android.os.Looper.loop(Looper.java:154)
  at android.app.ActivityThread.main(ActivityThread.java:6195)
  at java.lang.reflect.Method.invoke(Native Method)
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:874)
  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:764)
 Caused by: android.os.NetworkOnMainThreadException
  at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1303)
  at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:86)
  at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:74)
  at java.net.InetAddress.getAllByName(InetAddress.java:752)
  at com.android.okhttp.internal.Network.resolveInetAddresses(Network.java:29)
  at com.android.okhttp.internal.http.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:187)
  at com.android.okhttp.internal.http.RouteSelector.nextProxy(RouteSelector.java:156)
  at com.android.okhttp.internal.http.RouteSelector.next(RouteSelector.java:98)
  at com.android.okhttp.internal.http.HttpEngine.createNextConnection(HttpEngine.java:346)
  at com.android.okhttp.internal.http.HttpEngine.connect(HttpEngine.java:329)
  at com.android.okhttp.internal.http.HttpEngine.sendRequest(HttpEngine.java:247)
  at com.android.okhttp.internal.huc.HttpURLConnectionImpl.execute(HttpURLConnectionImpl.java:457)
  at com.android.okhttp.internal.huc.HttpURLConnectionImpl.connect(HttpURLConnectionImpl.java:126)
  at com.android.okhttp.internal.huc.DelegatingHttpsURLConnection.connect(DelegatingHttpsURLConnection.java:89)
  at com.android.okhttp.internal.huc.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java)
  at com.aresproductions.bettingtools.MainActivity.getLeagueData(MainActivity.java:307)
  at com.aresproductions.bettingtools.MainActivity.access[=12=]0(MainActivity.java:80)
  at com.aresproductions.bettingtools.MainActivity.onNext(MainActivity.java:180)
  at com.aresproductions.bettingtools.MainActivity.onNext(MainActivity.java:171)
  at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:198)
  at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:250)
  at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109)
  at android.os.Handler.handleCallback(Handler.java:751) 
  at android.os.Handler.dispatchMessage(Handler.java:95) 
  at android.os.Looper.loop(Looper.java:154) 
  at android.app.ActivityThread.main(ActivityThread.java:6195) 
  at java.lang.reflect.Method.invoke(Native Method) 
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:874) 
      at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:764) 
08-16 08:53:09.936 29841-29841/com.aresproductions.bettingtools E/MQSEventManagerDelegate: failed to get MQSService.

提前致谢!

我假设您在主线程上遇到网络异常,因为网络调用在错误的位置。你应该把它移到创建中,这样它就会在 IO 调度程序上执行,而不是作为观察普通字符串的一部分:

Observable<String> observable = Observable.create(
    new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {

            String value = "https://docs.google.com/spreadsheets/d/1W5S5W2QH6WHjUcL1VM" +
                "wqIqOdFYVleTopJNryQJGw568/gviz/tq?tqx=out:QUERY&tq=select+B,X,Y,Z";

            String data = getLeagueData(value);

            //Use onNext to emit the item in the stream//
            e.onNext(data);

            /* Once the Observable has emitted all items 
            in the sequence, call onComplete */
            e.onComplete();
        }
    }
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

//Create our subscription
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(String data) {
        try {
            mLeagues.add(autoProcessJsonLeague(
                "Argentina Primera Division", returnJSON(data)));
        } catch (IOException e) {
            e.printStackTrace();
        }


        Log.e(TAG, "onNext: " + value + Thread.currentThread().getName());

    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError: ");
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete: All Done! " + Thread.currentThread().getName());
    }
};
observable.subscribe(observer);

您可能还想搬家

autoProcessJsonLeague(
                "Argentina Primera Division", returnJSON(data))

进入 ObservableOnSubscribe 以防处理成本高昂。

问题是你在主线程上进行网络调用。尽管您已订阅 Schedulers.io(),但您正在执行网络调用的 onNext() 方法将在主线程上调用,因为您正在主线程 observeOn(AndroidSchedulers.mainThread()).

上进行观察

解决办法是调用observable的getLeagueData(String urlString) inside subscribe()方法,然后用网络调用的结果调用e.onNext(result)