使用 Vert.x 逐行读取文件(AsyncFile 和 RecordParser 帮助)
Using Vert.x to read file line by line (AsyncFile and RecordParser help)
我正在尝试使用 Vert.x 从文件系统中读取一个大文件并逐行处理它。从核心文档中,我认为实现这一点的方法是通过 AsyncFile
和 RecordParser
。理想情况下,我想要 Pump
数据(以避免背压),但 RecordParser
不是 WriteStream
:
AsyncFile asyncFile = vertx.fileSystem().openBlocking(/*path and options*/);
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
// Do something per line
});
Pump.pump(asyncFile, recordParser).start(); // Error - RecordParser cannot be converted to WriteStream
所以我想我必须自己抽水?我试过类似的东西:
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
// Do something per line
// I can see this code get run
})
.exceptionHandler(cause -> {
// Do I need this? What are the repercussions if I don't have this handler? Are exceptions just lost?
})
.endHandler(_void -> {
// This never gets called!
});
asyncFile.handler(recordParser); // Crazy Java8 syntax passes recordParser.handle to this handler :)
但是,我不确定我的文件是否正在关闭,因为 recordParser.endHandler
从未被调用(尽管我可以看到正在调用的行处理程序)。
我做错了什么?文件没有关闭?我尝试将 endHandler
添加到 asyncFile
并关闭它,但这没有用。
理想情况下,我宁愿 Pump
工作。 Pump
在这种情况下有什么方法可以使用吗?
提前致谢!
P.S.: 对不起 "crosspost".
你可以按照你说的 AsyncFile
和 RecordParser
来做。
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
System.out.println("bufferedLine = " + bufferedLine);
});
asyncFile.handler(recordParser)
.endHandler(v -> {
asyncFile.close();
System.out.println("Done");
});
如果你这样做,你应该在 AsyncFile
上设置异常处理程序,而不是在 RecordParser
.
上
使用上面的代码,文件将被正确关闭。
However, I'm not sure my file is being closed as the recordParser.endHandler never gets called (although I can see the line handler being called).
实际上,只有 RecordParser
是通过包装另一个 ReadStream
创建的,endHandler
和 exceptionHandler
才会被调用。在您的情况下,它们不是必需的。
此外,关于您的评论:
// Crazy Java8 syntax passes recordParser.handle to this handler :)
这不是 Java 8 技巧,只是 AsyncFile.handler()
需要 Handler<Buffer>
而 RecordParser
实现了这个接口。
我正在尝试使用 Vert.x 从文件系统中读取一个大文件并逐行处理它。从核心文档中,我认为实现这一点的方法是通过 AsyncFile
和 RecordParser
。理想情况下,我想要 Pump
数据(以避免背压),但 RecordParser
不是 WriteStream
:
AsyncFile asyncFile = vertx.fileSystem().openBlocking(/*path and options*/);
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
// Do something per line
});
Pump.pump(asyncFile, recordParser).start(); // Error - RecordParser cannot be converted to WriteStream
所以我想我必须自己抽水?我试过类似的东西:
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
// Do something per line
// I can see this code get run
})
.exceptionHandler(cause -> {
// Do I need this? What are the repercussions if I don't have this handler? Are exceptions just lost?
})
.endHandler(_void -> {
// This never gets called!
});
asyncFile.handler(recordParser); // Crazy Java8 syntax passes recordParser.handle to this handler :)
但是,我不确定我的文件是否正在关闭,因为 recordParser.endHandler
从未被调用(尽管我可以看到正在调用的行处理程序)。
我做错了什么?文件没有关闭?我尝试将 endHandler
添加到 asyncFile
并关闭它,但这没有用。
理想情况下,我宁愿 Pump
工作。 Pump
在这种情况下有什么方法可以使用吗?
提前致谢!
P.S.: 对不起 "crosspost".
你可以按照你说的 AsyncFile
和 RecordParser
来做。
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
System.out.println("bufferedLine = " + bufferedLine);
});
asyncFile.handler(recordParser)
.endHandler(v -> {
asyncFile.close();
System.out.println("Done");
});
如果你这样做,你应该在 AsyncFile
上设置异常处理程序,而不是在 RecordParser
.
使用上面的代码,文件将被正确关闭。
However, I'm not sure my file is being closed as the recordParser.endHandler never gets called (although I can see the line handler being called).
实际上,只有 RecordParser
是通过包装另一个 ReadStream
创建的,endHandler
和 exceptionHandler
才会被调用。在您的情况下,它们不是必需的。
此外,关于您的评论:
// Crazy Java8 syntax passes recordParser.handle to this handler :)
这不是 Java 8 技巧,只是 AsyncFile.handler()
需要 Handler<Buffer>
而 RecordParser
实现了这个接口。