Beam sql udf 将一列拆分为多列

Beam sql udf to split one column into multiple columns

如何实现一个beam sql udf函数将一列拆分为多列? 我已经在 bigquery udf 函数中实现了这个:

CREATE TEMP FUNCTION parseDescription(description STRING)
RETURNS STRUCT<msg STRING, ip STRING, source_region STRING, user_name STRING>
LANGUAGE js AS """
var arr = description.substring(0, description.length - 1).split(",");
var firstIndex = arr[0].indexOf(".");
this.msg = arr[0].substring(0, firstIndex);
this.ip = arr[0].substring(firstIndex + 2).split(": ")[1];
this.source_region = arr[1].split(": ")[1];
this.user_name = arr[2].split(": ")[1];
return this;
""";
INSERT INTO `table1` (parseDescription(event_description).* FROM `table2`;

beam sqludf函数是否也支持这种操作? 我试图 return beam udf 函数中的一个对象,但 beam sql 似乎不支持 object.* 语法。我也尝试 return 一个地图或一个数组,但仍然出错。 有没有办法在 beam 中实现相同的 udf?

我尝试使用 MapElement 方法但出现错误,似乎输出行需要与输入行相同的架构,示例:

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BeamMain2 {
    public static void main(String[] args) {

        DirectOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DirectOptions.class);
        Pipeline p = Pipeline.create(options);
        // Define the schema for the records.
        Schema appSchema = Schema.builder().addStringField("string1").addInt32Field("int1").build();
        Row row1 = Row.withSchema(appSchema).addValues("aaa,bbb", 1).build();
        Row row2 = Row.withSchema(appSchema).addValues("ccc,ddd", 2).build();
        Row row3 = Row.withSchema(appSchema).addValues("ddd,eee", 3).build();
        PCollection<Row> inputTable =
                PBegin.in(p).apply(Create.of(row1, row2, row3).withRowSchema(appSchema));
        Schema newSchema =
                Schema.builder()
                        .addNullableField("string2", Schema.FieldType.STRING)
                        .addInt32Field("int1")
                        .addNullableField("string3", Schema.FieldType.STRING)
                        .build();

        PCollection<Row> outputStream = inputTable.apply(
                SqlTransform.query(
                        "SELECT * "
                                + "FROM PCOLLECTION where int1 > 1"))
        .apply(MapElements.via(
                new SimpleFunction<Row, Row>() {
                    @Override
                    public Row apply(Row line) {
                        return Row.withSchema(newSchema).addValues("a", 1, "b").build();
                    }
                }));
        p.run().waitUntilFinish();
    }
}

参考:https://beam.apache.org/documentation/dsls/sql/overview/ 您可以使用转换中的发射 'Row' 元素,稍后可以将其用作 table

管道看起来像

架构

Schema schema =
    Schema.of(Schema.Field.of("f0", FieldType.INT64), Schema.Field.of("f1", FieldType.INT64));

变换

private static MapElements<Row, Row> rowsToStrings() {
return MapElements.into(TypeDescriptor.of(Row.class))
    .via(
        row -> Row.withSchema(schema).addValue(1L).addValue(2L).build(););
}

管道:

  pipeline
      .apply(
          "SQL Query 1",
          SqlTransform.query(<Query string 1>))
      .apply("Transform column", rowsToStrings())
      .apply(
          "SQL Query 2",
          SqlTransform.query(<Query string 2>))