Beam SQL UDF to split one column into multiple columns
How can I implement a Beam SQL UDF that splits one column into multiple columns? I have already implemented this as a BigQuery UDF:
CREATE TEMP FUNCTION parseDescription(description STRING)
RETURNS STRUCT<msg STRING, ip STRING, source_region STRING, user_name STRING>
LANGUAGE js AS """
var arr = description.substring(0, description.length - 1).split(",");
var firstIndex = arr[0].indexOf(".");
this.msg = arr[0].substring(0, firstIndex);
this.ip = arr[0].substring(firstIndex + 2).split(": ")[1];
this.source_region = arr[1].split(": ")[1];
this.user_name = arr[2].split(": ")[1];
return this;
""";
INSERT INTO `table1` SELECT parseDescription(event_description).* FROM `table2`;
Does a Beam SQL UDF support this kind of operation as well? I tried returning an object from the Beam UDF, but Beam SQL does not seem to support the object.* syntax. I also tried returning a map and an array, but those still produced errors. Is there a way to implement the same UDF in Beam?
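For reference, a Beam SQL UDF is registered as a scalar function whose eval method returns a single value per call, so one possible workaround is one UDF (or one call) per output column. A minimal sketch, with illustrative names:

import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.SqlTransform;

// Illustrative per-column UDF: a Beam SQL UDF returns one scalar value per call,
// so each output column needs its own UDF (or its own call).
public class ExtractIp implements BeamSqlUdf {
  public static String eval(String description) {
    String[] arr = description.substring(0, description.length() - 1).split(",");
    return arr[0].substring(arr[0].indexOf(".") + 2).split(": ")[1];
  }
}

// Registration (the SQL function name extract_ip is illustrative):
// SqlTransform.query("SELECT extract_ip(event_description) AS ip FROM PCOLLECTION")
//     .registerUdf("extract_ip", ExtractIp.class);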
I tried the MapElements approach, but it raises an error; it seems the output row is expected to have the same schema as the input row. Example:
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BeamMain2 {
  public static void main(String[] args) {
    DirectOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(DirectOptions.class);
    Pipeline p = Pipeline.create(options);

    // Define the schema for the records.
    Schema appSchema = Schema.builder().addStringField("string1").addInt32Field("int1").build();
    Row row1 = Row.withSchema(appSchema).addValues("aaa,bbb", 1).build();
    Row row2 = Row.withSchema(appSchema).addValues("ccc,ddd", 2).build();
    Row row3 = Row.withSchema(appSchema).addValues("ddd,eee", 3).build();
    PCollection<Row> inputTable =
        PBegin.in(p).apply(Create.of(row1, row2, row3).withRowSchema(appSchema));

    Schema newSchema =
        Schema.builder()
            .addNullableField("string2", Schema.FieldType.STRING)
            .addInt32Field("int1")
            .addNullableField("string3", Schema.FieldType.STRING)
            .build();

    PCollection<Row> outputStream = inputTable
        .apply(SqlTransform.query("SELECT * FROM PCOLLECTION WHERE int1 > 1"))
        .apply(MapElements.via(
            new SimpleFunction<Row, Row>() {
              @Override
              public Row apply(Row line) {
                return Row.withSchema(newSchema).addValues("a", 1, "b").build();
              }
            }));

    p.run().waitUntilFinish();
  }
}
Reference: https://beam.apache.org/documentation/dsls/sql/overview/
You can emit 'Row' elements from a transform and then use the output as a table later in the pipeline.
The pipeline would look like the following.

Schema:
Schema schema =
Schema.of(Schema.Field.of("f0", FieldType.INT64), Schema.Field.of("f1", FieldType.INT64));
Transform:
private static MapElements<Row, Row> rowsToStrings() {
  return MapElements.into(TypeDescriptor.of(Row.class))
      .via(row -> Row.withSchema(schema).addValue(1L).addValue(2L).build());
}
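Note that 'schema' must be visible to the lambda here (e.g. a static final field). Also, MapElements.into(TypeDescriptor.of(Row.class)) does not carry schema information, so the output PCollection needs setRowSchema(schema) before the next SqlTransform can treat it as a table, as shown in the pipeline below.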
Pipeline:
pipeline
    .apply("SQL Query 1", SqlTransform.query(<Query string 1>))
    .apply("Transform column", rowsToStrings())
    .setRowSchema(schema) // attach the new schema so the next SqlTransform sees a table
    .apply("SQL Query 2", SqlTransform.query(<Query string 2>))
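Applied to the original problem, a minimal sketch that splits the description column into the four columns from the question (assuming 'input' is a schema-aware PCollection<Row> with a string field named "description", and reusing the parsing rules from the BigQuery UDF above):

Schema parsedSchema = Schema.builder()
    .addNullableField("msg", Schema.FieldType.STRING)
    .addNullableField("ip", Schema.FieldType.STRING)
    .addNullableField("source_region", Schema.FieldType.STRING)
    .addNullableField("user_name", Schema.FieldType.STRING)
    .build();

PCollection<Row> parsed = input
    .apply(MapElements.into(TypeDescriptor.of(Row.class))
        .via((Row row) -> {
          // Same parsing logic as the BigQuery UDF in the question.
          String description = row.getString("description");
          String[] arr = description.substring(0, description.length() - 1).split(",");
          int firstIndex = arr[0].indexOf(".");
          return Row.withSchema(parsedSchema)
              .addValues(
                  arr[0].substring(0, firstIndex),                 // msg
                  arr[0].substring(firstIndex + 2).split(": ")[1], // ip
                  arr[1].split(": ")[1],                           // source_region
                  arr[2].split(": ")[1])                           // user_name
              .build();
        }))
    .setRowSchema(parsedSchema); // required so SqlTransform can use it as a table

// The split columns can now be queried:
// parsed.apply(SqlTransform.query(
//     "SELECT msg, ip, source_region, user_name FROM PCOLLECTION"));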