如何将 hot Observable 和 Single 变成 cold Observable
How to turn a hot Observable and a Single into a cold Observable
我正在尝试为 NuProcess(一个用于异步执行外部进程的库)编写 Rx 包装器。
下面是负责与进程"通信"的主要类,我在其中读取标准输出:
/**
 * Process handler that republishes the child's stdout through a {@link PublishSubject}.
 *
 * <p>Each stdout callback is decoded into one {@code String} item. Process exit
 * terminates the subject: normal completion for exit code 0, an error otherwise.
 */
static class MyProcessHandler extends NuAbstractProcessHandler {
    /** Subject that receives every decoded stdout chunk and the terminal event. */
    final PublishSubject<String> stdout = PublishSubject.create();

    /**
     * Forwards the readable bytes of {@code buffer} to {@link #stdout} as a single string.
     *
     * @param buffer bytes delivered by the process; everything remaining is consumed
     * @param closed when {@code true} the callback is ignored and nothing is emitted
     */
    @Override
    public void onStdout(ByteBuffer buffer, boolean closed) {
        if (closed) {
            return;
        }
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk);
        // Decoded with the platform default charset, one chunk at a time.
        stdout.onNext(new String(chunk));
    }

    /**
     * Terminates {@link #stdout} according to the exit status.
     *
     * @param statusCode exit code reported for the process; 0 completes the subject,
     *                   any other value signals a {@link RuntimeException}
     */
    @Override
    public void onExit(int statusCode) {
        if (statusCode == 0) {
            stdout.onComplete();
            return;
        }
        stdout.onError(new RuntimeException("non zero code"));
    }
}
我是这样启动进程的:
/**
 * Couples one process handler with the code that launches the external command.
 *
 * <p>{@link #waitDone(long, TimeUnit)} starts the process when subscribed and reports
 * its exit code; {@link #stdOut()} exposes the handler's stdout subject.
 */
static class Streams {
    // The handler type declared alongside this class is MyProcessHandler (it owns the
    // `stdout` subject read by stdOut()); the previous `RxProcessHandler` name is not
    // declared anywhere in this file.
    // Every Single returned by waitDone() attaches this same handler instance.
    MyProcessHandler handler = new MyProcessHandler();

    /**
     * Builds a {@link Single} that, on subscription, starts the command, blocks in
     * {@code waitFor} for at most the given time, and emits the returned code.
     *
     * <p>Disposing the subscription destroys the process forcibly.
     *
     * @param timeout  maximum time passed to {@code NuProcess.waitFor}
     * @param timeUnit unit of {@code timeout}
     * @return a Single emitting whatever {@code waitFor} returns
     */
    Single<Integer> waitDone(long timeout, TimeUnit timeUnit) {
        return Single.create(emitter -> {
            NuProcessBuilder b = new NuProcessBuilder("some cmd");
            b.setProcessListener(handler);
            NuProcess process = b.start();
            emitter.setCancellable(() -> process.destroy(true));
            // NOTE(review): if waitFor signals a timeout through a sentinel return value
            // rather than an exception, that value is emitted as a success here —
            // confirm against the NuProcess documentation.
            int code = process.waitFor(timeout, timeUnit);
            emitter.onSuccess(code);
        });
    }

    /**
     * Returns the subject on which the handler publishes stdout.
     *
     * @return the handler's {@code stdout} subject (same instance on every call)
     */
    public PublishSubject<String> stdOut() {
        return handler.stdout;
    }
}
最后是我的 API。如您所见,这里有三种变体:
1 - 等待进程结束
2 - 同上,但同时带有标准输出回调
3 - 读取标准输出直到进程结束。onComplete 表示退出代码为零,onError 表示退出代码非零。subscribe()
应该启动进程。
我不知道如何实现第 3 种变体。
/**
 * Public entry points for running the external command, in three variants.
 *
 * <p>Each call creates its own {@code Streams} instance.
 */
