如何实现对可运行线程的简单 RxJava 调用?

How do I implement simple a RxJava call into a Runnable Thread?

我正在尝试将一个简单的 RxJava 调用添加到一个可运行的线程中,这样我就可以在线程完成后更新 UI。我该怎么做呢?这是我的 Activity 代码:

public class PrintActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_printer);
        printZpl("^XA^LL360^POI^FO20,20^A0N,25,25^FDThis is a test of the ZPL file printing on " + Helper.getCurrDateTime() + "^FS^XZ");
    }
}

这是执行可运行线程的class:

public class PrinterManager {
    private static void printZpl(final String body, final String footer) {
        new Thread(new Runnable() {
            public void run() {
                try {
                    Connection btCon = new BluetoothConnectionInsecure("AC:3F:A4:0E:22:05");
                    btCon.open();
                    btCon.write(body.getBytes());
                    Thread.sleep(500);
                    btCon.close();
                    // Insert RxJava return here to update the UI in the activity once the thread is completed.
                } catch (Exception e) {
                    Timber.e(e.getMessage());
                }
            }
        }).start();
    }
}

我简化了这篇文章的代码。实际代码要复杂得多...

像这样将代码的异步部分包装在 Observable 中:

public class PrinterManager {
    public static Observable<Void> printZpl(final String body, final String footer) {

    return Observable.fromCallable(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            try {
                Connection btCon = new BluetoothConnectionInsecure("AC:3F:A4:0E:22:05");
                btCon.open();
                btCon.write(body.getBytes());
                Thread.sleep(500);
                btCon.close();                  
            } catch (Exception e) {
                Timber.e(e.getMessage());
            }
            return null;
        }
    });
}
}

然后在你的Activity里订阅它,触发里面的代码:

public class PrintActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_printer);
        PrinterManager.printZpl("^XA^LL360^POI^FO20,20^A0N,25,25^FDThis is a test of the ZPL file printing on " + Helper.getCurrDateTime() + "^FS^XZ")
                      .subscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribe();
    }
}

如果您还没有,则需要将依赖项添加到您的 app.gradle 文件中:

compile "io.reactivex:rxandroid:1.2.0"
compile "io.reactivex:rxjava:1.2.0"

如果你想更新 UI,然后将一个 Observer 传递给 subscribe 方法,而不是像我上面的例子那样只使用空的。

使用 RxJava2:

Completable.fromAction(() -> {
            Connection btCon = new BluetoothConnectionInsecure("AC:3F:A4:0E:22:05");
            btCon.open();
            btCon.write(body.getBytes());
            Thread.sleep(500);
            btCon.close();
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

使用 Completable 而不是 Observable 因为除了完成事件你不会发出任何东西。