Unable to write nullable integer values to BigQuery using Cloud Dataflow
I am trying to write to a BigQuery table using Cloud Dataflow. The table has an integer column that is set to NULLABLE. For null values it gives the following error:
Could not convert value to integer. Field: ITM_QT; Value:
The error occurs only while the column's data type is INTEGER; when I change the same column's data type to STRING, it accepts the null values.
So is there any way to write null values to an integer column using Cloud Dataflow?
Not sure what you're doing wrong, but the following code works fine and does allow writing null values for the INTEGER and FLOAT data types in BigQuery:
import java.util.ArrayList;
import java.util.List;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class WriteNullNumbers {
  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(DirectPipelineRunner.class);
    options.setProject("<project-id>");

    Pipeline pipeline = Pipeline.create(options);

    // Read any table, then emit rows whose numeric fields are explicitly null.
    PCollection<TableRow> results = pipeline
        .apply("whatever", BigQueryIO.Read.from("<table-spec>"))
        .apply(ParDo.of(new DoFn<TableRow, TableRow>() {
          @Override
          public void processElement(ProcessContext c) throws Exception {
            System.out.println(c.element());
            TableRow row = new TableRow();
            row.set("foo", null); // null FLOAT
            row.set("bar", null); // null INTEGER
            c.output(row);
          }
        }));

    // Both columns are NULLABLE (the default mode for a TableFieldSchema).
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("foo").setType("FLOAT"));
    fields.add(new TableFieldSchema().setName("bar").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    results.apply(BigQueryIO.Write
        .named("Write")
        .to("<project-id>:<dataset-name>.write_null_numbers_test")
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    pipeline.run();
  }
}
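One guess, since the question doesn't show the pipeline code: the empty `Value:` in the error message suggests the pipeline may be setting `ITM_QT` to an empty string `""` rather than a true `null`. BigQuery cannot convert `""` to an integer, but it does accept `null` for a NULLABLE column. A stdlib-only sketch of that distinction (no BigQuery dependency; `toNullableLong` is a hypothetical helper, not part of any SDK):

```java
public class NullIntDemo {
    // An empty string cannot be parsed as an integer, while an absent
    // (null) value can simply be passed through as a NULL column value.
    static Long toNullableLong(String raw) {
        if (raw == null) {
            return null;           // true null -> NULL in a NULLABLE column
        }
        return Long.parseLong(raw); // "" throws NumberFormatException
    }

    public static void main(String[] args) {
        System.out.println(toNullableLong(null)); // prints: null
        System.out.println(toNullableLong("42")); // prints: 42
        try {
            toNullableLong(""); // mirrors "Could not convert value to integer"
        } catch (NumberFormatException e) {
            System.out.println("Could not convert value to integer");
        }
    }
}
```

So before writing the `TableRow`, make sure the value really is `null` (or the key is absent) instead of an empty string.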