static class PublicApi {
    /**
     * Variant 1: just wait for the process to end.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the Single produced by {@code Streams.waitDone}
     */
    public Single<Integer> asWaitDone(long timeout, TimeUnit timeUnit) {
        return new Streams().waitDone(timeout, timeUnit);
    }

    /**
     * Variant 2: wait for the process to end and also expose its stdout.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return a pair of the exit-code Single (left) and the stdout stream (right),
     *         both taken from the same {@code Streams} instance
     */
    public Pair<Single<Integer>, Observable<String>> asWaitDoneWithStdout(long timeout, TimeUnit timeUnit) {
        Streams streams = new Streams();
        // Diamond instead of the raw ImmutablePair type, so the pair is type-checked.
        return new ImmutablePair<>(streams.waitDone(timeout, timeUnit), streams.stdOut());
    }

    /**
     * Variant 3: read stdout until the process ends.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return not implemented yet
     */
    public Observable<String> asStdout(long timeout, TimeUnit timeUnit) {
        // Placeholder kept from the original: how to implement this variant is the open question.
        return ???
    }
}
您可以重新组织现有的代码,把它们合并成一个 Observable:
static final class MyProcessHandlerObservable extends NuAbstractProcessHandler {
final ObservableEmitter<String> emitter;
MyProcessHandler(ObservableEmitter<String> emitter) {
this.emitter = emitter;
}
@Override
public void onStdout(ByteBuffer buffer, boolean closed) {
if (!closed) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
emitter.onNext(new String(bytes));
}
}
@Override
public void onExit(int statusCode) {
if (statusCode != 0) {
stdout.onError(new RuntimeException("non zero code: " + statusCode));
} else {
stdout.onComplete();
}
}
}
public Observable<String> asStdout(long timeout, TimeUnit timeUnit) {
return Observable.create(emitter -> {
MyProcessHandlerObservable handler = new MyProcessHandlerObservable(emitter);
NuProcessBuilder b = new NuProcessBuilder("some cmd");
b.setProcessListener(handler);
NuProcess process = b.start();
emitter.setCancellable(() -> process.destroy(true));
})
.takeUntil(Observable.timer(timeout, timeUnit).map(v -> throw new TimeoutException()));
}
我正在尝试为 NuProcess(一个用于异步执行外部进程的库)编写 Rx 包装器。
下面是负责与进程"通信"的主要类,我在其中读取标准输出:
/**
 * Process handler that republishes the child's stdout through a {@link PublishSubject}.
 *
 * <p>Each stdout callback is decoded into one {@code String} item. Process exit
 * terminates the subject: normal completion for exit code 0, an error otherwise.
 */
static class MyProcessHandler extends NuAbstractProcessHandler {
    /** Subject that receives every decoded stdout chunk and the terminal event. */
    final PublishSubject<String> stdout = PublishSubject.create();

    /**
     * Forwards the readable bytes of {@code buffer} to {@link #stdout} as a single string.
     *
     * @param buffer bytes delivered by the process; everything remaining is consumed
     * @param closed when {@code true} the callback is ignored and nothing is emitted
     */
    @Override
    public void onStdout(ByteBuffer buffer, boolean closed) {
        if (closed) {
            return;
        }
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk);
        // Decoded with the platform default charset, one chunk at a time.
        stdout.onNext(new String(chunk));
    }

    /**
     * Terminates {@link #stdout} according to the exit status.
     *
     * @param statusCode exit code reported for the process; 0 completes the subject,
     *                   any other value signals a {@link RuntimeException}
     */
    @Override
    public void onExit(int statusCode) {
        if (statusCode == 0) {
            stdout.onComplete();
            return;
        }
        stdout.onError(new RuntimeException("non zero code"));
    }
}
我是这样启动进程的:
/**
 * Couples one process handler with the code that launches the external command.
 *
 * <p>{@link #waitDone(long, TimeUnit)} starts the process when subscribed and reports
 * its exit code; {@link #stdOut()} exposes the handler's stdout subject.
 */
static class Streams {
    // The handler type declared alongside this class is MyProcessHandler (it owns the
    // `stdout` subject read by stdOut()); the previous `RxProcessHandler` name is not
    // declared anywhere in this file.
    // Every Single returned by waitDone() attaches this same handler instance.
    MyProcessHandler handler = new MyProcessHandler();

