How to peek into the workings of a Hazelcast Jet Vertex?
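I'm trying to rework a Hazelcast Jet 0.3 DAG system that I wrote a few weeks ago to v0.4, as a first step towards changing it from batch to stream processing. Interestingly, I've suddenly run into some strange behavior, and I can't tell whether the vertices are working as intended. I tried to understand what is going on, but I couldn't find any option for peeking into the inner workings of each vertex. Is there a way to at least get some error messages out of it?
To isolate the problem, I tried to dumb it down to a really simple "read from list, map it to a map, write to map" DAG. But still no luck.
Below is my silly example; maybe I'm making a really simple mistake that someone more knowledgeable will see right away?
Publisher: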
// every second via executorservice:
final IStreamMap<Long, List<byte[]>> data = jet.getMap("data");
data.set(jet.getHazelcastInstance().getAtomicLong("key").getAndIncrement(), myByteArray);
Analyzer:
jet.getList(key.toString()).addAll((List<byte[]>) jet.getMap("data").get(key));
jet.getMap("data").remove(key);
logger.debug("List {} has size: {}", key, jet.getList(key.toString()).size());
final Vertex sourceDataMap = this.newVertex("sourceDataMap", readList(key.toString())).localParallelism(1);
final Vertex parseByteArrayToMap = this.newVertex("parseByteArrayToMap", map(
(byte[] e) -> new AbstractMap.SimpleEntry<>(jet.getHazelcastInstance().getAtomicLong("counter").getAndIncrement(), e)));
final Vertex sinkIntoResultMap = this.newVertex("sinkIntoResultMap", writeMap("result"));
this.edge(between(sourceDataMap, parseByteArrayToMap))
.edge(between(parseByteArrayToMap, sinkIntoResultMap));
Listener:
jet.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>)
(EntryEvent<Long, byte[]> entryEvent)
-> logger.debug("Got result: {} at {}",entryEvent.getValue().length, System.currentTimeMillis())
, true);
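The data generation works fine, and everything is fine up to the point where the DAG should take over... but there are no error messages or anything else from the DAG. Any suggestions?
Here is your code, slightly cleaned up, which works on my side: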
public class Main {
    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.newJetInstance();
        try {
            HazelcastInstance hz = jet.getHazelcastInstance();
            ILogger logger = hz.getLoggingService().getLogger("a");
            // every second via executorservice:
            final IStreamMap<Long, List<byte[]>> data = jet.getMap("data");
            List<byte[]> myByteArray = asList(new byte[1], new byte[2]);
            IAtomicLong keyGen = hz.getAtomicLong("key");
            Long key = keyGen.getAndIncrement();
            data.set(key, myByteArray);
            String stringKey = key.toString();
            hz.getList(stringKey).addAll((List<byte[]>) jet.getMap("data").get(key));
            jet.getMap("data").remove(key);
            logger.severe(String.format("List %s has size: %d", key, jet.getList(stringKey).size()));
            hz.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>)
                    (EntryEvent<Long, byte[]> entryEvent) -> logger.severe(String.format(
                            "Got result: %d at %d", entryEvent.getValue().length, System.currentTimeMillis())),
                    true);
            DAG dag = new DAG();
            Vertex sourceDataMap = dag.newVertex("sourceDataMap", readList(stringKey)).localParallelism(1);
            Vertex parseByteArrayToMap = dag.newVertex("parseByteArrayToMap", map(
                    (byte[] e) -> entry(randomUUID(), e)));
            Vertex sinkIntoResultMap = dag.newVertex("sinkIntoResultMap", writeMap("result"));
            dag.edge(between(sourceDataMap, parseByteArrayToMap))
               .edge(between(parseByteArrayToMap, sinkIntoResultMap));
            jet.newJob(dag).execute().get();
            Thread.sleep(1000);
        } finally {
            Jet.shutdownAll();
        }
    }
}
In the console I see:
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] List 0 has size: 2
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 2 at 1498822322228
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 1 at 1498822322228
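On the original question of peeking into a vertex: one low-tech option is to wrap the function you hand to map(...) so that it reports every item going in and coming out, plus any exception. The sketch below is only an illustration of that idea and is not part of the Jet API. The class PeekSketch and the helper peeking are made-up names, and it uses plain java.util.function types so it runs without Hazelcast on the classpath. In a real DAG the lambda, and anything it captures, would have to be serializable, so you would probably log to System.err or look up a logger inside the lambda rather than capture a local list as done here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not part of Hazelcast Jet.
public class PeekSketch {

    /**
     * Wraps a mapping function so that each input, each output and any
     * runtime exception is reported to the given sink. The exception is
     * rethrown, so the caller still sees the failure.
     */
    public static <T, R> Function<T, R> peeking(
            String vertexName, Function<T, R> mapper, Consumer<String> sink) {
        return item -> {
            sink.accept(vertexName + " <- " + item);
            try {
                R result = mapper.apply(item);
                sink.accept(vertexName + " -> " + result);
                return result;
            } catch (RuntimeException e) {
                sink.accept(vertexName + " failed on " + item + ": " + e);
                throw e;
            }
        };
    }

    public static void main(String[] args) {
        // Collect the trace in a list here; in a job you would log instead.
        List<String> trace = new ArrayList<>();
        Function<byte[], Integer> length =
                peeking("parseByteArrayToMap", (byte[] b) -> b.length, trace::add);
        length.apply(new byte[2]);
        trace.forEach(System.out::println);
    }
}
```

If nothing at all shows up in the trace, the items are not reaching that vertex, which points at the source or the edges rather than at the mapping function itself.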