在 Java Spring 应用程序中使用 Apache Storm 读取 Kafka 消息导致 NotSerializeableException,为什么?
Reading Kafka messages with Apache Storm in a Java Spring application causing NotSerializeableException, why?
我是 Apache Storm 的新手,正在尝试尝试。
现在我只想记录或打印作为 ProtoBuf 对象字节数组接收的传入 Kafka 消息。
我需要在 Java Spring 应用程序中执行此操作。
我正在使用 Kafka 0.11.0.2
我正在使用 Storm 1.1.2,并且在我的 pom 中有 storm-core、storm-kafka 和 storm-starters。
主要服务class示例
//annotations for spring
public class MyService{
public static void main(String[] args){
SpringApplication.run(MyService.class, args);
}
@PostConstruct
public void postConstruct() throws Exception {
SpoutConfig spoutConfig = new SpoutConfig(new ZKHosts("localhost:9092"), "topic", "/topic", "storm-spout");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("storm-spout", kafkaSpout);
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("storm-spout");
Config config = new Config();
config.setDebug(true);
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
private class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector){
System.out.println("\n\n INPUT: "+input.toString()+"\n\n");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){}
}
}
我用一个 Dockerfile 从中构建了一个 docker 图像,我知道它可以在我的环境中用于其他 spring 应用程序并且 运行 它在容器中抛出异常并挂起.
例外是java.io.NotSerializeableException
我看到了 Caused by java.lang.IllegalStateException: Bolt 'printer' contains a non-seriablizeable field of type my.package.MyService$$EnhancerBySpringCGLIB$6afb49, which was instantiated prior to topology creation. my.package.MyService$$EnhancerBySpringCLGIB$6afb49 should be instantiated within the prepare method of 'printer at the earliest.
我想这可能是因为 storm 正在尝试序列化传入的字节数组但未能成功,但我不确定如何补救,而且我还没有看到很多人尝试这样做。
要么在新文件中声明 PrinterBolt,要么使 class 静态化。您 运行 遇到的问题是 PrinterBolt 是 MyService 的非静态内部 class,这意味着它包含对外部 MyService class 的引用。由于 MyService 不可序列化,因此 PrinterBolt 也不可序列化。 Storm 要求螺栓是可序列化的。
同样与您看到的错误无关,您可能需要考虑使用 storm-kafka-client 而不是 storm-kafka,因为后者已被弃用。
我是 Apache Storm 的新手,正在尝试尝试。
现在我只想记录或打印作为 ProtoBuf 对象字节数组接收的传入 Kafka 消息。
我需要在 Java Spring 应用程序中执行此操作。
我正在使用 Kafka 0.11.0.2
我正在使用 Storm 1.1.2,并且在我的 pom 中有 storm-core、storm-kafka 和 storm-starters。
主要服务class示例
//annotations for spring
public class MyService{
public static void main(String[] args){
SpringApplication.run(MyService.class, args);
}
@PostConstruct
public void postConstruct() throws Exception {
SpoutConfig spoutConfig = new SpoutConfig(new ZKHosts("localhost:9092"), "topic", "/topic", "storm-spout");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("storm-spout", kafkaSpout);
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("storm-spout");
Config config = new Config();
config.setDebug(true);
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
private class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector){
System.out.println("\n\n INPUT: "+input.toString()+"\n\n");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){}
}
}
我用一个 Dockerfile 从中构建了一个 docker 图像,我知道它可以在我的环境中用于其他 spring 应用程序并且 运行 它在容器中抛出异常并挂起.
例外是java.io.NotSerializeableException
我看到了 Caused by java.lang.IllegalStateException: Bolt 'printer' contains a non-seriablizeable field of type my.package.MyService$$EnhancerBySpringCGLIB$6afb49, which was instantiated prior to topology creation. my.package.MyService$$EnhancerBySpringCLGIB$6afb49 should be instantiated within the prepare method of 'printer at the earliest.
我想这可能是因为 storm 正在尝试序列化传入的字节数组但未能成功,但我不确定如何补救,而且我还没有看到很多人尝试这样做。
要么在新文件中声明 PrinterBolt,要么使 class 静态化。您 运行 遇到的问题是 PrinterBolt 是 MyService 的非静态内部 class,这意味着它包含对外部 MyService class 的引用。由于 MyService 不可序列化,因此 PrinterBolt 也不可序列化。 Storm 要求螺栓是可序列化的。
同样与您看到的错误无关,您可能需要考虑使用 storm-kafka-client 而不是 storm-kafka,因为后者已被弃用。