Storm KafkaSpout KryoSerialization issue for java bean from kafka topic

Hi all, I am new to Storm and Kafka. I am using Storm 1.0.1 and Kafka 0.10.0. We have a KafkaSpout that receives Java beans from a Kafka topic. I have spent several hours digging around for the right approach. I found a few helpful articles, but so far none of the approaches has worked for me.

Below is my code:

Storm topology:

public class StormTopology {

public static void main(String[] args) throws Exception {
    //Topo test /zkroot test
    if (args.length == 4) {
        System.out.println("started");
        BrokerHosts hosts = new ZkHosts("localhost:2181");

        SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
                args[3]);

        kafkaConf1.zkRoot = args[2];
        kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
        kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
        KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);

        System.out.println("started");

        ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
        AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
        //builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
        //This is for field grouping in bolt we need two bolt for field grouping or it wont work
        topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
        topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
        Config config = new Config();
        config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
        config.setDebug(true);
        config.setNumWorkers(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(args[0], config, topologyBuilder.createTopology());

        // StormSubmitter.submitTopology(args[0], config,
        // builder.createTopology());

    } else {
        System.out
                .println("Insufficient arguments - topologyName kafkaTopic zkRoot id");
    }
}

}

I am serializing the data at the Kafka side using Kryo.

Kafka producer:

public class StreamKafkaProducer {

private static Producer producer;
private final Properties props = new Properties();
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();

private StreamKafkaProducer(){
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.abc.serializer.MySerializer");
    producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}

public static StreamKafkaProducer getStreamKafkaProducer(){
    return KAFKA_PRODUCER;
}

public void produce(String topic, VehicleTrip vehicleTrip){
    ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip);
    producer.send(producerRecord);
    //producer.close();
}

public static void closeProducer(){
    producer.close();
}

}

Kryo serializer:

public class DataKyroSerializer extends Serializer<Data> implements Serializable {

    @Override
    public void write(Kryo kryo, Output output, Data data) {
        output.writeLong(data.getStartedOn().getTime());
        output.writeLong(data.getEndedOn().getTime());
    }

    @Override
    public Data read(Kryo kryo, Input input, Class<Data> aClass) {
        Data data = new Data();
        data.setStartedOn(new Date(input.readLong()));
        data.setEndedOn(new Date(input.readLong()));
        return data;
    }
}

I need to get the data back into the Data bean.

According to several articles, I need to supply a custom Scheme and make it part of the topology, but so far I have had no luck.

Bolt and Scheme code:

Scheme:

public class KryoScheme implements Scheme {

    private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
            return kryo;
        };
    };

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
    }

    @Override
    public Fields getOutputFields( ) {
        return new Fields( "data" );
    }

}

And the bolt:

public class AnalysisBolt implements IBasicBolt {
/**
 *
 */
private static final long serialVersionUID = 1L;
private String topicname = null;

public AnalysisBolt(String topicname) {
    this.topicname = topicname;
}

public void prepare(Map stormConf, TopologyContext topologyContext) {
    System.out.println("prepare");
}

public void execute(Tuple input, BasicOutputCollector collector) {
    System.out.println("execute");

    Fields fields = input.getFields();
    try {   

        JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
                .getValueByField(fields.get(1)));
        String StartTime = (String) eventJson.get("startedOn");
        String EndTime = (String) eventJson.get("endedOn");
        String Oid = (String) eventJson.get("_id");
        int V_id =  (Integer) eventJson.get("vehicleId");
        //call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)

        System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime);

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

But when I submit the Storm topology, I get this error:

java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme should be instantiated within
the prepare method of 'kafkaspout at the earliest.

I would appreciate help in debugging the issue and pointers in the right direction.

Thanks

Within the Storm lifecycle, the topology is instantiated and then serialized to byte format to be stored in ZooKeeper, prior to the topology being executed. Within this step, if a spout or bolt in the topology has an initialized unserializable property, serialization will fail.

If there is a need for a field that is unserializable, initialize it in the bolt or spout's prepare method, which is run after the topology is delivered to the worker.

来源: Best Practices for implementing Apache Storm
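This failure can be reproduced in plain Java, without Storm: `ObjectOutputStream` rejects any object graph containing a non-transient field whose type does not implement `Serializable`, which is essentially what happens when Storm serializes the topology for ZooKeeper. A minimal sketch (all class names here are hypothetical, standing in for the spout and its `ThreadLocal<Kryo>`):

```java
import java.io.*;

public class SerializationDemo {

    // stands in for a non-serializable helper such as ThreadLocal<Kryo>
    static class NotSerializableHelper { }

    // fails to serialize: the helper field is reachable and non-transient
    static class BadSpout implements Serializable {
        NotSerializableHelper helper = new NotSerializableHelper();
    }

    // serializes fine: transient fields are skipped by Java serialization
    static class GoodSpout implements Serializable {
        transient NotSerializableHelper helper;
    }

    // returns true if the object survives Java serialization
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {   // NotSerializableException lands here
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new BadSpout()));   // false
        System.out.println(serializes(new GoodSpout()));  // true
    }
}
```

The transient field must then be re-created on the worker side, which is what the alternatives below address.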

Your ThreadLocal is not serializable. The preferable solution would be to make your serializer both serializable and thread-safe. If that is not possible, then I see two alternatives, since there is no prepare method as you would get in a bolt.

  1. Declare it as static, which is inherently transient.
  2. Declare it as transient and access it via a private getter method, which initializes the variable on first access.
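A minimal sketch of alternative 2 in plain Java (names are hypothetical, and a `StringBuilder` stands in for the non-serializable Kryo instance): the field is marked `transient` so Java serialization skips it, and a private getter re-creates it lazily after deserialization on the worker.

```java
import java.io.*;

public class LazyScheme implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by serialization, so it is null on the worker
    private transient StringBuilder helper;

    // private getter lazily re-initializes the helper on first access
    private StringBuilder helper() {
        if (helper == null) {
            helper = new StringBuilder();
        }
        return helper;
    }

    public String decode(String raw) {
        helper().setLength(0);
        return helper().append("decoded:").append(raw).toString();
    }

    // round-trips the object through Java serialization, as Storm
    // effectively does when shipping the topology via ZooKeeper
    static LazyScheme roundTrip(LazyScheme s) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyScheme) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazyScheme copy = roundTrip(new LazyScheme());
        System.out.println(copy.decode("abc"));  // works after round trip
    }
}
```

The same pattern applies to the `ThreadLocal<Kryo>` in your KryoScheme: keep it transient (or static) and build it inside the getter rather than at field initialization time.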