Issue with streaming formatted input stream to server

I am trying to write a "formatted" input stream to a Tomcat servlet (using Guice).

The underlying problem is this: I want to stream data from a database directly to a server, so I load the data, convert it to JSON and upload it. For performance reasons I do not want to write the JSON to a temporary file first; I want to bypass the hard disk entirely by streaming directly to the server.

Edit: This is similar to

but a comment on the answer there mentions losing data, and I seem to have the same problem.

I wrote a "ModelInputStream" that:

  1. Loads the next model from the database while the previous one is being streamed
  2. Writes one byte for the type (enum ordinal)
  3. Writes 4 bytes (int) for the length of the next byte array
  4. Writes the string (refId)
  5. Writes 4 bytes (int) for the length of the next byte array
  6. Writes the actual JSON
  7. Repeats until all models are streamed

I also wrote a "ModelStreamReader" that knows this format and reads the parts back accordingly (a sketch of the framing is shown below).
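
For illustration, here is a minimal sketch of that framing written with java.io.DataOutputStream (the writeFrame helper is hypothetical; the ModelInputStream below builds the same layout by hand):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes one model as
// [type: 1 byte][refId length: 4 bytes][refId][json length: 4 bytes][json]
static byte[] writeFrame(byte type, String refId, String json) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(out);
    data.writeByte(type);                                       // enum ordinal
    byte[] refIdBytes = refId.getBytes(StandardCharsets.UTF_8);
    data.writeInt(refIdBytes.length);                           // big-endian int, like ByteBuffer
    data.write(refIdBytes);
    byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
    data.writeInt(jsonBytes.length);
    data.write(jsonBytes);
    return out.toByteArray();
}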

When I tested it directly (the ModelInputStream feeding the ModelStreamReader in the same process), it worked fine. But once I create the ModelInputStream on the client and read the incoming input stream on the server with the ModelStreamReader, the actual JSON bytes are fewer than the length declared by the 4 length bytes. I guessed this was due to deflating or compression.

I tried various content headers to disable compression and the like, but nothing worked.

java.io.IOException: Unexpected length, expected 8586, received 7905

So on the client the JSON byte array is 8586 bytes long, but when it arrives at the server it is 7905 bytes long, which breaks the whole concept.

Also, it does not seem to actually stream: it appears to first buffer the entire content returned from the input stream.

How do I need to adjust the calling code to get the behavior I described?

ModelInputStream

package *;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import ***.Daos;
import ***.IDatabase;
import ***.CategorizedEntity;
import ***.CategorizedDescriptor;
import ***.JsonExport;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class ModelInputStream extends InputStream {

    private final Gson gson = new Gson();
    private final IDatabase db;
    private final Queue<CategorizedDescriptor> descriptors;
    private byte[] buffer = new byte[0];
    private int position = 0;

    public ModelInputStream(IDatabase db, List<CategorizedDescriptor> descriptors) {
        this.db = db;
        this.descriptors = new LinkedList<>();
        this.descriptors.addAll(descriptors);
    }

    @Override
    public int read() throws IOException {
        // Current frame fully consumed: build the frame for the next model
        if (position == buffer.length) {
            if (descriptors.size() == 0)
                return -1;
            loadNext();
            position = 0;
        }
        return buffer[position++];
    }

    private void loadNext() throws IOException {
        CategorizedDescriptor descriptor = descriptors.poll();
        byte type = (byte) descriptor.getModelType().ordinal();
        byte[] refId = descriptor.getRefId().getBytes();
        byte[] json = getData(descriptor);
        buildBuffer(type, refId, json);
    }

    private byte[] getData(CategorizedDescriptor d) {
        CategorizedEntity entity = Daos.createCategorizedDao(db, d.getModelType()).getForId(d.getId());
        JsonObject object = JsonExport.toJson(entity);
        String json = gson.toJson(object);
        return json.getBytes();
    }

    private void buildBuffer(byte type, byte[] refId, byte[] json) throws IOException {
        // Frame layout: [type: 1 byte][refId length: 4][refId][json length: 4][json]
        buffer = new byte[1 + 4 + refId.length + 4 + json.length];
        int index = put(buffer, 0, type);
        index = put(buffer, index, asByteArray(refId.length));
        index = put(buffer, index, refId);
        index = put(buffer, index, asByteArray(json.length));
        put(buffer, index, json);
    }

    private byte[] asByteArray(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }

    private int put(byte[] array, int index, byte... bytes) {
        for (int i = 0; i < bytes.length; i++) {
            array[index + i] = bytes[i];
        }
        return index + bytes.length;
    }

}

ModelStreamReader

package *;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import *.ModelType;

public class ModelStreamReader {

    private InputStream stream;

    public ModelStreamReader(InputStream stream) {
        this.stream = stream;
    }

    public Model next() throws IOException {
        int modelType = stream.read();
        if (modelType == -1)
            return null;
        Model next = new Model();
        next.type = ModelType.values()[modelType];
        next.refId = readNextPart();
        next.data = readNextPart();
        return next;
    }

    private String readNextPart() throws IOException {
        int length = readInt();
        byte[] bytes = readBytes(length);
        return new String(bytes);
    }

    private int readInt() throws IOException {
        byte[] bytes = readBytes(4);
        return ByteBuffer.wrap(bytes).getInt();
    }

    private byte[] readBytes(int length) throws IOException {
        byte[] buffer = new byte[length];
        int read = stream.read(buffer);
        if (read != length)
            throw new IOException("Unexpected length, expected " + length + ", received " + read);
        return buffer;
    }

    public class Model {

        public ModelType type;
        public String refId;
        public String data;

    }

}
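
For reference, the direct test mentioned above (no HTTP in between) can look like this minimal sketch, assuming the same db and getAll(db) as in the calling code below:

// Feed the ModelInputStream straight into the ModelStreamReader;
// this is the case that works.
ModelInputStream in = new ModelInputStream(db, getAll(db));
ModelStreamReader reader = new ModelStreamReader(in);
ModelStreamReader.Model model;
while ((model = reader.next()) != null) {
    System.out.println(model.type + " " + model.refId + " " + model.data.length());
}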

Calling code

ModelInputStream stream = new ModelInputStream(db, getAll(db));
URL url = new URL("http://localhost:8080/ws/test/streamed");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setDoOutput(true);
con.setRequestMethod("POST");
con.connect();
int read = -1;
while ((read = stream.read()) != -1) {
    con.getOutputStream().write(read);
}
con.getOutputStream().flush();
System.out.println(con.getResponseCode());
System.out.println(con.getResponseMessage());
con.disconnect();
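
As an aside, the byte-by-byte copy above can also be done with a buffer; a minimal sketch (the 8 KB buffer size is an arbitrary choice):

OutputStream out = con.getOutputStream();
byte[] buf = new byte[8192];
int n;
while ((n = stream.read(buf)) != -1) {
    out.write(buf, 0, n); // forward only the bytes actually read
}
out.flush();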

Server part (Jersey web resource)

package *.webservice;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import *.ModelStreamReader;
import *.ModelStreamReader.Model;

@Path("test")
public class TestResource {

    @POST
    @Path("streamed")
    public Response streamed(InputStream modelStream) throws IOException {
        ModelStreamReader reader = new ModelStreamReader(modelStream);
        writeDatasets(reader);
        return Response.ok(new HashMap<>()).build();
    }

    private void writeDatasets(ModelStreamReader reader) throws IOException {
        String commitId = UUID.randomUUID().toString();
        File dir = new File("/opt/tests/streamed/" + commitId);
        dir.mkdirs();
        Model dataset = null;
        while ((dataset = reader.next()) != null) {
            File file = new File(dir, dataset.refId);
            writeDataset(file, dataset.data);
        }
    }

    private void writeDataset(File file, String data) {
        try {
            if (data == null)
                file.createNewFile();
            else
                Files.write(file.toPath(), data.getBytes(Charset.forName("utf-8")));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

It turned out the problems were of a completely different nature.

First, the input stream was not compressed or anything like that. The bytes returned by read() have to be mapped into the (0, 255) range instead of (-128, 127), which is what InputStream.read() is specified to return (see how ByteArrayInputStream does it). Otherwise a data byte with value -1 is indistinguishable from end-of-stream, and the read loop terminates prematurely.
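
A minimal illustration of the pitfall:

byte b = (byte) 0xFF;    // a perfectly valid data byte
int widened = b;         // widens to -1, the end-of-stream sentinel
int unsigned = b & 0xff; // 255, what read() should return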

ModelInputStream

@Override
public int read() throws IOException {
    ...
    return buffer[position++] & 0xff; // mask into the unsigned (0, 255) range
}

Second, the data has to be transferred chunked to actually be "streaming". The ModelStreamReader.readBytes(int) method therefore has to be adjusted so that it keeps reading until the requested number of bytes has arrived:

ModelStreamReader

private byte[] readBytes(int length) throws IOException {
    byte[] result = new byte[length];
    int totalRead = 0;
    int position = 0;
    int previous = -1;
    while (totalRead != length) {
        int read = stream.read();
        if (read != -1) {
            // read() now yields values in (0, 255); narrow back to a byte
            result[position++] = (byte) read;
            totalRead++;
        } else if (previous == -1) {
            // two consecutive end-of-stream reads: give up
            break;
        }
        previous = read;
    }
    return result;
}

Finally, this line has to be added to the calling code so that the request is actually sent in chunks:

...
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setChunkedStreamingMode(1024 * 1024);
...
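
With chunked streaming mode set, HttpURLConnection no longer buffers the entire request body in memory to compute a Content-Length header, which also explains why the upload did not appear to stream before.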
