如何让一个简单的 DAG 在 Hazelcast Jet 中工作?

How to get a simple DAG to work in Hazelcast Jet?

在使用 hazelcast Jet 处理我的 DAG 时,我遇到了一个奇怪的问题。为了检查错误,我完全简化了我的方法并且:根据教程,边缘似乎没有工作。

下面的代码几乎是最简单的。两个顶点(一源一汇),一条边。

source是从map中读取,sink应该放入map中。

data.addEntryListener 正确地告诉我地图由另一个应用程序填充了 100 个列表(每个列表有 25 个对象,大小为 400 字节)...然后什么也没有。地图填满了,但 dag 根本没有与之交互。

知道在哪里寻找问题吗?

package be.andersch.clusterbench;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.config.Config;
import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.jet.*;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.map.listener.EntryAddedListener;
import be.andersch.anotherpackage.myObject;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.hazelcast.jet.Edge.between;
import static com.hazelcast.jet.Processors.*;

/**
 * Created by abernard on 24.03.2017.
 */
public class Analyzer {
    private static final ObjectMapper mapper = new ObjectMapper();
    private static JetInstance jet;
    private static final IStreamMap<Long, List<String>> data;
    private static final IStreamMap<Long, List<String>> testmap;

    static {
        JetConfig config = new JetConfig();
        Config hazelConfig = config.getHazelcastConfig();
        hazelConfig.getGroupConfig().setName( "name" ).setPassword( "password" );
        hazelConfig.getNetworkConfig().getInterfaces().setEnabled( true ).addInterface( "my_IP_range_here" );
        hazelConfig.getSerializationConfig().getSerializerConfigs().add(
                new SerializerConfig().
                        setTypeClass(myObject.class).
                        setImplementation(new OsamKryoSerializer()));
        jet = Jet.newJetInstance(config);
        data = jet.getMap("data");
        testmap = jet.getMap("testmap");
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        DAG dag = new DAG();
        Vertex source = dag.newVertex("source", readMap("data"));
        Vertex test = dag.newVertex("test", writeMap("testmap"));

        dag.edge(between(source, test));

        jet.newJob(dag).execute()get();

        data.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> {
            System.out.println("Got data: " + entryEvent.getKey() + " at " + System.currentTimeMillis() + ", Size: " + jet.getHazelcastInstance().getMap("data").size());
        }, true);

        testmap.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> {
            System.out.println("Got test: " + entryEvent.getKey() + " at " + System.currentTimeMillis());
        }, true);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> Jet.shutdownAll()));
    }
}

Jet 作业已经在第 jet.newJob(dag).execute().get() 行完成,甚至在您创建入口侦听器之前。这意味着作业在空地图上运行。也许您对这项工作的性质感到困惑:它是一项 批处理 工作,而不是无限流处理工作。 Jet 0.3 版本尚不支持无限流处理。