Spring-data-elasticsearch:更新到 7.6.2 后无法从 Flux<SearchHit<Sugestao>> 转换为 Flux<Sugestao>。如何处理SearchHit?

Spring-data-elasticsearch: cannot convert from Flux<SearchHit<Sugestao>> to Flux<Sugestao> after updated to 7.6.2. How deal with SearchHit?

上下文:我想在 ElasticSearch 和 Spring WebFlux 的完整反应堆组合中使用 ElasticSearch。

这是我第一次使用 springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient 和 springframework.data.elasticsearch.core.ReactiveElasticsearchOperations。我曾使用 MongoDb 在反应式堆栈中工作,但这是我第一次使用 ElasticSearch。

我已经成功地遵循了使用 ReactiveElasticsearchOperations 和 spring-data-elasticsearch-3.2.6 和 elasticsearch-6.8.7 (Elastic Tutorial)

的教程

并且 findAll/findById 与 elastic-6.8.7 和 spring-data-elasticsearch-3.2.6

一起正常工作

我的模型服务:

...
    private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;

    private final ReactiveElasticsearchClient reactiveElasticsearchClient;

    public MyModelServiceImpl(ReactiveElasticsearchOperations reactiveElasticsearchOperations,
                              ReactiveElasticsearchClient reactiveElasticsearchClient) {
        this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
        this.reactiveElasticsearchClient = reactiveElasticsearchClient;
    }

    @Override
    public Mono<MyModel> findMyModelById(String id){

        return reactiveElasticsearchOperations.findById(
            id,
            MyModel.class,
            MYMODEL_ES_INDEX,
            DEFAULT_ES_DOC_TYPE
        ).doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
    }

    @Override
    public Flux<MyModel> findAllMyModels(String field, String value){

        NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();

        if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(value)) {

            query.withQuery(QueryBuilders.matchQuery(field, value));
        }

        return reactiveElasticsearchOperations.find(
            query.build(),
            MyModel.class,
            MYMODEL_ES_INDEX
        ).doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
    }

我尝试对更新版本(spring-data-elasticsearch-4 和 elast-7.6.2)遵循相同的想法。因为我可以阅读 "Deprecated. since 4.0, use search(Query, ...) Flux emitting matching entities one by one wrapped in a SearchHit." 然后我完全卡住了,因为结果是包裹在 SearchHit 中。好吧,四处搜索我不明白为什么这样的包装器既不明白如何通过控制器方法 convert/map/flatMap/etc 将我的模型的 Flux 变为 return。

这里是我暂时导致这个问题主题中提到的问题:

服务:

import com.poc.favoritos.model.Sugestao;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.util.StringUtils;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SugestaoServiceImpl implements SugestaoService{

    private static final Logger logger = LoggerFactory.getLogger(SugestaoServiceImpl.class);

    private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;

    private final ReactiveElasticsearchClient reactiveElasticsearchClient;

    public SugestaoServiceImpl(ReactiveElasticsearchOperations reactiveElasticsearchOperations,
                              ReactiveElasticsearchClient reactiveElasticsearchClient) {
        this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
        this.reactiveElasticsearchClient = reactiveElasticsearchClient;
    }

    @Override
    public Mono<Sugestao> findSugestaoById(String id) {
        return reactiveElasticsearchOperations.get(id, Sugestao.class)
            .doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
    }

    @Override
    public Flux<Sugestao> findAllMySugestoes(String field, String value) {
        NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();

        if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(value)) {

            query.withQuery(QueryBuilders.matchQuery(field, value));
        }

        return reactiveElasticsearchOperations.search(query.build(), Sugestao.class);

    }

}

ElastiSearchConfig 最初是从 Same tutorial mentioned above . Honestly, I am not sure why do I need and what is this config adding to my project. BTW, I am studding it also from operations reference 复制的。

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

@Configuration
public class ElasticsearchConfig {

    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(elassandraHostAndPort)
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                    .codecs(configurer -> configurer.defaultCodecs()
                        .maxInMemorySize(-1))
                    .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();

        return ReactiveRestClients.create(clientConfiguration);
    }

    @Bean
    public ElasticsearchConverter elasticsearchConverter() {
        return new MappingElasticsearchConverter(elasticsearchMappingContext());
    }

    @Bean
    public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
        return new SimpleElasticsearchMappingContext();
    }

    @Bean
    public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
        return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter());
    }

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elassandraHostAndPort;

}

至于 SearchHit:此 class 包含来自搜索结果的信息,该信息不是实体的一部分,而是搜索结果的一部分,例如分数、排序值、突出显示条目。 如果您不需要这个并且只想单独使用实体的 Flux:

Flux<SearchHit<Entity>> fluxSearchHits = ...

Flux<Entity> fluxEntity = fluxSearchHits.map(searchHit -> searchHit.getContent);

至于配置:

您需要 ReactiveElasticsearchClient bean 来配置 Spring Data Elasticsearch。其他 3 个豆子:不知道它们为什么在那里; Spring Data Elasticsearch 4.0

不需要它们

编辑 2020 年 5 月 16 日:

配置:您应该从 AbstractReactiveElasticsearchConfiguration 派生您的配置 class,然后您不需要其他 bean,因为基础 class 定义了必要的东西:

@Configuration
public class ElasticsearchConfig extends AbstractReactiveElasticsearchConfiguration{

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elassandraHostAndPort;

    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elassandraHostAndPort)
                .build();

        return ReactiveRestClients.create(clientConfiguration);
    }
}

并且仅当您检索大型结果集并且结果缓冲区的默认内存大小太低时才需要自定义 WebClientConfiguration。