Flink TypeInfo Java 泛型
Flink TypeInfo Java Generics
我希望我的 Flink 应用把来自 Kafka 主题的数据反序列化为
ConsumerRecord<byte[], byte[]> 的 Flink 流,
或者借助 Flink 的 org.apache.flink.api.java.tuple.Tuple2 类,
反序列化为 Tuple2<byte[], byte[]> 的 Flink 流。
对于这两个选项,我都无法获得可编译的 getProducedType 实现。
public class KafkaNOOPDeserialization implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> {
@Override
public boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {
return false;
}
@Override
public ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return record;
}
@Override
public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
// This provides TypeInformation<ConsumerRecord>
// It needs TypeInformation<ConsumerRecord<byte[], byte[]>>
var typeInfo = TypeExtractor.getForClass(ConsumerRecord.class);
// This hard cast won't compile
return (TypeInformation<ConsumerRecord<byte[], byte[]>>) typeInfo;
}
}
public class KafkaByteArrayTupleDeserializer implements KafkaDeserializationSchema<Tuple2<byte[], byte[]>> {
@Override
public boolean isEndOfStream(Tuple2<byte[], byte[]> nextElement) {
return false;
}
@Override
public Tuple2<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return new Tuple2(record.key(), record.value());
}
@Override
public TypeInformation<Tuple2<byte[], byte[]>> getProducedType() {
// This provides TupleTypeInfo<Tuple>
// It needs TupleTypeInfo<Tuple2<byte[], byte[]>>
var typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(
byte[].class, byte[].class);
// Hard cast won't compile
return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
}
}
Flink 提供了 org.apache.flink.api.common.typeinfo.TypeHint
类来帮助解决这个问题(参见官方文档)。
例如,在你的第二个例子中,不要写:
return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
你可以这样写:
return new TypeHint<Tuple2<byte[], byte[]>>(){}.getTypeInfo();
我希望我的 Flink 应用把来自 Kafka 主题的数据反序列化为
ConsumerRecord<byte[], byte[]> 的 Flink 流,
或者借助 Flink 的 org.apache.flink.api.java.tuple.Tuple2 类,
反序列化为 Tuple2<byte[], byte[]> 的 Flink 流。
对于这两个选项,我都无法获得可编译的 getProducedType 实现。
public class KafkaNOOPDeserialization implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> {
@Override
public boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {
return false;
}
@Override
public ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return record;
}
@Override
public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
// This provides TypeInformation<ConsumerRecord>
// It needs TypeInformation<ConsumerRecord<byte[], byte[]>>
var typeInfo = TypeExtractor.getForClass(ConsumerRecord.class);
// This hard cast won't compile
return (TypeInformation<ConsumerRecord<byte[], byte[]>>) typeInfo;
}
}
public class KafkaByteArrayTupleDeserializer implements KafkaDeserializationSchema<Tuple2<byte[], byte[]>> {
@Override
public boolean isEndOfStream(Tuple2<byte[], byte[]> nextElement) {
return false;
}
@Override
public Tuple2<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return new Tuple2(record.key(), record.value());
}
@Override
public TypeInformation<Tuple2<byte[], byte[]>> getProducedType() {
// This provides TupleTypeInfo<Tuple>
// It needs TupleTypeInfo<Tuple2<byte[], byte[]>>
var typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(
byte[].class, byte[].class);
// Hard cast won't compile
return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
}
}
Flink 提供了 org.apache.flink.api.common.typeinfo.TypeHint
类来帮助解决这个问题(参见官方文档)。
例如,在你的第二个例子中,不要写:
return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
你可以这样写:
return new TypeHint<Tuple2<byte[], byte[]>>(){}.getTypeInfo();