将对 OutputStream 的写入转换为 ServerResponse 可用的 Flux<DataBuffer>

Convert writes to OutputStream into a Flux<DataBuffer> usable by ServerResponse

我有一个遗留库,我必须用它来检索文件。这个遗留库在 InputStream 中没有 return,正如您通常期望的那样用于读取内容,但它期望它传递一个开放的 OutputStream,它可以写入。

我必须编写 Webflux REST 服务,将此 OutputStream 写入 org.springframework.web.reactive.function.server.ServerResponse 正文。

legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse

因为我想将 Stream 直接传递给 ServerResponse,我可能必须做这样的事情,对吗?

ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);

这是 RequestHandler 的重要部分。我遗漏了一些 errorhandling/catching 例外情况,通常可能不需要。请注意,我 publishedOn 一个不同的 Scheduler 用于读取(或者至少,这就是我想要做的),以便此阻塞读取不会干扰我的主事件线程:

    private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
        final long blobSize = tag.getBlobSize();
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
          // for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
          for(int i = 0; i < blobSize; i+= tagChunkSize) {
            // new DataBuffer that is written to, then emitted later
            DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
            try (OutputStream outputStream = dataBuffer.asOutputStream()) {
              // write to the outputstream of DataBuffer
              tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
              // don't know if flushing is strictly neccessary
              outputStream.flush();
            } catch (IOException | FPLibraryException e) {
              log.error("Error reading + writing from tag to http outputstream", e);
              emitter.error(e);
            }
            emitter.next(dataBuffer);
          }
          // if blob is finished, send "complete" to my flux of DataBuffers
          emitter.complete();
        }, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);
  
    }