使用 Vert.x 逐行读取文件(AsyncFile 和 RecordParser 帮助)

Using Vert.x to read file line by line (AsyncFile and RecordParser help)

我正在尝试使用 Vert.x 从文件系统中读取一个大文件并逐行处理它。从核心文档中,我认为实现这一点的方法是通过 AsyncFileRecordParser。理想情况下,我想要 Pump 数据(以避免背压),但 RecordParser 不是 WriteStream:

AsyncFile asyncFile = vertx.fileSystem().openBlocking(/*path and options*/);

RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
  // Do something per line
});

Pump.pump(asyncFile, recordParser).start(); // Error - RecordParser cannot be converted to WriteStream

所以我想我必须自己抽水?我试过类似的东西:

RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
  // Do something per line
  // I can see this code get run
})
.exceptionHandler(cause -> {
  // Do I need this? What are the repercussions if I don't have this handler? Are exceptions just lost?
})
.endHandler(_void -> {
  // This never gets called!
});

asyncFile.handler(recordParser); // Crazy Java8 syntax passes recordParser.handle to this handler :)

但是,我不确定我的文件是否正在关闭,因为 recordParser.endHandler 从未被调用(尽管我可以看到正在调用的行处理程序)。

我做错了什么?文件没有关闭?我尝试将 endHandler 添加到 asyncFile 并关闭它,但这没有用。

理想情况下,我宁愿 Pump 工作。 Pump在这种情况下有什么方法可以使用吗?

提前致谢!

P.S.: 对不起 "crosspost".

你可以按照你说的 AsyncFileRecordParser 来做。

RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
  System.out.println("bufferedLine = " + bufferedLine);
});

asyncFile.handler(recordParser)
    .endHandler(v -> {
      asyncFile.close();
      System.out.println("Done");
    });

如果你这样做,你应该在 AsyncFile 上设置异常处理程序,而不是在 RecordParser.

使用上面的代码,文件将被正确关闭。

However, I'm not sure my file is being closed as the recordParser.endHandler never gets called (although I can see the line handler being called).

实际上,只有 RecordParser 是通过包装另一个 ReadStream 创建的,endHandlerexceptionHandler 才会被调用。在您的情况下,它们不是必需的。

此外,关于您的评论:

// Crazy Java8 syntax passes recordParser.handle to this handler :)

这不是 Java 8 技巧,只是 AsyncFile.handler() 需要 Handler<Buffer>RecordParser 实现了这个接口。