如何在反应性 Spring 数据中应用分页?

How apply pagination in reactive Spring Data?

在 Spring 数据中,我们有 PagingAndSortingRepository 继承自 CrudRepository。在反应性 Spring 数据中,我们只有 ReactiveSortingRepository 继承自 ReactiveCrudRepository。 我们如何以反应方式进行分页? 例如,我们将来可以用 ReactivePagingAndSortingRepository 来实现吗?

Reactive Spring Data MongoDB 存储库不提供分页,因为它是为命令式存储库设计的。命令式分页在获取页面时需要额外的细节。特别是:

  • 分页查询返回记录数
  • 可选,如果返回的记录数为零或与页面大小匹配,则查询产生的记录总数以计算总页数

这两个方面都不符合高效、非阻塞资源使用的概念。等到收到所有记录(以确定分页详细信息的第一块)将消除您通过响应式数据访问获得的大部分好处。此外,执行计数查询相当昂贵,并且会增加延迟,直到您能够处理数据。

您仍然可以通过将 Pageable (PageRequest) 传递给存储库查询方法来自己获取数据块:

interface ReactivePersonRepository extends Repository<Person, Long> {

  Flux<Person> findByFirstnameOrderByLastname(String firstname, Pageable pageable);
}

Spring 数据将通过将 Pageable 转换为 LIMITOFFSET.

对查询应用分页

参考文献:

import com.thepracticaldeveloper.reactiveweb.domain.Quote;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

public interface QuoteMongoReactiveRepository extends ReactiveCrudRepository<Quote, String> {

    @Query("{ id: { $exists: true }}")
    Flux<Quote> retrieveAllQuotesPaged(final Pageable page);
}

更多详情,you could check here

我用这种方法为可能仍在寻找解决方案的任何人创建了一个服务:

@Resource
private UserRepository userRepository; //Extends ReactiveSortingRepository<User, String>

public Mono<Page<User>> findAllUsersPaged(Pageable pageable) {
        return this.userRepository.count()
                .flatMap(userCount -> {
                    return this.userRepository.findAll(pageable.getSort())
                            .buffer(pageable.getPageSize(),(pageable.getPageNumber() + 1))
                            .elementAt(pageable.getPageNumber(), new ArrayList<>())
                            .map(users -> new PageImpl<User>(users, pageable, userCount));
                });
    }

我使用@kn3l 解决方案创建了另一种方法(不使用@Query):

fun findByIdNotNull(page: Pageable): Flux< Quote>

它创建相同的查询而不使用@Query 方法

public Mono<Page<ChatUser>> findByChannelIdPageable(String channelId, Integer page, Integer size) {
    Pageable pageable = PageRequest.of(page, size, Sort.by(Sort.Direction.DESC, "chatChannels.joinedTime"));
    Criteria criteria = new Criteria("chatChannels.chatChannelId").is(channelId);
    Query query = new Query().with(pageable);
    query.addCriteria(criteria);
    Flux<ChatUser> chatUserFlux = reactiveMongoTemplate.find(query, ChatUser.class, "chatUser");
    Mono<Long> countMono = reactiveMongoTemplate.count(Query.of(query).limit(-1).skip(-1), ChatUser.class);
    return Mono.zip(chatUserFlux.collectList(),countMono).map(tuple2 -> {
        return PageableExecutionUtils.getPage(
                tuple2.getT1(),
                pageable,
                () -> tuple2.getT2());
    });
}

我遇到了同样的问题,最终采用了与上述类似的方法,但在我使用 Query DSL 时稍微更改了代码,如果有人需要,可以参考以下示例。

@Repository
public interface PersonRepository extends ReactiveMongoRepository<Person, String>, ReactiveQuerydslPredicateExecutor<Person> {

    default Flux<Person> applyPagination(Flux<Person> persons, Pageable pageable) {
    return persons.buffer(pageable.getPageSize(), (pageable.getPageNumber() + 1))
        .elementAt(pageable.getPageNumber(), new ArrayList<>())
        .flatMapMany(Flux::fromIterable);
    }

}


public Flux<Person> findAll(Pageable pageable, Predicate predicate) {
    return personRepository.applyPagination(personRepository.findAll(predicate), pageable);
}

在寻找反应式可分页存储库的一些想法时,我看到了会导致可怕的样板代码的解决方案,所以我最终得到了这个(在现实生活中还没有尝试过,但应该可以正常工作,或者它可能是灵感为您的解决方案)

所以……让我们用这样的方法创建一个全新的工具箱class

    public static 
           <R extends PageableForReactiveMongo<S, K>, S, T, K> Mono<Page<T>>
           pageableForReactiveMongo(Pageable pageable, 
                                             R repository, Class<T> clazzTo) {
        return repository.count()
                .flatMap(c ->
                        repository.findOderByLimitedTo(pageable.getSort(),
                                              pageable.getPageNumber() + 1)
                                .buffer(pageable.getPageSize(), (pageable.getPageNumber() + 1))
                                .elementAt(pageable.getPageNumber(), new ArrayList<>())
                                .map(r -> mapToPage(pageable, c, r, clazzTo))
                );
    }

它还需要这样的东西:

    private static <S, T> Page<T> mapToPage(Pageable pageable, Long userCount, Collection<S> collection, Class<T> clazzTo) {
        return new PageImpl<>(
                collection.stream()
                        .map(r -> mapper.map(r, clazzTo))
                        .collect(Collectors.toList())
                , pageable, userCount);
    }

然后我们还需要一个包装反应式存储库的抽象层

public interface PageableForReactiveMongo<D, K> extends ReactiveMongoRepository<D, K> {
    Flux<D> findOderByLimitedTo(Sort sort, int i);
}

让它实例化spring

@Repository
interface ControllerRepository extends PageableForReactiveMongo<ControllerDocument, String> {
}

最后就这样用了很多很多次

public Mono<Page<Controller>> findAllControllers(Pageable pageable) {
    return getFromPageableForReactiveMongo(pageable, controllerRepository, Controller.class);
}

这就是您的代码的样子 :) 请告诉我它是否正常,或者有什么帮助