从 PCollection<TableRow> 中获取单个字段

Get individual field from PCollection<TableRow>

我正在 tableA 中使用 Apache Beam 读取数据流并写入 BigQuery。我的行存储在 <TableRow>.

类型的数据集中

目前我正在按原样读取行并写入 table。但是,我想根据 timestamp 列过滤行,并在 FirstNameLastName 中的结果 Name 列中拆分数据,然后再使用新模式写入数据一个名为 tableB

的新 table

我不知道如何从 PCollection 数据集中获取单个字段,我正在寻求有关从我的 PCollection<TableRow> 中获取行字段 Name 的正确语法的帮助

这是我的代码:

PCollection<TableRow> rows =
                        transformedRows.apply("Get rows", BeamIO.getRows());
    
       /*Split a row here and name it rowsAfterColumnSplit
         --
         --
       */
        
        //Write the original set of rows
        WriteResult writeResult1 =
                        rows.apply("write rows",
                                BeamIO.getBigQueryIOWriter(schema, "tableA"));
    
        //Write rowsAfterColumnSplit with new schema
        WriteResult writeResult2 =
                        rowsAfterColumnSplit.apply("write rows after column split",
                                BeamIO.getBigQueryIOWriter(newSchema, "tableB"));

这是示例数据:

| timestamp                    | Name           | City    |

|  2020-07-14 20:12:01.342 UTC | Karl Streisand | Berlin  | 
|  2020-07-14 22:10:10.234 UTC | Anna Karlstad  | Munich  | 

我想通过过滤时间戳在 22.00.00 之后的行来拆分名称“Anna Karlstad”

这里是你如何做到的:

PCollection<TableRow> transformedRows = rows.apply(ParDo.of(Transform.splitColumn()));

这里是 Transform class:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

public class Transform extends DoFn<TableRow, TableRow> {

    private Transform() {
    }

    public static Transform splitColumn() {
        return new Transform();
    }

    @ProcessElement
    public void processElement(@Element TableRow input, OutputReceiver<TableRow> output) {


        if(input.get("Name").toString()!=null){
            input.set("FirstName", input.get("Name").toString().split(" ")[0]);
            input.set("LastName", input.get("Name").toString().split(" ")[1]);
        }
        output.output(input);

    }
}

然后您创建一个包含额外字段的新架构,即 FirstNameLastName 并使用 WriteResult

写入 BigQuery