在数据流管道步骤中获取数据存储实体
Get datastore entity inside dataflow pipeline step
我正在尝试设置
的数据流管道
- 从数据存储中读取实体,
- 对于找到的每个实体,查找不同的实体以提取 属性、
- 并使用该 属性 值更新原始实体。
我如何才能在给定读取的情况下查找特定实体?或者有更好的方法吗?
这是我目前的情况:
static class LookupOtherEntityFn extends DoFn<Entity, Entity> {
private Entity LookupOtherEntityFn(Entity sourceEntity) {
final Key someOtherEntityKey = sourceEntity.getPropertiesMap()
.get("otherEntityKey")
.getKeyValue();
final DatastoreV1.Read read = DatastoreIO.v1()
.read(); // ...... (uses someOtherEntityKey in query)
// **************************
// How do I retrieve the entity given a Read?
// **************************
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(createBackupEntity(c.element()));
}
}
Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.v1().read()...)
.apply(ParDo.of(new LookupOtherEntityFn()))
.apply(ParDo.of(new CreateUpdatedEntityFn())
.apply(DatastoreIO.v1().write()...);
p.run();
您将无法使用 DatastoreV1.Read
class 从转换中的 Cloud Datastore 读取数据。 DatastoreV1.Read
专门用作 Apache Beam 管道的输入转换。
您可以使用 Cloud Datastore Java 客户端读取转换实施中的实体。理想情况下,您应该实施转换,以便在工作人员之间仅共享一个连接并且异步进行调用。
来自 API 文档的示例:
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
// Authentication is automatic inside Google Compute Engine
// and Google App Engine.
Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
KeyFactory keyFactory = datastore.newKeyFactory().setKind(KIND);
Key key = keyFactory.newKey(keyName);
Entity entity = datastore.get(key);
参考文献:
1. https://cloud.google.com/datastore/docs/reference/libraries#client-libraries-install-java
2. https://googlecloudplatform.github.io/google-cloud-java/0.42.1/index.html
3.https://googlecloudplatform.github.io/google-cloud-java/0.42.1/apidocs/com/google/cloud/datastore/Datastore.html
我正在尝试设置
的数据流管道- 从数据存储中读取实体,
- 对于找到的每个实体,查找不同的实体以提取 属性、
- 并使用该 属性 值更新原始实体。
我如何才能在给定读取的情况下查找特定实体?或者有更好的方法吗?
这是我目前的情况:
static class LookupOtherEntityFn extends DoFn<Entity, Entity> {
private Entity LookupOtherEntityFn(Entity sourceEntity) {
final Key someOtherEntityKey = sourceEntity.getPropertiesMap()
.get("otherEntityKey")
.getKeyValue();
final DatastoreV1.Read read = DatastoreIO.v1()
.read(); // ...... (uses someOtherEntityKey in query)
// **************************
// How do I retrieve the entity given a Read?
// **************************
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(createBackupEntity(c.element()));
}
}
Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.v1().read()...)
.apply(ParDo.of(new LookupOtherEntityFn()))
.apply(ParDo.of(new CreateUpdatedEntityFn())
.apply(DatastoreIO.v1().write()...);
p.run();
您将无法使用 DatastoreV1.Read
class 从转换中的 Cloud Datastore 读取数据。 DatastoreV1.Read
专门用作 Apache Beam 管道的输入转换。
您可以使用 Cloud Datastore Java 客户端读取转换实施中的实体。理想情况下,您应该实施转换,以便在工作人员之间仅共享一个连接并且异步进行调用。
来自 API 文档的示例:
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
// Authentication is automatic inside Google Compute Engine
// and Google App Engine.
Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
KeyFactory keyFactory = datastore.newKeyFactory().setKind(KIND);
Key key = keyFactory.newKey(keyName);
Entity entity = datastore.get(key);
参考文献:
1. https://cloud.google.com/datastore/docs/reference/libraries#client-libraries-install-java
2. https://googlecloudplatform.github.io/google-cloud-java/0.42.1/index.html
3.https://googlecloudplatform.github.io/google-cloud-java/0.42.1/apidocs/com/google/cloud/datastore/Datastore.html