How do I output a single value from a DoFn and use that as a parameter in another DoFn?
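I have a Pub/Sub subscription that delivers newline-delimited JSON. Each Pub/Sub message has an attribute whose value is the name of the BigQuery table to write to.
How do I get each individual table name and pass it to a new pipeline?
Is it possible to create a new PCollection and apply it... from within the DoFn itself?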
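You can apply a transform that retrieves the table name inside a DoFn and passes a KV pair of <tableName, record> downstream. Then use the dynamic destinations support in BigQueryIO to route each record to the correct destination. Alternatively, you can retrieve the table attribute inside the BigQueryIO write itself: the function passed to to(..) picks the destination from the message, and withFormatFunction() converts the payload into a row. The full example further down takes this second approach.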
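The first approach, keying each record by its table name, comes down to a small pure function from a message to a (tableName, record) pair. Below is a minimal, Beam-free sketch of that function so it can be run standalone. The class name TableKeying, the method keyByTable, and the attribute name "table" are all hypothetical and do not appear in the original answer. In a real pipeline this logic would presumably sit inside a DoFn's @ProcessElement method and emit KV.of(tableName, record) instead of a Map.Entry.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper, not part of the original answer. */
final class TableKeying {
  private TableKeying() {}

  /**
   * Pairs a JSON record with the table name found in the message attributes.
   * Stands in for the body of a DoFn that would emit KV<tableName, record>.
   */
  static Map.Entry<String, String> keyByTable(
      Map<String, String> attributes, String tableNameAttr, String jsonRecord) {
    // Fail fast if the routing attribute is absent, rather than keying by null.
    String tableName =
        Objects.requireNonNull(
            attributes.get(tableNameAttr),
            "Message has no '" + tableNameAttr + "' attribute");
    return Map.entry(tableName, jsonRecord);
  }

  public static void main(String[] args) {
    // "table" is an assumed attribute name used only for this demonstration.
    Map.Entry<String, String> keyed =
        keyByTable(Map.of("table", "orders"), "table", "{\"id\": 1}");
    System.out.println(keyed.getKey() + " <- " + keyed.getValue());
  }
}
```

Downstream, the key of each pair would be what the dynamic destination function reads to pick the table, and the value is what gets converted into a row.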
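The full pipeline structure follows: JSON messages are consumed from Pub/Sub and then routed to the correct table destination based on a Pub/Sub message attribute. Likewise, you could change the getTableDestination(..) logic to retrieve the table name from the JSON record instead.
You can view the full example here.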
/**
 * Runs the pipeline with the specified options. This method does not wait until the
 * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
 * object to block until the pipeline is finished running if blocking programmatic execution is
 * required.
 *
 * @param options The execution options.
 * @return The pipeline result.
 */
public static PipelineResult run(Options options) {
  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);

  // Read option values into local strings up front so the lambda below
  // captures only these values rather than the options object
  String tableNameAttr = options.getTableNameAttr();
  String outputTableProject = options.getOutputTableProject();
  String outputTableDataset = options.getOutputTableDataset();

  // Build & execute pipeline
  pipeline
      .apply(
          "ReadMessages",
          PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
      .apply(
          "WriteToBigQuery",
          BigQueryIO.<PubsubMessage>write()
              // Choose the destination table per message from its attribute
              .to(
                  input ->
                      getTableDestination(
                          input,
                          tableNameAttr,
                          outputTableProject,
                          outputTableDataset))
              // Decode the payload explicitly as UTF-8 instead of the platform default charset
              .withFormatFunction(
                  (PubsubMessage msg) ->
                      convertJsonToTableRow(
                          new String(msg.getPayload(), java.nio.charset.StandardCharsets.UTF_8)))
              .withCreateDisposition(CreateDisposition.CREATE_NEVER)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  return pipeline.run();
}

/**
 * Retrieves the {@link TableDestination} for the {@link PubsubMessage} by extracting and
 * formatting the value of the {@code tableNameAttr} attribute. If the message is null, a {@link
 * RuntimeException} will be thrown because the message cannot be routed.
 *
 * @param value The message to extract the table name from.
 * @param tableNameAttr The name of the attribute within the message which contains the table
 *     name.
 * @param outputProject The project in which the table resides.
 * @param outputDataset The dataset in which the table resides.
 * @return The destination to route the input message to.
 */
@VisibleForTesting
static TableDestination getTableDestination(
    ValueInSingleWindow<PubsubMessage> value,
    String tableNameAttr,
    String outputProject,
    String outputDataset) {
  PubsubMessage message = value.getValue();

  TableDestination destination;
  if (message != null) {
    // Table spec format: project:dataset.table; the second argument is the table description
    destination =
        new TableDestination(
            String.format(
                "%s:%s.%s",
                outputProject, outputDataset, message.getAttributeMap().get(tableNameAttr)),
            null);
  } else {
    throw new RuntimeException(
        "Cannot retrieve the dynamic table destination of a null message!");
  }

  return destination;
}
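
One gap worth noting in getTableDestination(..): it checks for a null message but not for a message that lacks the table attribute. In that case Map.get returns null, and String.format renders it as the literal text "null", so the record would be addressed to a table named "null". The sketch below shows one possible guard, using plain Java so it runs without Beam. The class TableSpecs, the method tableSpec, and the sample project, dataset, and attribute names are hypothetical, and throwing an exception is only one option (a dead-letter destination would be another).

```java
import java.util.Map;

/** Hypothetical helper, not part of the original answer. */
final class TableSpecs {
  private TableSpecs() {}

  /** Builds "project:dataset.table", rejecting a missing or empty table attribute. */
  static String tableSpec(
      String project, String dataset, Map<String, String> attributes, String tableNameAttr) {
    // Treat a missing attribute map the same as a missing attribute.
    String table = attributes == null ? null : attributes.get(tableNameAttr);
    if (table == null || table.isEmpty()) {
      throw new IllegalArgumentException(
          "Message has no usable '" + tableNameAttr + "' attribute; cannot route it");
    }
    return String.format("%s:%s.%s", project, dataset, table);
  }

  public static void main(String[] args) {
    // Without a check, a missing attribute is formatted as the text "null".
    System.out.println(String.format("%s:%s.%s", "my-project", "my_dataset", (Object) null));

    // With the check, a well-formed message still yields the expected spec.
    System.out.println(
        tableSpec("my-project", "my_dataset", Map.of("table", "orders"), "table"));
  }
}
```

The returned string could then be passed to the TableDestination constructor in place of the inline String.format call.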