Spring 反应文件集成

Spring reactive file integration

我正在尝试使用 spring-integration-file 来轮询目录并从放置在该目录中的文件创建反应流。这在大多数情况下都有效,但是当我放置一个文件但没有订阅者时,我会遇到异常。为了演示这个问题,我编写了一个小型演示应用程序:

import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.io.File;

@SpringBootApplication
@RestController
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<File>> reactiveSource() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("."))
                                .patternFilter("*.csv"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .channel("processFileChannel")
                .toReactivePublisher();
    }

    @GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> files() {
        return Flux.from(reactiveSource())
                .map(message -> message.getPayload().getAbsolutePath());
    }
}

因此,如果我现在对 localhost:8080/files 执行 curl,然后将 csv 文件放在项目的根目录中,一切都很好,我看到文件的路径作为对我的 curl 的响应。但是当我不做卷曲然后将文件放在根目录中时,我得到以下异常:

java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication'; 
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
        at org.springframework.util.Assert.state(Assert.java:97)
        at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
        ... 38 more

我认为反应流的属性之一是,当没有订阅者时,由于流是惰性的,流将不会启动。但显然情况并非如此。有人可以解释一下如果没有订阅者我需要做什么才能让流不开始吗?

如果您使用最新版本之一,那么您可以使用 FluxMessageChannel 频道而不是 "processFileChannel"DirectChannel。这样,SourcePollingChannel 适配器将变为反应式,并且实际上不会轮询源,直到订阅发生在 FluxMessageChannel

然后您从这个 FluxMessageChannel 在您的 files() API 中创建一个 Flux - .toReactivePublisher().

中不需要

在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#source-polling-channel-adapter

重点是 .toReactivePublisher() 正好在此时将集成流程作为 Publisher。在此之前的所有内容都是常规的命令式方式,并且独立于下游逻辑工作。

更新

像这样:

@Bean
FluxMessageChannel filesChannel() {
    return new FluxMessageChannel();
}

@Bean
public IntegrationFlow reactiveSource() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("."))
                                .patternFilter("*.csv"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .channel(filesChannel())
                .get();
    }

@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
        return Flux.from(filesChannel())
                .map(message -> ((File) message.getPayload()).getAbsolutePath());
    }