Beam 写入 avro 文件序列化错误
Beam write to avro file serialization error
我按照 Beam 官方文档中写 AVRO 文件的示例操作,但在 p.run().waitUntilFinish() 这一步报错:
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
但是,如果我从一个 AVRO 文件读取并将其写入另一个 AVRO 输出,则一切正常。我的目标是从任意输入源生成 AVRO 文件。有没有人遇到过类似的问题?你是怎么解决的?
// Question's code: submitting the pipeline throws
// java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema,
// because the anonymous DoFn below captures the non-serializable `schema`
// local variable in its closure.
public class WriteAvro {
// Pipeline options: path of the input file (defaults to "test.avro").
public interface CsvToAvroOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("test.avro")
String getInputFile();
void setInputFile(String value);
}
// Builds and runs the pipeline: read text lines, map each line to a
// hard-coded GenericRecord, and write the records out as Avro files.
static void run(CsvToAvroOptions options) throws IOException {
final Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
Pipeline p = Pipeline.create(options);
// This works fine
// PCollection<GenericRecord> input = p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFile()));
// This doesn't work
PCollection<GenericRecord> input =
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// BUG: this anonymous DoFn closes over the `schema` local. Avro's
// Schema does not implement java.io.Serializable, so serializing the
// DoFn for execution fails with NotSerializableException.
.apply(ParDo.of(new DoFn<String, GenericRecord>() {
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord record = new GenericData.Record(schema);
record.put("name", "John Doe");
record.put("age", 42);
record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
c.output(record);
}
}))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
input.apply(AvroIO.writeGenericRecords(schema).to("prefix"));
// The exception surfaces here, when the runner serializes the DoFn
// before executing the pipeline (per the error quoted in the question).
p.run().waitUntilFinish();
}
public static void main(String[] args) throws IOException {
CsvToAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToAvroOptions.class);
run(options);
}
}
- Beam 版本:2.11.0
- Runner:Direct Runner(直接运行器)
此错误是因为 Avro 的 Schema 类不可序列化(没有实现 java.io.Serializable),而匿名 DoFn 的闭包捕获了它。
解决办法:把 schema 以 JSON 字符串的形式保存在 DoFn 的字段中(String 可以序列化),然后在 DoFn 的 @Setup 方法里重新解析出 Schema。
具体做法如下。
// Pipeline options: path of the input file (defaults to "test.avro").
// Unchanged from the question's code.
public interface CsvToAvroOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("test.avro")
String getInputFile();
void setInputFile(String value);
}
// The fix: instead of capturing the non-serializable Schema in a closure,
// store its JSON representation (a String, which serializes cleanly with
// the DoFn) and re-parse it in @Setup on the worker.
private static class ConstructAvroRecordsFn extends DoFn<String, GenericRecord> {
// JSON form of the schema; the only state serialized with this DoFn.
private final String schemaJson;
// Rebuilt from schemaJson in setup(); still null when the DoFn is
// serialized at submission time, so it never trips the serializer.
private Schema schema;
ConstructAvroRecordsFn(Schema schema){
schemaJson = schema.toString();
}
// @Setup runs on the worker after deserialization, once per DoFn
// instance — parse the schema here, not per element.
@Setup
public void setup(){
schema = new Schema.Parser().parse(schemaJson);
}
// Emits one hard-coded GenericRecord per input line (same demo payload
// as the question's code).
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord record = new GenericData.Record(schema);
record.put("name", "John Doe");
record.put("age", 42);
record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
c.output(record);
}
}
// Builds and runs the pipeline. Identical to the question's run() except
// that the anonymous DoFn is replaced by ConstructAvroRecordsFn, which
// carries the schema as a serializable JSON string.
static void run(CsvToAvroOptions options) throws IOException {
// Parsing the Schema here (pipeline-construction time) is fine; it just
// must not be captured by a DoFn that gets serialized.
final Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
Pipeline p = Pipeline.create(options);
// This works fine
// PCollection<GenericRecord> input = p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFile()));
// This doesn't work
PCollection<GenericRecord> input =
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Named DoFn stores the schema as a String — serializable.
.apply(ParDo.of(new ConstructAvroRecordsFn(schema)))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
input.apply(AvroIO.writeGenericRecords(schema).to("prefix"));
p.run().waitUntilFinish();
}
// Entry point: parse and validate command-line options, then run.
public static void main(String[] args) throws IOException {
CsvToAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToAvroOptions.class);
run(options);
}
}
我按照 Beam 官方文档中写 AVRO 文件的示例操作,但在 p.run().waitUntilFinish() 这一步报错:
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
但是,如果我从一个 AVRO 文件读取并将其写入另一个 AVRO 输出,则一切正常。我的目标是从任意输入源生成 AVRO 文件。有没有人遇到过类似的问题?你是怎么解决的?
// Question's code: submitting the pipeline throws
// java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema,
// because the anonymous DoFn below captures the non-serializable `schema`
// local variable in its closure.
public class WriteAvro {
// Pipeline options: path of the input file (defaults to "test.avro").
public interface CsvToAvroOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("test.avro")
String getInputFile();
void setInputFile(String value);
}
// Builds and runs the pipeline: read text lines, map each line to a
// hard-coded GenericRecord, and write the records out as Avro files.
static void run(CsvToAvroOptions options) throws IOException {
final Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
Pipeline p = Pipeline.create(options);
// This works fine
// PCollection<GenericRecord> input = p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFile()));
// This doesn't work
PCollection<GenericRecord> input =
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// BUG: this anonymous DoFn closes over the `schema` local. Avro's
// Schema does not implement java.io.Serializable, so serializing the
// DoFn for execution fails with NotSerializableException.
.apply(ParDo.of(new DoFn<String, GenericRecord>() {
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord record = new GenericData.Record(schema);
record.put("name", "John Doe");
record.put("age", 42);
record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
c.output(record);
}
}))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
input.apply(AvroIO.writeGenericRecords(schema).to("prefix"));
// The exception surfaces here, when the runner serializes the DoFn
// before executing the pipeline (per the error quoted in the question).
p.run().waitUntilFinish();
}
public static void main(String[] args) throws IOException {
CsvToAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToAvroOptions.class);
run(options);
}
}
- Beam 版本:2.11.0
- Runner:Direct Runner(直接运行器)
此错误是因为 Avro 的 Schema 类不可序列化(没有实现 java.io.Serializable),而匿名 DoFn 的闭包捕获了它。
解决办法:把 schema 以 JSON 字符串的形式保存在 DoFn 的字段中(String 可以序列化),然后在 DoFn 的 @Setup 方法里重新解析出 Schema。
具体做法如下。
// Pipeline options: path of the input file (defaults to "test.avro").
// Unchanged from the question's code.
public interface CsvToAvroOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("test.avro")
String getInputFile();
void setInputFile(String value);
}
// The fix: instead of capturing the non-serializable Schema in a closure,
// store its JSON representation (a String, which serializes cleanly with
// the DoFn) and re-parse it in @Setup on the worker.
private static class ConstructAvroRecordsFn extends DoFn<String, GenericRecord> {
// JSON form of the schema; the only state serialized with this DoFn.
private final String schemaJson;
// Rebuilt from schemaJson in setup(); still null when the DoFn is
// serialized at submission time, so it never trips the serializer.
private Schema schema;
ConstructAvroRecordsFn(Schema schema){
schemaJson = schema.toString();
}
// @Setup runs on the worker after deserialization, once per DoFn
// instance — parse the schema here, not per element.
@Setup
public void setup(){
schema = new Schema.Parser().parse(schemaJson);
}
// Emits one hard-coded GenericRecord per input line (same demo payload
// as the question's code).
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord record = new GenericData.Record(schema);
record.put("name", "John Doe");
record.put("age", 42);
record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
c.output(record);
}
}
// Builds and runs the pipeline. Identical to the question's run() except
// that the anonymous DoFn is replaced by ConstructAvroRecordsFn, which
// carries the schema as a serializable JSON string.
static void run(CsvToAvroOptions options) throws IOException {
// Parsing the Schema here (pipeline-construction time) is fine; it just
// must not be captured by a DoFn that gets serialized.
final Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
Pipeline p = Pipeline.create(options);
// This works fine
// PCollection<GenericRecord> input = p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFile()));
// This doesn't work
PCollection<GenericRecord> input =
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Named DoFn stores the schema as a String — serializable.
.apply(ParDo.of(new ConstructAvroRecordsFn(schema)))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
input.apply(AvroIO.writeGenericRecords(schema).to("prefix"));
p.run().waitUntilFinish();
}
// Entry point: parse and validate command-line options, then run.
public static void main(String[] args) throws IOException {
CsvToAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToAvroOptions.class);
run(options);
}
}