Avro SpecificRecord File Sink using Apache Flink is not compiling due to error: incompatible types: FileSink&lt;?&gt; cannot be converted to SinkFunction&lt;?&gt;
I have the following Avro schema, User.avsc:
{
  "type": "record",
  "namespace": "com.myorg",
  "name": "User",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
The following Java class, User.java, was generated from the User.avsc above using the avro-maven-plugin:
package com.myorg;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;

@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
    private static final long serialVersionUID = 8699049231783654635L;
    public static final Schema SCHEMA$ = (new Parser()).parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.myorg\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<User> ENCODER;
    private static final BinaryMessageDecoder<User> DECODER;

    /** @deprecated */
    @Deprecated
    public long id;
    /** @deprecated */
    @Deprecated
    public String name;

    private static final DatumWriter<User> WRITER$;
    private static final DatumReader<User> READER$;

    public static Schema getClassSchema() {
        return SCHEMA$;
    }

    public static BinaryMessageDecoder<User> getDecoder() {
        return DECODER;
    }

    public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
    }

    public ByteBuffer toByteBuffer() throws IOException {
        return ENCODER.encode(this);
    }

    public static User fromByteBuffer(ByteBuffer b) throws IOException {
        return (User) DECODER.decode(b);
    }

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Schema getSchema() {
        return SCHEMA$;
    }

    public Object get(int field$) {
        switch (field$) {
            case 0:
                return this.id;
            case 1:
                return this.name;
            default:
                throw new AvroRuntimeException("Bad index");
        }
    }

    public void put(int field$, Object value$) {
        switch (field$) {
            case 0:
                this.id = (Long) value$;
                break;
            case 1:
                this.name = (String) value$;
                break;
            default:
                throw new AvroRuntimeException("Bad index");
        }
    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    public void readExternal(ObjectInput in) throws IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }

    static {
        ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
        DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
        WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
        READER$ = MODEL$.createDatumReader(SCHEMA$);
    }
}
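For reference, the generated class can be exercised on its own; a minimal round trip through its binary encoder/decoder (a sketch assuming only the class above is on the classpath) looks like this:

import java.nio.ByteBuffer;
import com.myorg.User;

public class UserRoundTrip {
    public static void main(String[] args) throws Exception {
        // Build a record with the generated all-args constructor.
        User original = new User(1L, "alice");

        // Serialize with the generated single-record binary encoder ...
        ByteBuffer bytes = original.toByteBuffer();

        // ... and read it back with the matching decoder.
        User copy = User.fromByteBuffer(bytes);
        System.out.println(copy.getId() + " / " + copy.getName());
    }
}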
I want to write User SpecificRecord instances to files using Apache Flink's FileSink. Here is the program I wrote:
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.myorg.User;

public class AvroFileSinkApp {
    private static final String OUTPUT_PATH = "./il/";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(5000L);

        DataStream<User> source = env.fromCollection(Arrays.asList(getUser()));
        FileSink<User> sink = FileSink
                .forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                .build();
        source.addSink(sink);
        env.execute("FileSinkProgram");
    }

    public static User getUser() {
        User u = new User();
        u.setId(1L);
        return u;
    }
}
I wrote this program using this and this as references. For some reason, the line source.addSink(sink); produces the following compilation error:
incompatible types: org.apache.flink.connector.file.sink.FileSink<com.myorg.User> cannot be converted to org.apache.flink.streaming.api.functions.sink.SinkFunction<com.myorg.User>
The project is on GitHub here.
Take a look at the DataStream interface: addSink takes a SinkFunction as its argument, while sinkTo takes a Sink. FileSink is built on the Sink interface, so you should use sinkTo instead of addSink (see the sketch after the excerpt below).
public class DataStream<T> {

    ......

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

    /**
     * Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
     * executed once the {@link StreamExecutionEnvironment#execute()} method is called.
     *
     * @param sink The user defined sink.
     * @return The closed DataStream.
     */
    @Experimental
    public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        return new DataStreamSink<>(this, sink);
    }

    ......
}
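To make that concrete, here is a minimal sketch of the corrected program (same User class, AvroWriters bulk format, and output path as in the question; the only substantive change is sinkTo in place of addSink, plus a non-null name, since the schema's string field is not nullable):

import java.util.Arrays;

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.myorg.User;

public class AvroFileSinkApp {
    private static final String OUTPUT_PATH = "./il/";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(5000L);

        DataStream<User> source = env.fromCollection(Arrays.asList(new User(1L, "alice")));

        // FileSink implements the new Sink interface, so it is attached with sinkTo, not addSink.
        FileSink<User> sink = FileSink
                .forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                .build();
        source.sinkTo(sink);

        env.execute("FileSinkProgram");
    }
}

With a bulk format such as Avro, the FileSink rolls part files on each checkpoint, which is why checkpointing stays enabled in the sketch.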