Apache Ignite does not distribute an executing closure to a newly spawned node in the compute grid

Apache Ignite version: 2.1.0

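I am using the default configuration for both the client and the server. The client configuration is shown below. The server configuration is identical except that it does not have the "clientMode" property.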

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>
        <property name="clientMode" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!--Task execution events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!--Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>

            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead of static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">  -->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>xxx.1y4.1zz.91:47500..47509</value>
                                <value>xxx.1y4.1zz.92:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

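The closure executes on the server nodes in the grid as expected.

While the closure is executing, we add a new node to the grid by running the following command: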

.\ignite.bat ..\examples\config\example-ignite.xml

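The existing nodes acknowledge that a new node has joined the grid, but the closure is not distributed to the newly added node.

Is there any configuration that makes the closure also execute on nodes that join while it is running?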

Edit 1:

Below is the class that implements IgniteClosure:

public class SimpleInterestClosure implements IgniteClosure<SimpleInterestParam, AccruedSimpleInterest> {

private static final long serialVersionUID = -5542687183747797356L;

// BigInteger(byte[]) reads the bytes as a two's-complement number, not as decimal text, so use valueOf.
private static final BigInteger HUNDRED = BigInteger.valueOf(100);

private static Logger log = Logger.getLogger("SimpleInterestClosure");

@Override
public AccruedSimpleInterest apply(SimpleInterestParam e) {
    // Parse the decimal text with BigInteger(String); this assumes both values print as whole numbers.
    BigInteger si = e.getPrincipal().multiply(new BigInteger(e.getDurationInYears().toString())).
            multiply(new BigInteger(e.getInterestRate().toString())).divide(SimpleInterestClosure.HUNDRED);
    log.info("Calculated SI for id=" + e.getId());
    return  new AccruedSimpleInterest(e, si);
}

}

Edit 2:

Below is the method that invokes the IgniteClosure implementation:

public void method() throws IgniteException, IOException {

    Factory<SimpleInterestClosure> siClosureFactory = FactoryBuilder.singletonfactoryOf( new SimpleInterestClosure());

    ClassPathResource ress = new ClassPathResource("example-ignite.xml");
    File file = new File(ress.getPath());

    try (Ignite ignite = Ignition.start(file.getPath())) {
        log.info("Started Ignite Cluster");
        IgniteFuture<Collection<AccruedSimpleInterest>> igniteFuture = ignite.compute()
                .applyAsync(siClosureFactory.create(), createParamCollection());
        Collection<AccruedSimpleInterest> res = igniteFuture.get();
    }

}

It sounds like you are looking for job stealing: http://apacheignite.readme.io/docs/load-balancing#job-stealing

However, it currently has a bug that may be a problem in this particular case: http://issues.apache.org/jira/browse/IGNITE-1267
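
For reference, here is a rough sketch of what enabling job stealing could look like inside the IgniteConfiguration bean of the server nodes. The threshold and timeout values are illustrative assumptions rather than recommendations, and given the bug linked above this may still not move jobs to a node that joins mid-execution. As far as I know, JobStealingCollisionSpi needs JobStealingFailoverSpi configured alongside it.

```xml
<!-- Sketch only: add these properties to the IgniteConfiguration bean on the server nodes. -->
<property name="collisionSpi">
    <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
        <!-- Illustrative value: jobs allowed to run concurrently on a node. -->
        <property name="activeJobsThreshold" value="4"/>
        <!-- Illustrative value: a node with fewer waiting jobs than this tries to steal from others. -->
        <property name="waitJobsThreshold" value="0"/>
        <!-- Illustrative value, in milliseconds: how long a steal request stays valid. -->
        <property name="messageExpireTime" value="1000"/>
        <!-- Illustrative value: how many times a single job may be stolen. -->
        <property name="maximumStealingAttempts" value="10"/>
        <property name="stealingEnabled" value="true"/>
    </bean>
</property>

<!-- Stolen jobs are rerouted through the failover mechanism. -->
<property name="failoverSpi">
    <list>
        <bean class="org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi"/>
    </list>
</property>
```

Stealing only helps when jobs are actually waiting in a queue, so activeJobsThreshold has to be low enough that the existing nodes cannot start every job immediately.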