Issue with streaming formatted input stream to server
I'm trying to write a "formatted" input stream to a Tomcat servlet (using Guice).
The underlying problem is this: I want to stream data from a database directly to the server. So I load the data, convert it to JSON, and upload it to the server. For performance reasons I don't want to write the JSON to a temporary file first; I want to bypass the hard disk by streaming directly to the server.
Edit: This is similar to an existing question, but a comment on the answer there reports losing data, and I seem to have the same problem.
I wrote a "ModelInputStream" that
- loads the next model from the database while the previous one is being streamed
- writes one byte for the type (the enum ordinal)
- writes 4 bytes (an int) for the length of the next byte array
- writes the String (refId)
- writes 4 bytes (an int) for the length of the next byte array
- writes the actual JSON
- repeats until all models have been streamed
I also wrote a "ModelStreamReader" that knows this logic and reads the stream accordingly.
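In other words, each model travels as one frame: a type byte, a 4-byte big-endian length, the refId bytes, another 4-byte big-endian length, then the JSON bytes. The following is a minimal, hypothetical sketch of that format using DataOutputStream (writeFrame is not part of the project; the real ModelInputStream below builds the same layout by hand):

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class FrameFormat {
    // Hypothetical helper, shown only to document the wire format.
    static void writeFrame(OutputStream out, byte type, String refId, String json) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        byte[] refIdBytes = refId.getBytes();
        byte[] jsonBytes = json.getBytes();
        data.writeByte(type);             // 1 byte: model type (enum ordinal)
        data.writeInt(refIdBytes.length); // 4 bytes: big-endian refId length
        data.write(refIdBytes);           // the refId string bytes
        data.writeInt(jsonBytes.length);  // 4 bytes: big-endian JSON length
        data.write(jsonBytes);            // the actual JSON bytes
    }
}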
It works fine when I test it directly, but as soon as I create the ModelInputStream on the client and read the incoming input stream on the server with the ModelStreamReader, the actual JSON bytes are fewer than the length announced by the preceding 4 bytes. My guess was that this is caused by deflating or compression.
I tried various content headers to disable compression and the like, but nothing worked.
java.io.IOException: Unexpected length, expected 8586, received 7905
So on the client the JSON byte array is 8586 bytes long, and when it arrives on the server it is only 7905 bytes long, which breaks the whole concept.
Also, it doesn't seem to actually stream; instead, everything returned from the input stream appears to be buffered first.
How do I need to adjust the calling code to get the behavior I described?
ModelInputStream
package *;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import ***.Daos;
import ***.IDatabase;
import ***.CategorizedEntity;
import ***.CategorizedDescriptor;
import ***.JsonExport;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class ModelInputStream extends InputStream {

    private final Gson gson = new Gson();
    private final IDatabase db;
    private final Queue<CategorizedDescriptor> descriptors;
    private byte[] buffer = new byte[0];
    private int position = 0;

    public ModelInputStream(IDatabase db, List<CategorizedDescriptor> descriptors) {
        this.db = db;
        this.descriptors = new LinkedList<>();
        this.descriptors.addAll(descriptors);
    }

    @Override
    public int read() throws IOException {
        if (position == buffer.length) {
            if (descriptors.size() == 0)
                return -1;
            loadNext();
            position = 0;
        }
        return buffer[position++];
    }

    private void loadNext() throws IOException {
        CategorizedDescriptor descriptor = descriptors.poll();
        byte type = (byte) descriptor.getModelType().ordinal();
        byte[] refId = descriptor.getRefId().getBytes();
        byte[] json = getData(descriptor);
        buildBuffer(type, refId, json);
    }

    private byte[] getData(CategorizedDescriptor d) {
        CategorizedEntity entity = Daos.createCategorizedDao(db, d.getModelType()).getForId(d.getId());
        JsonObject object = JsonExport.toJson(entity);
        String json = gson.toJson(object);
        return json.getBytes();
    }

    private void buildBuffer(byte type, byte[] refId, byte[] json) throws IOException {
        buffer = new byte[1 + 4 + refId.length + 4 + json.length];
        int index = put(buffer, 0, type);
        index = put(buffer, index, asByteArray(refId.length));
        index = put(buffer, index, refId);
        index = put(buffer, index, asByteArray(json.length));
        put(buffer, index, json);
    }

    private byte[] asByteArray(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }

    private int put(byte[] array, int index, byte... bytes) {
        for (int i = 0; i < bytes.length; i++) {
            array[index + i] = bytes[i];
        }
        return index + bytes.length;
    }
}
ModelStreamReader
package *;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import *.ModelType;

public class ModelStreamReader {

    private InputStream stream;

    public ModelStreamReader(InputStream stream) {
        this.stream = stream;
    }

    public Model next() throws IOException {
        int modelType = stream.read();
        if (modelType == -1)
            return null;
        Model next = new Model();
        next.type = ModelType.values()[modelType];
        next.refId = readNextPart();
        next.data = readNextPart();
        return next;
    }

    private String readNextPart() throws IOException {
        int length = readInt();
        byte[] bytes = readBytes(length);
        return new String(bytes);
    }

    private int readInt() throws IOException {
        byte[] bytes = readBytes(4);
        return ByteBuffer.wrap(bytes).getInt();
    }

    private byte[] readBytes(int length) throws IOException {
        byte[] buffer = new byte[length];
        int read = stream.read(buffer);
        if (read != length)
            throw new IOException("Unexpected length, expected " + length + ", received " + read);
        return buffer;
    }

    public class Model {
        public ModelType type;
        public String refId;
        public String data;
    }
}
Calling code
ModelInputStream stream = new ModelInputStream(db, getAll(db));
URL url = new URL("http://localhost:8080/ws/test/streamed");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setDoOutput(true);
con.setRequestMethod("POST");
con.connect();
int read = -1;
while ((read = stream.read()) != -1) {
    con.getOutputStream().write(read);
}
con.getOutputStream().flush();
System.out.println(con.getResponseCode());
System.out.println(con.getResponseMessage());
con.disconnect();
Server part (Jersey web resource)
package *.webservice;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import *.ModelStreamReader;
import *.ModelStreamReader.Model;

@Path("test")
public class TestResource {

    @POST
    @Path("streamed")
    public Response streamed(InputStream modelStream) throws IOException {
        ModelStreamReader reader = new ModelStreamReader(modelStream);
        writeDatasets(reader);
        return Response.ok(new HashMap<>()).build();
    }

    private void writeDatasets(ModelStreamReader reader) throws IOException {
        String commitId = UUID.randomUUID().toString();
        File dir = new File("/opt/tests/streamed/" + commitId);
        dir.mkdirs();
        Model dataset = null;
        while ((dataset = reader.next()) != null) {
            File file = new File(dir, dataset.refId);
            writeDataset(file, dataset.data);
        }
    }

    private void writeDataset(File file, String data) {
        try {
            if (data == null)
                file.createNewFile();
            else
                Files.write(file.toPath(), data.getBytes(Charset.forName("utf-8")));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
It turned out that the problems were of a completely different nature.
First, the input stream is not deflated or compressed at all. The bytes being read have to be mapped into the (0, 255) range instead of (-128, 127); otherwise the stream read is cut short by any byte with the value -1, which is indistinguishable from the end-of-stream marker.
ModelInputStream
@Override
public int read() throws IOException {
    ...
    return buffer[position++] + 128;
}
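This matters because a Java byte is signed: any 0xFF byte in the payload widens to the int -1 when returned from read(), and the caller cannot tell it apart from end-of-stream. A minimal standalone illustration (hypothetical values, not project code):

byte b = (byte) 0xFF;   // a perfectly valid payload byte
int widened = b;        // -1: looks exactly like end-of-stream
int shifted = b + 128;  // 127: inside (0, 255), paired with a -128 on the reader side
int masked = b & 0xff;  // 255: the conventional fix (see the answer below)
System.out.println(widened + " " + shifted + " " + masked); // prints: -1 127 255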
Second, the data has to be transferred in chunks to actually be "streaming", which means a single read on the server may return fewer bytes than requested. So the ModelStreamReader.readBytes(int) method additionally has to be adjusted to:
ModelStreamReader
private byte[] readBytes(int length) throws IOException {
    byte[] result = new byte[length];
    int totalRead = 0;
    int position = 0;
    int previous = -1;
    while (totalRead != length) {
        int read = stream.read();
        if (read != -1) {
            // undo the +128 shift applied on the writing side
            result[position++] = (byte) (read - 128);
            totalRead++;
        } else if (previous == -1) {
            break;
        }
        previous = read;
    }
    return result;
}
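Note that if the writing side masks with & 0xff instead of shifting by +128 (see below), readBytes can use a conventional bulk-read loop. This is a sketch against the same stream field, not the original code; InputStream.read(byte[], int, int) may return short counts when data arrives in chunks, so it has to be called in a loop:

private byte[] readBytes(int length) throws IOException {
    byte[] result = new byte[length];
    int totalRead = 0;
    while (totalRead < length) {
        // a chunked transfer may deliver fewer bytes than requested
        int read = stream.read(result, totalRead, length - totalRead);
        if (read == -1)
            throw new IOException("Unexpected end of stream, expected " + length + ", received " + totalRead);
        totalRead += read;
    }
    return result;
}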
Finally, this line has to be added to the calling code:
...
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setChunkedStreamingMode(1024 * 1024);
...
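Without chunked streaming mode, HttpURLConnection buffers the entire request body in memory in order to compute a Content-Length header, which defeats the streaming. For completeness, the adjusted calling code could look like this sketch (the single getOutputStream() call and the BufferedOutputStream wrapper are additions here, not in the original snippet; fetching the stream on every iteration works but is wasteful):

ModelInputStream stream = new ModelInputStream(db, getAll(db));
URL url = new URL("http://localhost:8080/ws/test/streamed");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setDoOutput(true);
con.setRequestMethod("POST");
con.setChunkedStreamingMode(1024 * 1024); // send the body in 1 MiB chunks
con.connect();
try (OutputStream out = new BufferedOutputStream(con.getOutputStream())) {
    int read = -1;
    while ((read = stream.read()) != -1) {
        out.write(read);
    }
} // try-with-resources flushes and closes the request body
System.out.println(con.getResponseCode());
System.out.println(con.getResponseMessage());
con.disconnect();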
The bytes being read have to be shifted into the (0, 255) range (see ByteArrayInputStream).
ModelInputStream
@Override
public int read() throws IOException {
    ...
    return buffer[position++] & 0xff;
}
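Masking with & 0xff maps the signed byte straight onto 0 to 255, matching the InputStream.read() contract, and needs no compensating -128 on the reading side; ByteArrayInputStream.read() does exactly the same masking internally.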
Finally, this line has to be added to the calling code (for chunking):
...
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setChunkedStreamingMode(1024 * 1024);
...