如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?

How to unit test Kafka 0.9 operator with Apache Apex?

用户转发@apex.incubator.apache.org

我想 运行 使用支持 0.9 版本协议的新 Kafka Operator 的单元测试代码。

在此过程中,我包含了 Malhar-Kafka 库版本 ( 3.3.1-incubating ) 并使用 Apex 引擎 ( 版本 3.3.0 ) 作为 test/provided。

编译工作正常,但我的单元测试无法正确 运行,出现“java.lang.ClassNotFoundException: com.datatorrent.lib.util.KryoCloneUtils”异常。

运行使用与 Apex 引擎集成的 Kafka 0.9 运算符的单元测试的推荐方法是什么?我假设 Malhar-contrib 库 Kafka 运算符不符合 0.9 ..

单元测试代码是这样的:

class CassandraEventDetailsS​​treamingApp 在下面的代码片段中扩展了 AbstractKafkaInputOperator。

异常出现在方法lma.getController();

@Test
public void testApplication() throws IOException, Exception {
    try {
        LocalMode lma = LocalMode.newInstance();
        Configuration conf = new Configuration(false);
        conf.addResource(this.getClass().getResourceAsStream("/dag-test-props.xml"));
        lma.prepareDAG(new CassandraEventDetailsStreamingApp(), conf);
        LocalMode.Controller lc = lma.getController();
        lc.run();
    } catch (ConstraintViolationException e) {
        Assert.fail("constraint violations: " + e.getConstraintViolations());
    }
}

我能够通过从 Apex-engine apex-api 的依赖部分中排除 Malhar-library 和 Malhar-contrib 的依赖来解决问题。

这使得 Malhar 的 3.3.1 孵化版本进入了类路径,随后是带有 3.3.1 孵化版本的 Malhar-Kafka 库)。