Hazelcast Jet 0.6.1 - Dag 定义

Hazelcast Jet 0.6.1 - Dag Definition

Hazelcast Jet 在控制台上打印 DAG 定义,一旦启动

这会将管道定义转换为 DAG。

这是管道定义。

    private Pipeline buildPipeline() {
 Pipeline p = Pipeline.create();
 p.drawFrom(Sources.<String, Record>remoteMapJournal("record", getClientConfig(), START_FROM_OLDEST))
          .addTimestamps((v) ->  getTimeStamp(v), 3000)
          .peek()
          .groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
    .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
    .aggregate(counting())
    .map((v)-> getMapKey(v))
    .drainTo(Sinks.remoteMap("Test", getClientConfig()));
    return p;
  }

这是打印在控制台上的 DAG 定义。

 .vertex("remoteMapJournalSource(record)").localParallelism(1)
.vertex("sliding-window-step1").localParallelism(4)
.vertex("sliding-window-step2").localParallelism(4)
.vertex("map").localParallelism(4)
.vertex("remoteMapSink(Test)").localParallelism(1)
.edge(between("remoteMapJournalSource(record)", "sliding-window-step1").partitioned(?))
.edge(between("sliding-window-step1", "sliding-window-step2").partitioned(?).distributed())
.edge(between("sliding-window-step2", "map"))
.edge(between("map", "remoteMapSink(Test)"))

有什么方法可以获取包含滑动 window 详细信息、聚合 API 等所有详细信息的 DAG 定义?

不,这在技术上是不可能的。如果您编写一个 lambda(例如用于密钥提取器),则无法显示定义该 lambda 的代码。获取更多信息的唯一方法是将该信息嵌入到顶点名称中。

在 Jet 0.7 中,此打印输出将更改为 graphviz 格式,以便您可以将其复制粘贴到工具中并以图像形式查看 DAG。