如何将流程执行实现为反应式`Observable [String]
How to implement process execution as reactive `Observable[String]
我想将外部流程执行表示为 Observable[String]
,其中 String
- 来自流程输出的行。这是我所做的示例,它有效:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
object TestSo {
def main(args: Array[String]): Unit = {
val lineStream = scala.sys.process.Process("python3 test.py").lineStream
val lineStreamO: Observable[String] = Observable.fromIterator(Task(lineStream.iterator))
.doOnNext(l => Task(println(l))) //logging
.guarantee(Task(println("clean resources")))
println(lineStreamO.toListL.runSyncUnsafe())
}
}
您可以看到,该进程每秒都会发出新行。但这无关紧要。只需提供完整示例,test.py
:
from time import sleep
print(0, flush=True)
sleep(1)
print(1, flush=True)
sleep(1)
print(2, flush=True)
sleep(1)
print(3, flush=True)
sleep(1)
print(4, flush=True)
输出:
0
1
2
3
4
5
clean resources
List(0, 1, 2, 3, 4, 5)
问题:
我想要超时 - 如果进程冻结(例如 sleep 100000
)进程应该在超时后终止。此外,如果进程强制执行或失败,则应清理一些资源(例如 guarantee
)。非零退出代码应表示失败。
如何通过适当的错误处理实现流程执行 Observable[String]
? rx-java
欢迎提供解决方案。
超时的需要将迫使您重新编写 lineStream
逻辑的主要部分。另一方面,通过这样的重写,您可以避免中间 Iterator
并直接将行推入 Subject
。对于超时逻辑,您可以使用 Monix timeoutOnSlowUpstream
方法,但您仍然必须处理超时错误并关闭启动的进程。
还可以选择如何处理长输出和多个订阅者。在这段代码中,我决定使用 replayLimited
的有限缓冲区。根据您的需要,您可能会选择一些不同的策略。这是解决方案的草图:
object ProcessHelper {
import scala.sys.process.{Process, BasicIO}
import scala.concurrent.duration.FiniteDuration
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject
import monix.reactive.Observable
private class FinishedFlagWrapper(var finished: Boolean = false)
def buildProcessLinesObservable(cmd: String, timeout: FiniteDuration, bufferLines: Int = 100)(implicit scheduler: Scheduler): Observable[String] = {
// works both as a holder for a mutable boolean var and as a synchronization lock
// that is required to preserve semantics of a Subject, particularly
// that onNext is never called after onError or onComplete
val finished = new FinishedFlagWrapper()
// whether you want here replayLimited or some other logic depends on your needs
val subj = ConcurrentSubject.replayLimited[String](bufferLines)
val proc = Process(cmd).run(BasicIO(withIn = false,
line => finished.synchronized {
if (!finished.finished)
subj.onNext(line)
}, None))
// unfortunately we have to block a whole thread just to wait for the exit code
val exitThread = new Thread(() => {
try {
val exitCode = proc.exitValue()
finished.synchronized {
if (!finished.finished) {
finished.finished = true
if (exitCode != 0) {
subj.onError(new RuntimeException(s"Process '$cmd' has exited with $exitCode."))
}
else {
subj.onComplete()
}
}
}
}
catch {
// ignore when this is a result of our timeout
case e: InterruptedException => if(!finished.finished) e.printStackTrace()
}
}, "Process-exit-wait")
exitThread.start()
subj.timeoutOnSlowUpstream(timeout)
.guarantee(Task(finished.synchronized {
if (!finished.finished) {
finished.finished = true
proc.destroy()
exitThread.interrupt()
}
}))
}
}
用法示例如下:
def test(): Unit = {
import monix.execution.Ack._
import monix.reactive._
import scala.concurrent._
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
val linesO = ProcessHelper.buildProcessLinesObservable("python3 test.py", 5 seconds, 2) // buffer is reduced to just 2 lines just for this example
linesO.subscribe(new Observer[String] {
override def onNext(s: String): Future[Ack] = {
println(s"Received '$s'")
Future.successful(Continue)
}
override def onError(ex: Throwable): Unit = println(s"Error '$ex'")
override def onComplete(): Unit = println("Complete")
})
try {
println(linesO.toListL.runSyncUnsafe())
println(linesO.toListL.runSyncUnsafe()) // second run will show only last 2 values because of the reduced buffer size
println("Finish success")
}
catch {
case e: Throwable => println("Failed with " + e)
}
}
我以反应式 rxjava2
Observable
的小 library, that wraps NuProcess 方式将流程执行实现为反应式。例如:
PreparedStreams streams = builder.asStdInOut();
Single<NuProcess> started = streams.started();
Single<Exit> done = streams.waitDone();
Observable<byte[]> stdout = streams.stdOut();
Observer<byte[]> stdin = streams.stdIn();
done.subscribe();
我想将外部流程执行表示为 Observable[String]
,其中 String
- 来自流程输出的行。这是我所做的示例,它有效:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
object TestSo {
def main(args: Array[String]): Unit = {
val lineStream = scala.sys.process.Process("python3 test.py").lineStream
val lineStreamO: Observable[String] = Observable.fromIterator(Task(lineStream.iterator))
.doOnNext(l => Task(println(l))) //logging
.guarantee(Task(println("clean resources")))
println(lineStreamO.toListL.runSyncUnsafe())
}
}
您可以看到,该进程每秒都会发出新行。但这无关紧要。只需提供完整示例,test.py
:
from time import sleep
print(0, flush=True)
sleep(1)
print(1, flush=True)
sleep(1)
print(2, flush=True)
sleep(1)
print(3, flush=True)
sleep(1)
print(4, flush=True)
输出:
0
1
2
3
4
5
clean resources
List(0, 1, 2, 3, 4, 5)
问题:
我想要超时 - 如果进程冻结(例如 sleep 100000
)进程应该在超时后终止。此外,如果进程强制执行或失败,则应清理一些资源(例如 guarantee
)。非零退出代码应表示失败。
如何通过适当的错误处理实现流程执行 Observable[String]
? rx-java
欢迎提供解决方案。
超时的需要将迫使您重新编写 lineStream
逻辑的主要部分。另一方面,通过这样的重写,您可以避免中间 Iterator
并直接将行推入 Subject
。对于超时逻辑,您可以使用 Monix timeoutOnSlowUpstream
方法,但您仍然必须处理超时错误并关闭启动的进程。
还可以选择如何处理长输出和多个订阅者。在这段代码中,我决定使用 replayLimited
的有限缓冲区。根据您的需要,您可能会选择一些不同的策略。这是解决方案的草图:
object ProcessHelper {
import scala.sys.process.{Process, BasicIO}
import scala.concurrent.duration.FiniteDuration
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject
import monix.reactive.Observable
private class FinishedFlagWrapper(var finished: Boolean = false)
def buildProcessLinesObservable(cmd: String, timeout: FiniteDuration, bufferLines: Int = 100)(implicit scheduler: Scheduler): Observable[String] = {
// works both as a holder for a mutable boolean var and as a synchronization lock
// that is required to preserve semantics of a Subject, particularly
// that onNext is never called after onError or onComplete
val finished = new FinishedFlagWrapper()
// whether you want here replayLimited or some other logic depends on your needs
val subj = ConcurrentSubject.replayLimited[String](bufferLines)
val proc = Process(cmd).run(BasicIO(withIn = false,
line => finished.synchronized {
if (!finished.finished)
subj.onNext(line)
}, None))
// unfortunately we have to block a whole thread just to wait for the exit code
val exitThread = new Thread(() => {
try {
val exitCode = proc.exitValue()
finished.synchronized {
if (!finished.finished) {
finished.finished = true
if (exitCode != 0) {
subj.onError(new RuntimeException(s"Process '$cmd' has exited with $exitCode."))
}
else {
subj.onComplete()
}
}
}
}
catch {
// ignore when this is a result of our timeout
case e: InterruptedException => if(!finished.finished) e.printStackTrace()
}
}, "Process-exit-wait")
exitThread.start()
subj.timeoutOnSlowUpstream(timeout)
.guarantee(Task(finished.synchronized {
if (!finished.finished) {
finished.finished = true
proc.destroy()
exitThread.interrupt()
}
}))
}
}
用法示例如下:
def test(): Unit = {
import monix.execution.Ack._
import monix.reactive._
import scala.concurrent._
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
val linesO = ProcessHelper.buildProcessLinesObservable("python3 test.py", 5 seconds, 2) // buffer is reduced to just 2 lines just for this example
linesO.subscribe(new Observer[String] {
override def onNext(s: String): Future[Ack] = {
println(s"Received '$s'")
Future.successful(Continue)
}
override def onError(ex: Throwable): Unit = println(s"Error '$ex'")
override def onComplete(): Unit = println("Complete")
})
try {
println(linesO.toListL.runSyncUnsafe())
println(linesO.toListL.runSyncUnsafe()) // second run will show only last 2 values because of the reduced buffer size
println("Finish success")
}
catch {
case e: Throwable => println("Failed with " + e)
}
}
我以反应式 rxjava2
Observable
的小 library, that wraps NuProcess 方式将流程执行实现为反应式。例如:
PreparedStreams streams = builder.asStdInOut();
Single<NuProcess> started = streams.started();
Single<Exit> done = streams.waitDone();
Observable<byte[]> stdout = streams.stdOut();
Observer<byte[]> stdin = streams.stdIn();
done.subscribe();