Spring 反应文件集成
Spring reactive file integration
我正在尝试使用 spring-integration-file 来轮询目录并从放置在该目录中的文件创建反应流。这在大多数情况下都有效,但是当我放置一个文件但没有订阅者时,我会遇到异常。为了演示这个问题,我编写了一个小型演示应用程序:
import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.io.File;
@SpringBootApplication
@RestController
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Publisher<Message<File>> reactiveSource() {
return IntegrationFlows
.from(Files.inboundAdapter(new File("."))
.patternFilter("*.csv"),
e -> e.poller(Pollers.fixedDelay(1000)))
.channel("processFileChannel")
.toReactivePublisher();
}
@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
return Flux.from(reactiveSource())
.map(message -> message.getPayload().getAbsolutePath());
}
}
因此,如果我现在对 localhost:8080/files 执行 curl,然后将 csv 文件放在项目的根目录中,一切都很好,我看到文件的路径作为对我的 curl 的响应。但是当我不做卷曲然后将文件放在根目录中时,我得到以下异常:
java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication';
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
at org.springframework.util.Assert.state(Assert.java:97)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
... 38 more
我认为反应流的属性之一是,当没有订阅者时,由于流是惰性的,流将不会启动。但显然情况并非如此。有人可以解释一下如果没有订阅者我需要做什么才能让流不开始吗?
如果您使用最新版本之一,那么您可以使用 FluxMessageChannel
频道而不是 "processFileChannel"
的 DirectChannel
。这样,SourcePollingChannel
适配器将变为反应式,并且实际上不会轮询源,直到订阅发生在 FluxMessageChannel
。
然后您从这个 FluxMessageChannel
在您的 files()
API 中创建一个 Flux
- .toReactivePublisher()
.
中不需要
重点是 .toReactivePublisher()
正好在此时将集成流程作为 Publisher
。在此之前的所有内容都是常规的命令式方式,并且独立于下游逻辑工作。
更新
像这样:
@Bean
FluxMessageChannel filesChannel() {
return new FluxMessageChannel();
}
@Bean
public IntegrationFlow reactiveSource() {
return IntegrationFlows
.from(Files.inboundAdapter(new File("."))
.patternFilter("*.csv"),
e -> e.poller(Pollers.fixedDelay(1000)))
.channel(filesChannel())
.get();
}
@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
return Flux.from(filesChannel())
.map(message -> ((File) message.getPayload()).getAbsolutePath());
}
我正在尝试使用 spring-integration-file 来轮询目录并从放置在该目录中的文件创建反应流。这在大多数情况下都有效,但是当我放置一个文件但没有订阅者时,我会遇到异常。为了演示这个问题,我编写了一个小型演示应用程序:
import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.io.File;
@SpringBootApplication
@RestController
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Publisher<Message<File>> reactiveSource() {
return IntegrationFlows
.from(Files.inboundAdapter(new File("."))
.patternFilter("*.csv"),
e -> e.poller(Pollers.fixedDelay(1000)))
.channel("processFileChannel")
.toReactivePublisher();
}
@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
return Flux.from(reactiveSource())
.map(message -> message.getPayload().getAbsolutePath());
}
}
因此,如果我现在对 localhost:8080/files 执行 curl,然后将 csv 文件放在项目的根目录中,一切都很好,我看到文件的路径作为对我的 curl 的响应。但是当我不做卷曲然后将文件放在根目录中时,我得到以下异常:
java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication';
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
at org.springframework.util.Assert.state(Assert.java:97)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
... 38 more
我认为反应流的属性之一是,当没有订阅者时,由于流是惰性的,流将不会启动。但显然情况并非如此。有人可以解释一下如果没有订阅者我需要做什么才能让流不开始吗?
如果您使用最新版本之一,那么您可以使用 FluxMessageChannel
频道而不是 "processFileChannel"
的 DirectChannel
。这样,SourcePollingChannel
适配器将变为反应式,并且实际上不会轮询源,直到订阅发生在 FluxMessageChannel
。
然后您从这个 FluxMessageChannel
在您的 files()
API 中创建一个 Flux
- .toReactivePublisher()
.
重点是 .toReactivePublisher()
正好在此时将集成流程作为 Publisher
。在此之前的所有内容都是常规的命令式方式,并且独立于下游逻辑工作。
更新
像这样:
@Bean
FluxMessageChannel filesChannel() {
return new FluxMessageChannel();
}
@Bean
public IntegrationFlow reactiveSource() {
return IntegrationFlows
.from(Files.inboundAdapter(new File("."))
.patternFilter("*.csv"),
e -> e.poller(Pollers.fixedDelay(1000)))
.channel(filesChannel())
.get();
}
@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
return Flux.from(filesChannel())
.map(message -> ((File) message.getPayload()).getAbsolutePath());
}