来自 kafka 主题的 java bean 的 Storm Kafkaspout KryoSerialization 问题
Storm Kafkaspout KryoSerialization issue for java bean from kafka topic
大家好,我是 Storm 和 Kafka 的新手。
我正在使用风暴 1.0.1 和卡夫卡 0.10.0
我们有一个 kafkaspout 可以从 kafka 主题接收 java bean。
我花了几个小时来挖掘以找到正确的方法。
找到了一些有用的文章,但到目前为止 none 的方法对我有用。
以下是我的代码:
风暴拓扑:
public class StormTopology {
public static void main(String[] args) throws Exception {
//Topo test /zkroot test
if (args.length == 4) {
System.out.println("started");
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
args[3]);
kafkaConf1.zkRoot = args[2];
kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);
System.out.println("started");
ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
//builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
//This is for field grouping in bolt we need two bolt for field grouping or it wont work
topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
Config config = new Config();
config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
config.setDebug(true);
config.setNumWorkers(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(args[0], config, topologyBuilder.createTopology());
// StormSubmitter.submitTopology(args[0], config,
// builder.createTopology());
} else {
System.out
.println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");
}
}
}
我正在使用 kryo 在 kafka 序列化数据
卡夫卡生产者:
public class StreamKafkaProducer {
private static Producer producer;
private final Properties props = new Properties();
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();
private StreamKafkaProducer(){
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.abc.serializer.MySerializer");
producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
public static StreamKafkaProducer getStreamKafkaProducer(){
return KAFKA_PRODUCER;
}
public void produce(String topic, VehicleTrip vehicleTrip){
ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip);
producer.send(producerRecord);
//producer.close();
}
public static void closeProducer(){
producer.close();
}
}
Kyro 序列化程序:
public class DataKyroSerializer extends Serializer<Data> implements Serializable {
@Override
public void write(Kryo kryo, Output output, VehicleTrip vehicleTrip) {
output.writeLong(data.getStartedOn().getTime());
output.writeLong(data.getEndedOn().getTime());
}
@Override
public Data read(Kryo kryo, Input input, Class<VehicleTrip> aClass) {
Data data = new Data();
data.setStartedOn(new Date(input.readLong()));
data.setEndedOn(new Date(input.readLong()));
return data;
}
我需要将数据取回数据 bean。
根据几篇文章,我需要提供自定义方案并将其作为拓扑的一部分,但直到现在我还没有运气
螺栓和方案代码
方案:
public class KryoScheme implements Scheme {
private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
return kryo;
};
};
@Override
public List<Object> deserialize(ByteBuffer ser) {
return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
}
@Override
public Fields getOutputFields( ) {
return new Fields( "data" );
}
}
和螺栓:
public class AnalysisBolt implements IBasicBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
private String topicname = null;
public AnalysisBolt(String topicname) {
this.topicname = topicname;
}
public void prepare(Map stormConf, TopologyContext topologyContext) {
System.out.println("prepare");
}
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println("execute");
Fields fields = input.getFields();
try {
JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
.getValueByField(fields.get(1)));
String StartTime = (String) eventJson.get("startedOn");
String EndTime = (String) eventJson.get("endedOn");
String Oid = (String) eventJson.get("_id");
int V_id = (Integer) eventJson.get("vehicleId");
//call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)
System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime);
} catch (Exception e) {
e.printStackTrace();
}
}
但如果我提交风暴拓扑,我会收到错误消息:
java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme should be instantiated within
the prepare method of 'kafkaspout at the earliest.
感谢帮助调试问题并指导正确的路径。
谢谢
在 Storm 生命周期内,拓扑被实例化,然后序列化为字节格式以存储在 ZooKeeper 中,然后再执行拓扑。在此步骤中,如果拓扑中的 spout 或 bolt 具有初始化的不可序列化 属性,序列化将失败。
如果需要一个不可序列化的字段,在 bolt 或 spout 的 prepare 方法中对其进行初始化,即 运行 在将拓扑交付给 worker 之后。
您的 ThreadLocal 不可序列化。更好的解决方案是使您的序列化程序既可序列化又是线程安全的。如果这不可能,那么我会看到 2 个备选方案,因为没有 prepare 方法,就像您在螺栓中获得的那样。
- 将其声明为静态的,本质上是瞬态的。
- 将其声明为 transient 并通过私有 get 方法访问它。然后你可以在第一次访问时初始化变量。
大家好,我是 Storm 和 Kafka 的新手。 我正在使用风暴 1.0.1 和卡夫卡 0.10.0 我们有一个 kafkaspout 可以从 kafka 主题接收 java bean。 我花了几个小时来挖掘以找到正确的方法。 找到了一些有用的文章,但到目前为止 none 的方法对我有用。
以下是我的代码:
风暴拓扑:
public class StormTopology {
public static void main(String[] args) throws Exception {
//Topo test /zkroot test
if (args.length == 4) {
System.out.println("started");
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
args[3]);
kafkaConf1.zkRoot = args[2];
kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);
System.out.println("started");
ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
//builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
//This is for field grouping in bolt we need two bolt for field grouping or it wont work
topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
Config config = new Config();
config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
config.setDebug(true);
config.setNumWorkers(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(args[0], config, topologyBuilder.createTopology());
// StormSubmitter.submitTopology(args[0], config,
// builder.createTopology());
} else {
System.out
.println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");
}
}
}
我正在使用 kryo 在 kafka 序列化数据
卡夫卡生产者:
public class StreamKafkaProducer {
private static Producer producer;
private final Properties props = new Properties();
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();
private StreamKafkaProducer(){
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.abc.serializer.MySerializer");
producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
public static StreamKafkaProducer getStreamKafkaProducer(){
return KAFKA_PRODUCER;
}
public void produce(String topic, VehicleTrip vehicleTrip){
ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip);
producer.send(producerRecord);
//producer.close();
}
public static void closeProducer(){
producer.close();
}
}
Kyro 序列化程序:
public class DataKyroSerializer extends Serializer<Data> implements Serializable {
@Override
public void write(Kryo kryo, Output output, VehicleTrip vehicleTrip) {
output.writeLong(data.getStartedOn().getTime());
output.writeLong(data.getEndedOn().getTime());
}
@Override
public Data read(Kryo kryo, Input input, Class<VehicleTrip> aClass) {
Data data = new Data();
data.setStartedOn(new Date(input.readLong()));
data.setEndedOn(new Date(input.readLong()));
return data;
}
我需要将数据取回数据 bean。
根据几篇文章,我需要提供自定义方案并将其作为拓扑的一部分,但直到现在我还没有运气
螺栓和方案代码
方案:
public class KryoScheme implements Scheme {
private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
return kryo;
};
};
@Override
public List<Object> deserialize(ByteBuffer ser) {
return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
}
@Override
public Fields getOutputFields( ) {
return new Fields( "data" );
}
}
和螺栓:
public class AnalysisBolt implements IBasicBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
private String topicname = null;
public AnalysisBolt(String topicname) {
this.topicname = topicname;
}
public void prepare(Map stormConf, TopologyContext topologyContext) {
System.out.println("prepare");
}
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println("execute");
Fields fields = input.getFields();
try {
JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
.getValueByField(fields.get(1)));
String StartTime = (String) eventJson.get("startedOn");
String EndTime = (String) eventJson.get("endedOn");
String Oid = (String) eventJson.get("_id");
int V_id = (Integer) eventJson.get("vehicleId");
//call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)
System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime);
} catch (Exception e) {
e.printStackTrace();
}
}
但如果我提交风暴拓扑,我会收到错误消息:
java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme should be instantiated within
the prepare method of 'kafkaspout at the earliest.
感谢帮助调试问题并指导正确的路径。
谢谢
在 Storm 生命周期内,拓扑被实例化,然后序列化为字节格式以存储在 ZooKeeper 中,然后再执行拓扑。在此步骤中,如果拓扑中的 spout 或 bolt 具有初始化的不可序列化 属性,序列化将失败。
如果需要一个不可序列化的字段,在 bolt 或 spout 的 prepare 方法中对其进行初始化,即 运行 在将拓扑交付给 worker 之后。
您的 ThreadLocal 不可序列化。更好的解决方案是使您的序列化程序既可序列化又是线程安全的。如果这不可能,那么我会看到 2 个备选方案,因为没有 prepare 方法,就像您在螺栓中获得的那样。
- 将其声明为静态的,本质上是瞬态的。
- 将其声明为 transient 并通过私有 get 方法访问它。然后你可以在第一次访问时初始化变量。