使用 google 数据存储以流模式检查项目
Using google datastore to check items in streaming mode
我想将 Google 数据存储中不存在的项目插入其中。
我写了一个数据流流作业。
工作
static class RawToObjectConverter extends DoFn<String, Entity> {
@Override
public void processElement(ProcessContext c) {
Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid))
.build();
QueryResults<Entity> posts = datastore.run(query);
if (posts == null || !posts.hasNext()) {
Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))
.set("postid", "1")
.set("title", "p1")
.build();
c.output(post);
}
}
}
问题
lines.apply(ParDo.of(new RawToObjectConverter()))
.apply(DatastoreIO.v1().write().withProjectId(projectid));
类型 PCollection<Entity>
中的方法 apply(PTransform<? super PCollection<Entity>,OutputT>)
不适用于参数 (DatastoreV1.Write
)
我还应该使用 com.google.cloud.datastore.Datastore
还是 com.google.datastore.v1.Entity
?
您需要将 com.google.cloud.datastore.Entity 转换为 com.google.datastore。v1.Entity =14=]RawToObjectConverter 在使用 DatastoreIO.v1().write() 之前,
使用 toPb 方法
c.output(post.toPb());
我想将 Google 数据存储中不存在的项目插入其中。 我写了一个数据流流作业。
工作
static class RawToObjectConverter extends DoFn<String, Entity> {
@Override
public void processElement(ProcessContext c) {
Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid))
.build();
QueryResults<Entity> posts = datastore.run(query);
if (posts == null || !posts.hasNext()) {
Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))
.set("postid", "1")
.set("title", "p1")
.build();
c.output(post);
}
}
}
问题
lines.apply(ParDo.of(new RawToObjectConverter()))
.apply(DatastoreIO.v1().write().withProjectId(projectid));
类型 PCollection<Entity>
中的方法 apply(PTransform<? super PCollection<Entity>,OutputT>)
不适用于参数 (DatastoreV1.Write
)
我还应该使用 com.google.cloud.datastore.Datastore
还是 com.google.datastore.v1.Entity
?
您需要将 com.google.cloud.datastore.Entity 转换为 com.google.datastore。v1.Entity =14=]RawToObjectConverter 在使用 DatastoreIO.v1().write() 之前, 使用 toPb 方法
c.output(post.toPb());