How do I create a pipeline from Postgres to Parquet?

We are building a Dataflow pipeline that reads data from Postgres and writes it to a Parquet file. We use org.apache.beam.sdk.io.jdbc for reading and the org.apache.beam.sdk.io.parquet package for writing. ParquetIO.Sink lets you write a PCollection of GenericRecord to a Parquet file (from https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html).

This is my code so far:

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

Schema schema = SchemaBuilder
                .record("table").namespace("org.apache.avro.ipc")
                .fields()
                .name("id").type("int").noDefault()
                .name("number").type("int").noDefault()
                .name("name").type().stringType().noDefault()
                .name("password").type().stringType().noDefault()
                .endRecord();

p.apply(JdbcIO.<GenericRecord> read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
                    .withUsername("username")
                    .withPassword("password"))
                .withQuery("select * from table")
                .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
                        GenericRecord record = new GenericData.Record(schema);
                        ResultSetMetaData metadata = resultSet.getMetaData();
                        int columnsNumber = metadata.getColumnCount();
                        for(int i=0; i<columnsNumber; i++) {
                            String columnValue = resultSet.getString(i+1);
                            record.put(i, columnValue);
                        }
                    return record;
                })
                .withCoder(AvroCoder.of(schema)))
            .apply(FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
                    .to("somethingg.parquet")
                    );
p.run();

I get this error:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn@4393593c, mainOutputTag=Tag<output>, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:564)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:212)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:705)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:208)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:187)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:125)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:155)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:651)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:666)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers.matches(PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline.visitPrimitiveTransform(Pipeline.java:280)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:258)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at com.click.example.StarterPipeline.main(StarterPipeline.java:196)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    ... 26 more

The stack trace explains the error well: Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

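withRowMapper() takes the serializable functional interface RowMapper<>, and Beam serializes and deserializes it when needed. However, your lambda also uses a Schema instance that is defined outside the lambda (a closure). So when Java serializes your lambda, it must also serialize schema, because the lambda uses it. But, as the trace shows, Schema is not serializable here, so serialization fails.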
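The mechanism can be reproduced without Beam or Avro. The following is a minimal sketch in plain Java; `NotSerializableSchema` and `Mapper` are hypothetical stand-ins for the Avro schema and for RowMapper<>, not real library types. It only illustrates that a serializable lambda drags its captured variables into serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {
    /** Hypothetical stand-in for a class that does not implement Serializable. */
    public static final class NotSerializableSchema {
        final String name;
        NotSerializableSchema(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a serializable functional interface such as RowMapper. */
    public interface Mapper extends Serializable {
        String map(String row);
    }

    /** Returns true if standard Java serialization of the object succeeds. */
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The lambda captures `schema`, so `schema` must be serialized together with it. */
    public static Mapper capturing() {
        NotSerializableSchema schema = new NotSerializableSchema("table");
        return row -> schema.name + ":" + row;
    }

    /** The lambda creates its own instance, so nothing non-serializable is captured. */
    public static Mapper nonCapturing() {
        return row -> new NotSerializableSchema("table").name + ":" + row;
    }

    public static void main(String[] args) {
        System.out.println("capturing lambda serializable: " + canSerialize(capturing()));
        System.out.println("non-capturing lambda serializable: " + canSerialize(nonCapturing()));
    }
}
```

Running `main` should report `false` for the capturing lambda and `true` for the non-capturing one, which mirrors the NotSerializableException in the trace above.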

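There are a few workarounds I can think of:

  • Create the schema inside the lambda:

    • in this case the schema instance is never serialized;
    • it is created every time the lambda is invoked;
  • Serialize the schema outside the lambda into a serializable object (for example, a JSON string), then deserialize it inside the lambda:

    • this is basically the same as above, but with an extra serialization step;
    • inside the lambda it still has to be deserialized on every invocation;
  • Find or write a serializable Schema implementation:

    • this may be impossible or very hard to do;
    • the overhead would likely be lower than with the approaches above, because deserialization would happen only when the RowMapper<> instance is created.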
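The second workaround can be sketched in plain Java as follows. This is an illustration under assumptions, not the Beam code itself: `TextSchema` and `Mapper` are hypothetical stand-ins for the Avro schema and for RowMapper<>. The lambda captures only a String, which is serializable, and rebuilds the schema object on each call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SchemaStringWorkaround {
    /** Hypothetical non-serializable schema that can be rebuilt from its text form. */
    public static final class TextSchema {
        private final String definition;
        private TextSchema(String definition) { this.definition = definition; }
        public static TextSchema parse(String definition) { return new TextSchema(definition); }
        public int fieldCount() { return definition.split(",").length; }
        @Override public String toString() { return definition; }
    }

    /** Hypothetical stand-in for a serializable functional interface such as RowMapper. */
    public interface Mapper extends Serializable {
        String map(String row);
    }

    /** Captures only the String form of the schema; the schema is re-parsed inside the lambda. */
    public static Mapper mapperFor(TextSchema schema) {
        String definition = schema.toString(); // a String is serializable
        return row -> {
            TextSchema rebuilt = TextSchema.parse(definition); // rebuilt on every invocation
            return row + " has " + rebuilt.fieldCount() + " fields";
        };
    }

    /** Serializes and deserializes the mapper, roughly as a runner would before executing it. */
    public static Mapper roundTrip(Mapper mapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(mapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Mapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Mapper mapper = roundTrip(mapperFor(TextSchema.parse("id,number,name,password")));
        System.out.println(mapper.map("row 1")); // prints "row 1 has 4 fields"
    }
}
```

In the pipeline from the question, the analogous step would presumably be to take `schema.toString()` (the schema's JSON form) outside the lambda and call `new Schema.Parser().parse(json)` inside it. If re-parsing per row turns out to be costly, caching the parsed schema in a lazily initialized field is a possible refinement.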

I think creating a new instance of the schema inside the lambda is perfectly fine, unless it causes problems in practice.