如何使用Flink中提供的成本估算器class获取Flink中的操作成本
how to get the operation cost in Flink using cost estimator class provided in Flink
我想做 Flink CEP 引擎的性能分析,我遇到了这些 classes
org.apache.flink.optimizer.costs.CostEstimator;
org.apache.flink.optimizer.costs.Costs;
org.apache.flink.optimizer.costs.DefaultCostEstimator;
但问题是我不知道如何使用这两个 class。有人可以向我提供有关如何在 Flink 中找到运算符 { join 例如} 的成本估算的代码或暗示。
下面是我在 Flink 中执行的连接代码
DataStream<JoinedEvent> joinedEventDataStream = stream1.join(stream2).where(new KeySelector<RRIntervalStreamEvent, Long>() {
@Override
public Long getKey(RRIntervalStreamEvent rrIntervalStreamEvent) throws Exception {
return rrIntervalStreamEvent.getTime();
}
})
.equalTo(new KeySelector<qrsIntervalStreamEvent, Long>() {
@Override
public Long getKey(qrsIntervalStreamEvent qrsIntervalStreamEvent) throws Exception {
return qrsIntervalStreamEvent.getTime();
}
})
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.apply(new JoinFunction<RRIntervalStreamEvent, qrsIntervalStreamEvent, JoinedEvent>() {
@Override
public JoinedEvent join(RRIntervalStreamEvent rr, qrsIntervalStreamEvent qrs) throws Exception {
//getting the cost -- just checking
// costs.getCpuCost();
return new JoinedEvent(rr.getTime(),rr.getSensor_id(),qrs.getSensor_id(),rr.getRRInterval(),qrs.getQrsInterval());
}
});
如何计算此连接的成本?
开销类属于DataSet的优化器API(Flink的批处理API),而CEP库是建立在DataStreamAPI上的。 DataStream API 不利用 DataSet API。
CEP 库和数据集优化器完全无关。因此,不可能使用此代码来估算 CEP 模式的成本。我也不知道还有另一种内置方法可以估算 CEP 模式(或任何其他 DataStream 程序)的成本。
我想做 Flink CEP 引擎的性能分析,我遇到了这些 classes
org.apache.flink.optimizer.costs.CostEstimator;
org.apache.flink.optimizer.costs.Costs;
org.apache.flink.optimizer.costs.DefaultCostEstimator;
但问题是我不知道如何使用这两个 class。有人可以向我提供有关如何在 Flink 中找到运算符 { join 例如} 的成本估算的代码或暗示。
下面是我在 Flink 中执行的连接代码
DataStream<JoinedEvent> joinedEventDataStream = stream1.join(stream2).where(new KeySelector<RRIntervalStreamEvent, Long>() {
@Override
public Long getKey(RRIntervalStreamEvent rrIntervalStreamEvent) throws Exception {
return rrIntervalStreamEvent.getTime();
}
})
.equalTo(new KeySelector<qrsIntervalStreamEvent, Long>() {
@Override
public Long getKey(qrsIntervalStreamEvent qrsIntervalStreamEvent) throws Exception {
return qrsIntervalStreamEvent.getTime();
}
})
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.apply(new JoinFunction<RRIntervalStreamEvent, qrsIntervalStreamEvent, JoinedEvent>() {
@Override
public JoinedEvent join(RRIntervalStreamEvent rr, qrsIntervalStreamEvent qrs) throws Exception {
//getting the cost -- just checking
// costs.getCpuCost();
return new JoinedEvent(rr.getTime(),rr.getSensor_id(),qrs.getSensor_id(),rr.getRRInterval(),qrs.getQrsInterval());
}
});
如何计算此连接的成本?
开销类属于DataSet的优化器API(Flink的批处理API),而CEP库是建立在DataStreamAPI上的。 DataStream API 不利用 DataSet API。
CEP 库和数据集优化器完全无关。因此,不可能使用此代码来估算 CEP 模式的成本。我也不知道还有另一种内置方法可以估算 CEP 模式(或任何其他 DataStream 程序)的成本。