将文档流发送到 Jersey @POST 端点

Sending a stream of documents to a Jersey @POST endpoint

我希望能够将一堆文档流发送到网络服务。这将节省 Http request/response 开销并专注于文档本身。

在 python 中你可以这样做:

r = requests.post('https://stream.twitter.com/1/statuses/filter.json',
    data={'track': 'requests'}, auth=('username', 'password'),
    stream=True)

for line in r.iter_lines():
    if line: # filter out keep-alive new lines
        print json.loads(line)

我正在寻找一个将请求流式传输到 Jersey rest api 的示例。我希望看到客户端和服务器端显示它正在工作。但我正在努力寻找一个例子。

示例理想情况下会显示:

Client:
  Open request
  Iterate over huge document list
    Write document to open request stream
  Close request

Server:
  @POST method
    Open entity stream
    Iterate over entity stream while next document is available
        Process document
    Close entity stream              

如果我们做对了,您将在服务器上处理实体,同时仍在客户端上发送它们!巨大的胜利!

完成此操作的最简单方法之一是让 Jersey 为 POST 处理程序提供 HTTP POST 主体的 InputStream。该方法可以使用您选择的 InputStream 和 JSON 解析器来解析然后处理每个对象。

在下面的示例中,Jackson ObjectReader 生成一个 MappingIterator,它解析和处理数组中的每个 Person 文档,因为它被传送到服务器

/**
 * Parse and process an arbitrarily large JSON array of Person documents
 */
@Path("persons")
public static class PersonResource {

    private static final ObjectReader reader = new ObjectMapper().readerFor(Person.class);

    @Path("inputstream")
    @Consumes("application/json")
    @POST
    public void inputstream(final InputStream is) throws IOException {
        final MappingIterator<Person> persons = reader.readValues(is);
        while (persons.hasNext()) {
            final Person person = persons.next();
            // process
            System.out.println(person);
        }
    }
}

同样,当配置了 Jackson ObjectMapper 时,Jersey 客户端框架可以发送文档流。以下示例使用 Jersey 测试框架对此进行了演示。客户端流式传输 Person 文档

的任意大迭代器
public class JacksonStreamingTest extends JerseyTest {

    @Override
    protected Application configure() {
        return new ResourceConfig(PersonResource.class, ObjectMapperProvider.class);
    }

    /**
     * Registers the application {@link ObjectMapper} as the JAX-RS provider for application/json
     */
    @Provider
    @Produces(MediaType.APPLICATION_JSON)
    public static class ObjectMapperProvider implements ContextResolver<ObjectMapper> {

        private static final ObjectMapper mapper = new ObjectMapper();

        public ObjectMapper getContext(final Class<?> objectType) {
            return mapper;
        }
    }

    @Override
    protected void configureClient(final ClientConfig config) {
        config.register(ObjectMapperProvider.class);
    }

    @Test
    public void test() {
        final Set<Person> persons = Collections.singleton(Person.of("Tracy", "Jordan"));
        final Response response = target("persons/inputstream").request().post(Entity.json(persons.iterator()));
        assertThat(response.getStatus()).isEqualTo(204);
    }
}