    /**
     * Builds a {@link Single} that, on subscription, starts the command, blocks in
     * {@code waitFor} for at most the given time, and emits the returned code.
     *
     * <p>Disposing the subscription destroys the process forcibly.
     *
     * @param timeout  maximum time passed to {@code NuProcess.waitFor}
     * @param timeUnit unit of {@code timeout}
     * @return a Single emitting whatever {@code waitFor} returns
     */
    Single<Integer> waitDone(long timeout, TimeUnit timeUnit) {
        return Single.create(emitter -> {
            NuProcessBuilder b = new NuProcessBuilder("some cmd");
            b.setProcessListener(handler);
            NuProcess process = b.start();
            emitter.setCancellable(() -> process.destroy(true));
            // NOTE(review): if waitFor signals a timeout through a sentinel return value
            // rather than an exception, that value is emitted as a success here —
            // confirm against the NuProcess documentation.
            int code = process.waitFor(timeout, timeUnit);
            emitter.onSuccess(code);
        });
    }

    /**
     * Returns the subject on which the handler publishes stdout.
     *
     * @return the handler's {@code stdout} subject (same instance on every call)
     */
    public PublishSubject<String> stdOut() {
        return handler.stdout;
    }
}
最后是我的 API。如您所见,这里有三种变体:
1 - 等待进程结束
2 - 同上,但同时带有标准输出回调
3 - 读取标准输出直到进程结束。onComplete 表示退出代码为零,onError 表示退出代码非零。subscribe()
应该启动进程。
我不知道如何实现第 3 种变体。
/**
 * Public entry points for running the external command, in three variants.
 *
 * <p>Each call creates its own {@code Streams} instance.
 */
static class PublicApi {
    /**
     * Variant 1: just wait for the process to end.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the Single produced by {@code Streams.waitDone}
     */
    public Single<Integer> asWaitDone(long timeout, TimeUnit timeUnit) {
        return new Streams().waitDone(timeout, timeUnit);
    }

    /**
     * Variant 2: wait for the process to end and also expose its stdout.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return a pair of the exit-code Single (left) and the stdout stream (right),
     *         both taken from the same {@code Streams} instance
     */
    public Pair<Single<Integer>, Observable<String>> asWaitDoneWithStdout(long timeout, TimeUnit timeUnit) {
        Streams streams = new Streams();
        // Diamond instead of the raw ImmutablePair type, so the pair is type-checked.
        return new ImmutablePair<>(streams.waitDone(timeout, timeUnit), streams.stdOut());
    }

    /**
     * Variant 3: read stdout until the process ends.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return not implemented yet
     */
    public Observable<String> asStdout(long timeout, TimeUnit timeUnit) {
        // Placeholder kept from the original: how to implement this variant is the open question.
        return ???
    }
}
您可以重新组织现有的代码,把它们合并成一个 Observable:
static final class MyProcessHandlerObservable extends NuAbstractProcessHandler {
final ObservableEmitter<String> emitter;
MyProcessHandler(ObservableEmitter<String> emitter) {
this.emitter = emitter;
}
@Override
public void onStdout(ByteBuffer buffer, boolean closed) {
if (!closed) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
emitter.onNext(new String(bytes));
}
}
@Override
public void onExit(int statusCode) {
if (statusCode != 0) {
stdout.onError(new RuntimeException("non zero code: " + statusCode));
} else {
stdout.onComplete();
}
}
}
public Observable<String> asStdout(long timeout, TimeUnit timeUnit) {
return Observable.create(emitter -> {
MyProcessHandlerObservable handler = new MyProcessHandlerObservable(emitter);
NuProcessBuilder b = new NuProcessBuilder("some cmd");
b.setProcessListener(handler);
NuProcess process = b.start();
emitter.setCancellable(() -> process.destroy(true));
})
.takeUntil(Observable.timer(timeout, timeUnit).map(v -> throw new TimeoutException()));
}