Reactive Spring Boot API wrapping Elasticsearch's async bulk indexing
I am developing a prototype for a new project. The idea is to provide a reactive Spring Boot microservice for bulk indexing documents in Elasticsearch. Elasticsearch provides a High Level REST Client, which offers an async method to bulk process indexing requests. The async flavour uses a listener to deliver callbacks, as mentioned here. The callbacks receive index responses (per request) in batches. I am trying to send these responses back to the client as a Flux. I have come up with something based on this blog post.
Controller
@RestController
public class AppController {

    @SuppressWarnings("unchecked")
    @RequestMapping(value = "/test3", method = RequestMethod.GET)
    public Flux<String> index3() {
        ElasticAdapter es = new ElasticAdapter();
        JSONObject json = new JSONObject();
        json.put("TestDoc", "Stack123");
        Flux<String> fluxResponse = es.bulkIndex(json);
        return fluxResponse;
    }
}
ElasticAdapter
@Component
class ElasticAdapter {
    String indexName = "test2";
    private final RestHighLevelClient client;
    private final ObjectMapper mapper;
    private int processed = 0; // start at 0 so completion fires only after all 5000 items

    Flux<String> bulkIndex(JSONObject doc) {
        return bulkIndexDoc(doc)
            .doOnError(e -> System.out.print("Unable to index " + doc + ": " + e));
    }

    private Flux<String> bulkIndexDoc(JSONObject doc) {
        return Flux.create(sink -> {
            try {
                doBulkIndex(doc, bulkListenerToSink(sink));
            } catch (JsonProcessingException e) {
                sink.error(e);
            }
        });
    }

    private void doBulkIndex(JSONObject doc, BulkProcessor.Listener listener) throws JsonProcessingException {
        System.out.println("Going to submit index request");
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) ->
                        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        BulkProcessor.Builder builder =
                BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(10);
        BulkProcessor bulkProcessor = builder.build();
        // Submitting 5,000 index requests (repeating the same JSON)
        for (int i = 0; i < 5000; i++) {
            IndexRequest indexRequest = new IndexRequest(indexName, "person", i + 1 + "");
            String json = doc.toJSONString();
            indexRequest.source(json, XContentType.JSON);
            bulkProcessor.add(indexRequest);
        }
        System.out.println("Submitted all docs");
    }

    private BulkProcessor.Listener bulkListenerToSink(FluxSink<String> sink) {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @SuppressWarnings("unchecked")
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                for (BulkItemResponse bulkItemResponse : response) {
                    JSONObject json = new JSONObject();
                    json.put("id", bulkItemResponse.getResponse().getId());
                    json.put("status", bulkItemResponse.getResponse().getResult());
                    sink.next(json.toJSONString());
                    processed++;
                }
                if (processed >= 5000) {
                    sink.complete();
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failure.printStackTrace();
                sink.error(failure);
            }
        };
    }

    public ElasticAdapter() {
        // Logic to initialize Elasticsearch Rest Client
    }
}
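The listener-to-sink bridge above can be illustrated with a plain-JDK analogue: a minimal sketch using java.util.concurrent.SubmissionPublisher in place of FluxSink, and a hypothetical BatchListener interface standing in for BulkProcessor.Listener (no Elasticsearch involved, names are illustrative only):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ListenerBridgeSketch {

    // Hypothetical batch callback, standing in for BulkProcessor.Listener
    interface BatchListener {
        void afterBatch(List<String> batchResults);
    }

    // Bridge: each callback invocation pushes its items into the publisher,
    // the way afterBulk() calls sink.next() per BulkItemResponse, and closes
    // the publisher (~ sink.complete()) once the expected total is reached.
    static BatchListener listenerToPublisher(SubmissionPublisher<String> publisher, int total) {
        return new BatchListener() {
            int processed = 0;

            @Override
            public void afterBatch(List<String> batchResults) {
                for (String result : batchResults) {
                    publisher.submit(result);   // ~ sink.next(...)
                    processed++;
                }
                if (processed >= total) {
                    publisher.close();          // ~ sink.complete()
                }
            }
        };
    }

    static List<String> runDemo() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        BatchListener listener = listenerToPublisher(publisher, 4);
        listener.afterBatch(List.of("doc-1", "doc-2")); // first "bulk" of 2
        listener.afterBatch(List.of("doc-3", "doc-4")); // second "bulk" of 2

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [doc-1, doc-2, doc-3, doc-4]
    }
}
```

The key point the sketch shows: the callback side pushes, the subscriber side pulls, and nothing in between buffers the whole result set.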
I am using FluxSink to create the Flux of responses to send back to the client. At this point, I have no idea whether this is correct.

My expectation is that the calling client should receive the responses in batches of 10 (because the bulk processor processes them in batches of 10: builder.setBulkActions(10);). I tried consuming the endpoint with the Spring WebFlux client, but could not work it out. This is what I tried:
WebClient
public class FluxClient {

    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        Flux<String> responseFlux = client.get()
                .uri("/test3")
                .retrieve()
                .bodyToFlux(String.class);
        responseFlux.subscribe(System.out::println);
    }
}
Nothing got printed on the console. I tried using System.out.println(responseFlux.blockFirst());. It printed all the responses as a single batch at the end, and not in batches of 10.

If my approach is correct, what is the right way to consume it? For the solution I have in mind, this client would reside in another web app.

Note: My understanding of the Reactor API is limited. The Elasticsearch version used is 6.8.
Made the following changes to your code.

In ElasticAdapter,
public Flux<Object> bulkIndex(JSONObject doc) {
    return bulkIndexDoc(doc)
        .subscribeOn(Schedulers.elastic(), true)
        .doOnError(e -> System.out.print("Unable to index " + doc + ": " + e));
}
Called subscribeOn(Scheduler, requestOnSeparateThread) on the Flux; learned about it from https://github.com/spring-projects/spring-framework/issues/21507

In FluxClient,
Flux<String> responseFlux = client.get()
        .uri("/test3")
        .headers(httpHeaders -> {
            httpHeaders.set("Accept", "text/event-stream");
        })
        .retrieve()
        .bodyToFlux(String.class);
responseFlux.delayElements(Duration.ofSeconds(1)).subscribe(System.out::println);
Added the "Accept" header as "text/event-stream" and delayed the Flux elements.

With the above changes, the responses are received from the server in real time.
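For reference, text/event-stream is the Server-Sent Events media type: with it, the server writes each Flux element as its own data: frame and flushes it, which is what lets the client observe elements as they arrive instead of one buffered JSON array at the end. A minimal sketch of the standard SSE wire format (not Spring's actual encoder, just the framing it produces):

```java
public class SseFrameSketch {

    // Frames one event payload per the SSE wire format: each line of the
    // payload becomes a "data:" line, and a blank line terminates the event.
    static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append("\n");
        }
        return sb.append("\n").toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("{\"id\":\"1\",\"status\":\"CREATED\"}"));
        // data:{"id":"1","status":"CREATED"}
        // (followed by a blank line that ends the event)
    }
}
```

Because each event is framed and flushed independently, the client's bodyToFlux(String.class) can emit one element per event without waiting for the connection to close.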