Vert.x: how to read a stream from an executable program asynchronously
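I have a generator g. Once I run it from the console, it starts writing to stdout, sleeps for x seconds, and continues, producing a stream of data.
I want my program to run g and bind its output to a Java Vert.x application as stream input.
I want all of the reading to be done asynchronously. How can I achieve that?
This is what I am doing: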
public class InputHandler extends AbstractVerticle {
    final String command = "path";

    @Override
    public void start() throws Exception {
        Runtime r = Runtime.getRuntime();
        Process p; // Process tracks one external native process
        BufferedReader is; // reader for output of process
        String line;
        p = r.exec(command);
        System.out.println("In Main after exec");
        is = new BufferedReader(new InputStreamReader(p.getInputStream()));
        while ((line = is.readLine()) != null)
            try {
                System.out.println(line);
            } catch (Exception ex) {
                System.out.println(ex);
            }
    }
}
This is the exception that is thrown:
io.vertx.core.VertxException: Thread blocked
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:255)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at io.vertx.example.InputHandler.start(InputHandler.java:27)
at io.vertx.core.AbstractVerticle.start(AbstractVerticle.java:106)
at io.vertx.core.impl.DeploymentManager.lambda$doDeploy(DeploymentManager.java:483)
at io.vertx.core.impl.DeploymentManager$$Lambda/884603232.handle(Unknown Source)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask(ContextImpl.java:339)
at io.vertx.core.impl.ContextImpl$$Lambda/154173878.run(Unknown Source)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
You should use a worker verticle. As described at https://vertx.io/preview/docs/vertx-core/java/#blocking_code, worker verticles can run blocking code.
For example, deploy your verticle like this:
vertx.deployVerticle(YourVerticle.class.getName(), new DeploymentOptions().setWorker(true));
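For reference, the idea behind moving the blocking read off the event loop can be sketched in plain Java, without any Vert.x API: a dedicated thread performs the blocking readLine() calls and passes each line to a callback. The class name LinePump and its pump method are hypothetical, invented here for illustration. Note that the callback runs on the reader thread, so inside a verticle you would probably want to hand each line back to the verticle's context before touching verticle state.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper (not part of Vert.x): reads lines on a dedicated
// thread so that the calling thread never blocks on the stream.
final class LinePump {

    // Starts a daemon thread that reads `in` line by line and calls
    // `onLine` for each line. The returned future completes when the
    // stream ends, or completes exceptionally if reading fails.
    static CompletableFuture<Void> pump(InputStream in, Consumer<String> onLine) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        Thread reader = new Thread(() -> {
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    onLine.accept(line); // runs on the reader thread
                }
                done.complete(null);
            } catch (IOException | RuntimeException e) {
                done.completeExceptionally(e);
            }
        }, "line-pump");
        reader.setDaemon(true);
        reader.start();
        return done;
    }
}
```

With the process from the question, this could be used as LinePump.pump(p.getInputStream(), System.out::println); the call returns immediately and the lines arrive as the generator writes them.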
So, I found a solution to this problem:
public void start() throws Exception {
    Runtime r = Runtime.getRuntime();
    Process p = r.exec(command);
    AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, vertx.nettyEventLoopGroup(), p.getInputStream());
    asyncInputStream.handler(buffer ->
        lineHandler(buffer.getString(0, buffer.length() - 1))
    );
}
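One caveat with the handler above: a stream handler receives chunks of data, and a chunk is not guaranteed to hold exactly one line, so buffer.getString(0, buffer.length() - 1) only works while every write from the generator arrives as a single newline-terminated buffer. Below is a minimal, library-independent sketch of reassembling lines from arbitrary chunks. LineAssembler is a hypothetical name, not part of Vert.x or Wisdom, and the sketch assumes the chunks are already decoded text terminated by "\n". Vert.x core also ships io.vertx.core.parsetools.RecordParser, which may be a better fit in a real application.

```java
import java.util.function.Consumer;

// Hypothetical helper: collects text chunks and emits only complete lines.
// Assumes each chunk is already decoded text (for example plain ASCII).
final class LineAssembler {
    private final StringBuilder pending = new StringBuilder();
    private final Consumer<String> onLine;

    LineAssembler(Consumer<String> onLine) {
        this.onLine = onLine;
    }

    // Feed one chunk; every complete '\n'-terminated line buffered so far
    // is emitted, and any unfinished tail is kept for the next chunk.
    void accept(String chunk) {
        pending.append(chunk);
        int nl;
        while ((nl = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, nl);
            pending.delete(0, nl + 1);
            // Tolerate Windows line endings.
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1);
            }
            onLine.accept(line);
        }
    }

    // Call when the stream ends to emit a trailing line without a newline.
    void flush() {
        if (pending.length() > 0) {
            onLine.accept(pending.toString());
            pending.setLength(0);
        }
    }
}
```

Assuming lineHandler accepts a String, the handler could then become asyncInputStream.handler(buffer -> assembler.accept(buffer.toString())), where assembler is a LineAssembler created with this::lineHandler.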
I had to dig around to find out how to use org.wisdom.framework.vertx.AsyncInputStream, so here is what you need in your pom.xml:
<wisdomVertxEngin.version>0.10.0</wisdomVertxEngin.version>
<dependency>
<groupId>org.wisdom-framework</groupId>
<artifactId>wisdom-vertx-engine</artifactId>
<version>${wisdomVertxEngin.version}</version>
</dependency>