Akka Stream:mapMaterializedValue 是什么意思
Akka Stream: what does mapMaterializedValue mean
我已阅读 Akka streams materialization concept,并且了解流实现是:
the process of taking a stream description (the graph) and allocating all the necessary resources it needs in order to run.
我按照一个示例使用 mapMaterializedValue 将消息发送到队列来构建我的 akka 流。该代码的目的是在构建流蓝图并且代码正常工作后将消息推送到队列,但我真的不明白 mapMaterrializaedValue 在代码中做了什么:
Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();
Source<String, SourceQueueWithComplete<String>> s = Source
.queue(100, OverflowStrategy.fail())
.mapMaterializaedValue(queue -> {
promise.trySuccess(queue);
});
source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materIalizer);
promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());
mapMaterializedValue
的目的是在物化后立即转换物化值。例如,假设您有一个第三方库接受这样的回调:
interface Callback<T> {
void onNext(T next);
void onError(Throwable t);
void onComplete();
}
然后你可以创建一个 returns 一个 Source<T, Callback<T>>
的方法,当流实际上是 运行:[=16 时,你可以立即将其物化值传递给第三方库=]
<T> Source<T, Callback<T>> callbackSource() {
return Source.queue(1024, OverflowStrategy.fail())
.mapMaterializedValue(queue -> new Callback<T> {
// an implementation of Callback which pushes the data
// to the queue
});
}
Source<Integer, Callback<Integer>> source = callbackSource();
Callback<Integer> callback = source
.toMat(Sink.foreach(System.out::println), Keep.left())
.run(materializer);
thirdPartyApiObject.runSomethingWithCallback(callback);
你可以看到这里可以简化必须使用这种第三方的代码API因为你只做一次这个队列->回调转换并封装在一个方法中。
然而,在您的情况下,您并不真正需要它。您正在使用 mapMaterializedValue
来完成具有物化值的外部承诺,这是完全没有必要的,因为您可以在物化后直接使用物化值:
Source<String, SourceQueueWithComplete<String>> s = Source
.queue(100, OverflowStrategy.fail());
SourceQueueWithComplete<String> queue = source
.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left())
.run(materIalizer);
mapMapperFunction().apply(queue);
我已阅读 Akka streams materialization concept,并且了解流实现是:
the process of taking a stream description (the graph) and allocating all the necessary resources it needs in order to run.
我按照一个示例使用 mapMaterializedValue 将消息发送到队列来构建我的 akka 流。该代码的目的是在构建流蓝图并且代码正常工作后将消息推送到队列,但我真的不明白 mapMaterrializaedValue 在代码中做了什么:
Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();
Source<String, SourceQueueWithComplete<String>> s = Source
.queue(100, OverflowStrategy.fail())
.mapMaterializaedValue(queue -> {
promise.trySuccess(queue);
});
source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materIalizer);
promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());
mapMaterializedValue
的目的是在物化后立即转换物化值。例如,假设您有一个第三方库接受这样的回调:
interface Callback<T> {
void onNext(T next);
void onError(Throwable t);
void onComplete();
}
然后你可以创建一个 returns 一个 Source<T, Callback<T>>
的方法,当流实际上是 运行:[=16 时,你可以立即将其物化值传递给第三方库=]
<T> Source<T, Callback<T>> callbackSource() {
return Source.queue(1024, OverflowStrategy.fail())
.mapMaterializedValue(queue -> new Callback<T> {
// an implementation of Callback which pushes the data
// to the queue
});
}
Source<Integer, Callback<Integer>> source = callbackSource();
Callback<Integer> callback = source
.toMat(Sink.foreach(System.out::println), Keep.left())
.run(materializer);
thirdPartyApiObject.runSomethingWithCallback(callback);
你可以看到这里可以简化必须使用这种第三方的代码API因为你只做一次这个队列->回调转换并封装在一个方法中。
然而,在您的情况下,您并不真正需要它。您正在使用 mapMaterializedValue
来完成具有物化值的外部承诺,这是完全没有必要的,因为您可以在物化后直接使用物化值:
Source<String, SourceQueueWithComplete<String>> s = Source
.queue(100, OverflowStrategy.fail());
SourceQueueWithComplete<String> queue = source
.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left())
.run(materIalizer);
mapMapperFunction().apply(queue);