Mono<> 与 Spring Cloud Stream
Mono<> with Spring Cloud Stream
我 运行 在尝试将 Reactor 中的 Mono 与 Spring Cloud Stream 结合使用时遇到问题,无法真正弄清楚发生了什么。
假设我有一个这样的听众:
@StreamListener
@Output(Urls.OUTUT)
public Flux<String> expandUrls(@Input(Urls.INPUT) Flux<String> urlFormats)
{
return urlFormats
.map(this::expandUrl)
.flatMapIterable(urls -> urls);
}
所以它基本上是将像这样 http://www.example.com/page/%d
格式的 url 扩展成像这样
http://www.example.com/page/1
http://www.example.com/page/2
http://www.example.com/page/3
它按预期工作,但是当我尝试这样做时:
@StreamListener
@Output(Urls.OUTPUT)
public Flux<String> expandUrls(@Input(Urls.INPUT) Mono<String> urlFormats)
{
return urlFormats
.repeat(3)
.zipWith(pageNumbers)
.map(this::formatUrl);
}
其中 pageNumber 是 Flux.fromStream(Stream.iterate(1, p -> p+1).limit(3))
我得到以下异常
Caused by: java.lang.IllegalArgumentException: A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.
我通过这样做摆脱了异常
@StreamListener(value = Urls.INPUT)
@Output(Urls.OUTPUT)
public Flux<String> expandUrls(Mono<String> urlFormats)
{
return urlFormats
.repeat(3)
.zipWith(pageNumbers)
.map(this::formatUrl);
}
但现在我明白了:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'http': was expecting ('true', 'false' or 'null')
at [Source: http://www.example.com/page-%d,1,0.html;
我的问题是:如何将 Mono 与 Spring Cloud Stream 一起使用。甚至可以像这样使用它吗?如果是,那么该怎么做?
哦,我正在使用 Kafka 作为 kafka-starter 的代理。
Spring Cloud Stream StreamListener 的 @Input 参数类型支持反应器类型 Flux
只是因为它比在 @Input 参数类型上具有 Mono
更适合反应式流应用程序。
我 运行 在尝试将 Reactor 中的 Mono 与 Spring Cloud Stream 结合使用时遇到问题,无法真正弄清楚发生了什么。
假设我有一个这样的听众:
@StreamListener
@Output(Urls.OUTUT)
public Flux<String> expandUrls(@Input(Urls.INPUT) Flux<String> urlFormats)
{
return urlFormats
.map(this::expandUrl)
.flatMapIterable(urls -> urls);
}
所以它基本上是将像这样 http://www.example.com/page/%d
格式的 url 扩展成像这样
http://www.example.com/page/1
http://www.example.com/page/2
http://www.example.com/page/3
它按预期工作,但是当我尝试这样做时:
@StreamListener
@Output(Urls.OUTPUT)
public Flux<String> expandUrls(@Input(Urls.INPUT) Mono<String> urlFormats)
{
return urlFormats
.repeat(3)
.zipWith(pageNumbers)
.map(this::formatUrl);
}
其中 pageNumber 是 Flux.fromStream(Stream.iterate(1, p -> p+1).limit(3))
我得到以下异常
Caused by: java.lang.IllegalArgumentException: A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.
我通过这样做摆脱了异常
@StreamListener(value = Urls.INPUT)
@Output(Urls.OUTPUT)
public Flux<String> expandUrls(Mono<String> urlFormats)
{
return urlFormats
.repeat(3)
.zipWith(pageNumbers)
.map(this::formatUrl);
}
但现在我明白了:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'http': was expecting ('true', 'false' or 'null')
at [Source: http://www.example.com/page-%d,1,0.html;
我的问题是:如何将 Mono 与 Spring Cloud Stream 一起使用。甚至可以像这样使用它吗?如果是,那么该怎么做? 哦,我正在使用 Kafka 作为 kafka-starter 的代理。
Spring Cloud Stream StreamListener 的 @Input 参数类型支持反应器类型 Flux
只是因为它比在 @Input 参数类型上具有 Mono
更适合反应式流应用程序。