无法处理 StreamingFileUpload
Failing to process StreamingFileUpload
我在尝试循环遍历 Micronaut 提供的 StreamingFileUpload 的各个部分并将字节读入 ByteArrayOutputStream(因此它可以一起批处理并发送到 S3)时遇到了一个问题,这会导致 NullPointerException 或 IndexOutOfBoundsException。
应用程序启动时第一次上传有时会失败,但后续上传似乎没问题(第一次上传后从未遇到过任何上传问题)。不清楚我实现 ByteArrayOutputStream 的方式是否有错误。
final baos = new ByteArrayOutputStream()
baos.withCloseable { stream ->
Flowable.fromPublisher( file )
.all( { partData ->
stream.write( partData.bytes )
true
} ).blockingGet()
}
异常
ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: null
java.lang.NullPointerException: null
at io.netty.buffer.CompositeByteBuf.updateComponentOffsets(CompositeByteBuf.java:573)
at io.netty.buffer.CompositeByteBuf.removeComponent(CompositeByteBuf.java:593)
错误的替代变体
ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: cIndex: 14 (expected: >= 0 && <= numComponents(13))
java.lang.IndexOutOfBoundsException: cIndex: 14 (expected: >= 0 && <= numComponents(13))
at io.netty.buffer.CompositeByteBuf.checkComponentIndex(CompositeByteBuf.java:548)
这些错误是由于样本代码处理部分不正确以及 .blockingGet
的不正确使用,对部分数据的访问不是线程安全操作。
感谢 jameskleeh 的解决方案。
@Override
Single<HttpResponse<String>> upload(StreamingFileUpload file) {
log.info("Received file upload for file: ${file.filename}")
int partCounter = 0
Single.<HttpResponse<String>>create({ emitter ->
ByteArrayOutputStream baos = new ByteArrayOutputStream()
baos.withCloseable {
file.subscribe(new Subscriber<PartData>() {
private Subscription s
@Override
void onSubscribe(Subscription s) {
this.s = s
s.request(1)
}
@Override
void onNext(PartData partData) {
log.info("Processing part ${partCounter++}")
baos.write(partData.bytes)
s.request(1)
}
@Override
void onError(Throwable t) {
emitter.onError(t)
}
@Override
void onComplete() {
log.info("Successfully streamed file. Output length: ${baos.size()}")
emitter.onSuccess(HttpResponse.ok('Success'))
}
})
}
})
}
https://github.com/micronaut-projects/micronaut-core/issues/2488
我在尝试循环遍历 Micronaut 提供的 StreamingFileUpload 的各个部分并将字节读入 ByteArrayOutputStream(因此它可以一起批处理并发送到 S3)时遇到了一个问题,这会导致 NullPointerException 或 IndexOutOfBoundsException。
应用程序启动时第一次上传有时会失败,但后续上传似乎没问题(第一次上传后从未遇到过任何上传问题)。不清楚我实现 ByteArrayOutputStream 的方式是否有错误。
final baos = new ByteArrayOutputStream()
baos.withCloseable { stream ->
Flowable.fromPublisher( file )
.all( { partData ->
stream.write( partData.bytes )
true
} ).blockingGet()
}
异常
ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: null
java.lang.NullPointerException: null
at io.netty.buffer.CompositeByteBuf.updateComponentOffsets(CompositeByteBuf.java:573)
at io.netty.buffer.CompositeByteBuf.removeComponent(CompositeByteBuf.java:593)
错误的替代变体
ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: cIndex: 14 (expected: >= 0 && <= numComponents(13))
java.lang.IndexOutOfBoundsException: cIndex: 14 (expected: >= 0 && <= numComponents(13))
at io.netty.buffer.CompositeByteBuf.checkComponentIndex(CompositeByteBuf.java:548)
这些错误是由于样本代码处理部分不正确以及 .blockingGet
的不正确使用,对部分数据的访问不是线程安全操作。
感谢 jameskleeh 的解决方案。
@Override
Single<HttpResponse<String>> upload(StreamingFileUpload file) {
log.info("Received file upload for file: ${file.filename}")
int partCounter = 0
Single.<HttpResponse<String>>create({ emitter ->
ByteArrayOutputStream baos = new ByteArrayOutputStream()
baos.withCloseable {
file.subscribe(new Subscriber<PartData>() {
private Subscription s
@Override
void onSubscribe(Subscription s) {
this.s = s
s.request(1)
}
@Override
void onNext(PartData partData) {
log.info("Processing part ${partCounter++}")
baos.write(partData.bytes)
s.request(1)
}
@Override
void onError(Throwable t) {
emitter.onError(t)
}
@Override
void onComplete() {
log.info("Successfully streamed file. Output length: ${baos.size()}")
emitter.onSuccess(HttpResponse.ok('Success'))
}
})
}
})
}
https://github.com/micronaut-projects/micronaut-core/issues/2488