使用 Hazelcast Jet 可以下沉到 java 列表吗?

Sink to java list possible with Hazelcast Jet?

我有一个账户列表,并对报价执行散列连接,return 具有报价数据的账户。但是在 hashjoin 之后我有 drainTo lListJet 然后用 DistributedStream 和 return 读取它。

public List<Account> populateTicksInAccounts(List<Account> accounts) {
    ...
    ...
    Pipeline p = Pipeline.create();
    BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
    BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));

    p.drawFrom(ticksSource)
        .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
        .drainTo(Sinks.list(TEMP_LIST));

    jet.newJob(p).join();
    IListJet<Account> list = jet.getList(TEMP_LIST);
    return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}

是否可以在执行 hashjoin 后 drainTo 到 java List 而不是 lListJet

像下面这样的东西是可能的吗?

IListJet<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
    .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
    .drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;

CustomSinkProcessor 中的什么地方会使用空 java 列表和 return 帐户?

不,不是,因为 Jet 的分布式特性。 sink 将在多个并行处理器(worker)中执行。它不能添加到普通 Collection。接收器必须能够在多个集群成员上插入项目。

请记住,您提交给 Jet 执行的代码 运行 在您提交它的进程之外。虽然理论上可以提供您所要求的 API,但在幕后,它只需要对 运行 集群每个成员的代码执行一些技巧,让所有成员发送把自己的成绩归到一个地方,然后填一个清单给return给你。这将违背分布式计算的本质。

如果您认为这有助于提高代码的可读性,您可以编写如下辅助方法:

public <T, R> List<R> drainToList(GeneralStage<T> stage) {
    String tmpListName = randomListName();
    SinkStage sinkStage = stage.drainTo(Sinks.list(tmpListName));
    IListJet<R> tmpList = jet.getList(tmpListName);
    try {
        jet.newJob(sinkStage.getPipeline()).join();
        return new ArrayList<>(tmpList);
    } finally {
        tmpList.destroy();
    }
}

特别注意那一行

return new ArrayList<>(tmpList);

相对于你的

IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());

这只是将一个 Hazelcast 列表复制到另一个列表,并且 return 是它的一个句柄。现在您已经泄露了 Jet 集群中的两个列表。当您停止使用它们时,它们不会自动消失。

即使我提供的代码仍然有漏洞。 运行 的 JVM 进程可能会在 Job.join() 期间死亡而不会到达 finally。那么这个临时列表就一直存在。