Avro SpecificRecord FileSink using Apache Flink is not compiling due to error: incompatible types: FileSink&lt;?&gt; cannot be converted to SinkFunction&lt;?&gt;

I have the following Avro schema, User.avsc:

{
  "type": "record",
  "namespace": "com.myorg",
  "name": "User",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

The following Java class User.java was generated from the User.avsc above using the avro-maven-plugin (a sample plugin configuration is sketched after the generated class):

package com.myorg;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;

@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
    private static final long serialVersionUID = 8699049231783654635L;
    public static final Schema SCHEMA$ = (new Parser()).parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.myorg\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<User> ENCODER;
    private static final BinaryMessageDecoder<User> DECODER;
    /** @deprecated */
    @Deprecated
    public long id;
    /** @deprecated */
    @Deprecated
    public String name;
    private static final DatumWriter<User> WRITER$;
    private static final DatumReader<User> READER$;

    public static Schema getClassSchema() {
        return SCHEMA$;
    }

    public static BinaryMessageDecoder<User> getDecoder() {
        return DECODER;
    }

    public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
    }

    public ByteBuffer toByteBuffer() throws IOException {
        return ENCODER.encode(this);
    }

    public static User fromByteBuffer(ByteBuffer b) throws IOException {
        return (User)DECODER.decode(b);
    }

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Schema getSchema() {
        return SCHEMA$;
    }

    public Object get(int field$) {
        switch(field$) {
        case 0:
            return this.id;
        case 1:
            return this.name;
        default:
            throw new AvroRuntimeException("Bad index");
        }
    }

    public void put(int field$, Object value$) {
        switch(field$) {
        case 0:
            this.id = (Long)value$;
            break;
        case 1:
            this.name = (String)value$;
            break;
        default:
            throw new AvroRuntimeException("Bad index");
        }

    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    public void readExternal(ObjectInput in) throws IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }

    static {
        ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
        DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
        WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
        READER$ = MODEL$.createDatumReader(SCHEMA$);
    }

}
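
For reference, a minimal avro-maven-plugin configuration that would produce a class like the one above might look like the sketch below. The version number and directory layout are assumptions; the stringType setting matches the avro.java.string annotation visible in SCHEMA$.

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <!-- assumed version; use the Avro version already on your classpath -->
  <version>1.10.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- assumed locations; adjust to your project layout -->
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
        <!-- generates String fields and the avro.java.string annotation in SCHEMA$ -->
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>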

I want to write the User SpecificRecord instances to files with Apache Flink's FileSink.

Below is the program I wrote:

import java.util.Arrays;

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.myorg.User;

public class AvroFileSinkApp {

    private static final String OUTPUT_PATH = "./il/";
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);
        env.enableCheckpointing(5000L);

        DataStream<User> source = env.fromCollection(Arrays.asList(getUser()));
        FileSink<User> sink = FileSink
                .forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                .build();

        source.addSink(sink);
        env.execute("FileSinkProgram");
    }

    public static User getUser() {
        User u = new User();
        u.setId(1L);
        // name is a non-nullable string in the schema, so it must be set
        // before the record can be serialized
        u.setName("test");
        return u;
    }
}

I wrote this program using this and this as references. For some reason, the line source.addSink(sink); throws the following compilation error:

incompatible types: org.apache.flink.connector.file.sink.FileSink<com.myorg.User> cannot be converted to org.apache.flink.streaming.api.functions.sink.SinkFunction<com.myorg.User>

The project is on GitHub here.

Perhaps you should take a look at the DataStream interface: the addSink method takes a SinkFunction as its parameter, while the sinkTo method takes a Sink.

FileSink is built on the Sink interface, so you should use the sinkTo method; a corrected version of your job is sketched after the excerpt below.

public class DataStream<T> {
......
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

    /**
     * Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
     * executed once the {@link StreamExecutionEnvironment#execute()} method is called.
     *
     * @param sink The user defined sink.
     * @return The closed DataStream.
     */
    @Experimental
    public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        return new DataStreamSink<>(this, sink);
    }
......
}
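
With that in mind, the fix is a one-line change: attach the FileSink with sinkTo instead of addSink. A minimal sketch of the corrected job (same program as in the question; the field values are only illustrative):

import java.util.Arrays;

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.myorg.User;

public class AvroFileSinkApp {

    private static final String OUTPUT_PATH = "./il/";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);
        // Bulk formats roll their part files on checkpoints, so checkpointing
        // must be enabled for the sink to finalize any output.
        env.enableCheckpointing(5000L);

        DataStream<User> source = env.fromCollection(Arrays.asList(getUser()));

        FileSink<User> sink = FileSink
                .forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                .build();

        // FileSink implements the unified Sink interface, not SinkFunction,
        // so it is attached with sinkTo(...) rather than addSink(...).
        source.sinkTo(sink);

        env.execute("FileSinkProgram");
    }

    public static User getUser() {
        // both fields populated: name is a non-nullable string in the schema
        return new User(1L, "test");
    }
}

addSink remains the entry point for the older SinkFunction-based connectors (for example StreamingFileSink); the unified FileSink only works with sinkTo.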