Flink TypeInfo Java 泛型

Flink TypeInfo Java Generics

我希望我的 Flink 应用使用 Flink org.apache.flink.api.java.tuple.Tuple2 class 将来自 Kafka 主题的数据反序列化为 ConsumerRecord<byte[], byte[]> 的 Flink 流或 Tuple2<byte[], byte[]> 的 Flink 流.

对于这两个选项,我都无法获得可编译的 getProducedType 实现。

public class KafkaNOOPDeserialization implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> {
    @Override
    public boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return record;
    }

    @Override
    public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
        // This provides TypeInformation<ConsumerRecord>
        // It needs TypeInformation<ConsumerRecord<byte[], byte[]>>
        var typeInfo = TypeExtractor.getForClass(ConsumerRecord.class);
        // This hard cast won't compile
        return (TypeInformation<ConsumerRecord<byte[], byte[]>>) typeInfo;
    }
}
public class KafkaByteArrayTupleDeserializer implements KafkaDeserializationSchema<Tuple2<byte[], byte[]>> {
    @Override
    public boolean isEndOfStream(Tuple2<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public Tuple2<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2(record.key(), record.value());
    }

    @Override
    public TypeInformation<Tuple2<byte[], byte[]>> getProducedType() {
        // This provides TupleTypeInfo<Tuple>
        // It needs TupleTypeInfo<Tuple2<byte[], byte[]>>
        var typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(
                byte[].class, byte[].class);
        // Hard cast won't compile
        return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
    }
}

Flink 提供了 org.apache.flink.api.common.typeinfo.TypeHint class 来帮助解决这个问题。 (documentation)

例如在你的第二个例子中而不是:

return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;

你可以这样写:

return new TypeHint<Tuple2<byte[], byte[]>>(){}.getTypeInfo();