在 Java Spring 应用程序中使用 Apache Storm 读取 Kafka 消息导致 NotSerializeableException,为什么?

Reading Kafka messages with Apache Storm in a Java Spring application causing NotSerializeableException, why?

我是 Apache Storm 的新手,正在尝试尝试。

现在我只想记录或打印作为 ProtoBuf 对象字节数组接收的传入 Kafka 消息。

我需要在 Java Spring 应用程序中执行此操作。

我正在使用 Kafka 0.11.0.2

我正在使用 Storm 1.1.2,并且在我的 pom 中有 storm-core、storm-kafka 和 storm-starters。

主要服务class示例

//annotations for spring
public class MyService{

    public static void main(String[] args){
        SpringApplication.run(MyService.class, args);
    }

    @PostConstruct
    public void postConstruct() throws Exception {
        SpoutConfig spoutConfig = new SpoutConfig(new ZKHosts("localhost:9092"), "topic", "/topic", "storm-spout");

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("storm-spout", kafkaSpout);
        builder.setBolt("printer", new PrinterBolt())
            .shuffleGrouping("storm-spout");

        Config config = new Config();
        config.setDebug(true);
        config.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka", config, builder.createTopology());

        Thread.sleep(30000);
        cluster.shutdown();
    }

    private class PrinterBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector){
            System.out.println("\n\n INPUT: "+input.toString()+"\n\n");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){}
    }

}

我用一个 Dockerfile 从中构建了一个 docker 图像,我知道它可以在我的环境中用于其他 spring 应用程序并且 运行 它在容器中抛出异常并挂起.

例外是java.io.NotSerializeableException

我看到了 Caused by java.lang.IllegalStateException: Bolt 'printer' contains a non-seriablizeable field of type my.package.MyService$$EnhancerBySpringCGLIB$6afb49, which was instantiated prior to topology creation. my.package.MyService$$EnhancerBySpringCLGIB$6afb49 should be instantiated within the prepare method of 'printer at the earliest.

我想这可能是因为 storm 正在尝试序列化传入的字节数组但未能成功,但我不确定如何补救,而且我还没有看到很多人尝试这样做。

我用这个作为参考。 https://github.com/thehydroimpulse/storm-kafka-starter/blob/master/src/jvm/storm/starter/KafkaTopology.java

要么在新文件中声明 PrinterBolt,要么使 class 静态化。您 运行 遇到的问题是 PrinterBolt 是 MyService 的非静态内部 class,这意味着它包含对外部 MyService class 的引用。由于 MyService 不可序列化,因此 PrinterBolt 也不可序列化。 Storm 要求螺栓是可序列化的。

同样与您看到的错误无关,您可能需要考虑使用 storm-kafka-client 而不是 storm-kafka,因为后者已被弃用。