从 PCollection<TableRow> 中获取单个字段
Get individual field from PCollection<TableRow>
我正在 tableA
中使用 Apache Beam 读取数据流并写入 BigQuery。我的行存储在 <TableRow>
.
类型的数据集中
目前我正在按原样读取行并写入 table。但是,我想根据 timestamp
列过滤行,并在 FirstName
和 LastName
中的结果 Name
列中拆分数据,然后再使用新模式写入数据一个名为 tableB
的新 table
我不知道如何从 PCollection
数据集中获取单个字段,我正在寻求有关从我的 PCollection<TableRow>
中获取行字段 Name
的正确语法的帮助
这是我的代码:
PCollection<TableRow> rows =
transformedRows.apply("Get rows", BeamIO.getRows());
/*Split a row here and name it rowsAfterColumnSplit
--
--
*/
//Write the original set of rows
WriteResult writeResult1 =
rows.apply("write rows",
BeamIO.getBigQueryIOWriter(schema, "tableA"));
//Write rowsAfterColumnSplit with new schema
WriteResult writeResult2 =
rowsAfterColumnSplit.apply("write rows after column split",
BeamIO.getBigQueryIOWriter(newSchema, "tableB"));
这是示例数据:
| timestamp | Name | City |
| 2020-07-14 20:12:01.342 UTC | Karl Streisand | Berlin |
| 2020-07-14 22:10:10.234 UTC | Anna Karlstad | Munich |
我想通过过滤时间戳在 22.00.00 之后的行来拆分名称“Anna Karlstad”
这里是你如何做到的:
PCollection<TableRow> transformedRows = rows.apply(ParDo.of(Transform.splitColumn()));
这里是 Transform
class:
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
public class Transform extends DoFn<TableRow, TableRow> {
private Transform() {
}
public static Transform splitColumn() {
return new Transform();
}
@ProcessElement
public void processElement(@Element TableRow input, OutputReceiver<TableRow> output) {
if(input.get("Name").toString()!=null){
input.set("FirstName", input.get("Name").toString().split(" ")[0]);
input.set("LastName", input.get("Name").toString().split(" ")[1]);
}
output.output(input);
}
}
然后您创建一个包含额外字段的新架构,即 FirstName
和 LastName
并使用 WriteResult
写入 BigQuery
我正在 tableA
中使用 Apache Beam 读取数据流并写入 BigQuery。我的行存储在 <TableRow>
.
目前我正在按原样读取行并写入 table。但是,我想根据 timestamp
列过滤行,并在 FirstName
和 LastName
中的结果 Name
列中拆分数据,然后再使用新模式写入数据一个名为 tableB
我不知道如何从 PCollection
数据集中获取单个字段,我正在寻求有关从我的 PCollection<TableRow>
中获取行字段 Name
的正确语法的帮助
这是我的代码:
PCollection<TableRow> rows =
transformedRows.apply("Get rows", BeamIO.getRows());
/*Split a row here and name it rowsAfterColumnSplit
--
--
*/
//Write the original set of rows
WriteResult writeResult1 =
rows.apply("write rows",
BeamIO.getBigQueryIOWriter(schema, "tableA"));
//Write rowsAfterColumnSplit with new schema
WriteResult writeResult2 =
rowsAfterColumnSplit.apply("write rows after column split",
BeamIO.getBigQueryIOWriter(newSchema, "tableB"));
这是示例数据:
| timestamp | Name | City |
| 2020-07-14 20:12:01.342 UTC | Karl Streisand | Berlin |
| 2020-07-14 22:10:10.234 UTC | Anna Karlstad | Munich |
我想通过过滤时间戳在 22.00.00 之后的行来拆分名称“Anna Karlstad”
这里是你如何做到的:
PCollection<TableRow> transformedRows = rows.apply(ParDo.of(Transform.splitColumn()));
这里是 Transform
class:
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
public class Transform extends DoFn<TableRow, TableRow> {
private Transform() {
}
public static Transform splitColumn() {
return new Transform();
}
@ProcessElement
public void processElement(@Element TableRow input, OutputReceiver<TableRow> output) {
if(input.get("Name").toString()!=null){
input.set("FirstName", input.get("Name").toString().split(" ")[0]);
input.set("LastName", input.get("Name").toString().split(" ")[1]);
}
output.output(input);
}
}
然后您创建一个包含额外字段的新架构,即 FirstName
和 LastName
并使用 WriteResult