反应式@StreamListener

Reactive @StreamListener

快点。

是否可以使用 Spring Cloud Stream 进行反应式 @StreamListener 输入?我的意思是这样的:

@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
    strings.log();
}

在文档中我只看到了这样的例子:

@StreamListener(Sink.INPUT)
public void log(String message)
{
    log.info(message);
}

或这样的处理器:

@StreamListener
@Output(Source.OUTPUT)
public Flux<String> log(@Input(Sink.INPUT) Flux<String> strings)
{
    return strings.map(String::toUpperCase);
}

当我 运行 来自第一个片段的代码时,我得到一个 Dispatcher has no subscribers 异常。

前两个是等价的。如果 运行 遇到错误,请打开 GitHub 问题。

编辑:

这两个注解是等价的,但是如果对输入通量有订阅,监听器只会订阅输入。为了完全等效,反应式应该是:

@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
    strings.log().subscribe();
}

@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
    strings.subscribe(log::info);
}

只需在 flux 上调用 log() 即可创建另一个 Flux 而无需自己订阅。