数据流 + 数据存储 = DatastoreException:I/O 错误
Dataflow + Datastore = DatastoreException: I/O error
我正在尝试使用 com.google.cloud.datastore
从 DataFlow 写入 DataStore。
我的代码如下所示(灵感来自 [1] 中的示例):
public void processElement(ProcessContext c) {
LocalDatastoreHelper HELPER = LocalDatastoreHelper.create(1.0);
Datastore datastore = HELPER.options().toBuilder().namespace("ghijklmnop").build().service();
Key taskKey = datastore.newKeyFactory()
.ancestors(PathElement.of("TaskList", "default"))
.kind("Task")
.newKey("sampleTask");
Entity task = Entity.builder(taskKey)
.set("category", "Personal")
.set("done", false)
.set("priority", 4)
.set("description", "Learn Cloud Datastore")
.build();
datastore.put(task);
}
我收到这个错误:
exception: "java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: com.google.cloud.datastore.DatastoreException: I/O error
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.output(SimpleParDoFn.java:162)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.sideOutputWindowedValue(DoFnRunnerBase.java:314)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.sideOutput(DoFnRunnerBase.java:470)
at com.google.cloud.dataflow.sdk.transforms.Partition$PartitionDoFn.processElement(Partition.java:172)
我已经尝试使用 DatastoreIO
接收器,但流式运行器目前似乎不支持它。
如何避免该错误?或者从 DataFlow 写入 DataStore 的最佳方式是什么?
按照@Sam McVeety 的建议,我尝试将我的 Datastore 代码隔离在 Dataflow 之外。我确实遇到了同样的错误!
但这也让我看到了异常的原因,这是我在 Dataflow 日志中没有看到的:
Caused by: java.net.ConnectException: Connection refused
线索在我使用的导入行中:com.google.cloud.datastore.testing.LocalDatastoreHelper
。
它是一个测试助手,负责在本地基本模拟数据存储 API。哎呀
这是我在本地调试后得到的代码:
public void processElement(ProcessContext c) {
final Datastore datastore = DatastoreOptions.defaultInstance().service();
final KeyFactory keyFactory = datastore.newKeyFactory().kind("Task");
Key key = datastore.allocateId(keyFactory.newKey());
Entity task = Entity.builder(key)
.set("description", StringValue.builder(":D").excludeFromIndexes(true).build())
.set("created", DateTime.now())
.set("done", false)
.build();
datastore.put(task);
}
主要区别是:
LocalDatastoreHelper.create(1.0).options().toBuilder().namespace("ghijklmnop").build().service()
变成
DatastoreOptions.defaultInstance().service();
我正在尝试使用 com.google.cloud.datastore
从 DataFlow 写入 DataStore。
我的代码如下所示(灵感来自 [1] 中的示例):
public void processElement(ProcessContext c) {
LocalDatastoreHelper HELPER = LocalDatastoreHelper.create(1.0);
Datastore datastore = HELPER.options().toBuilder().namespace("ghijklmnop").build().service();
Key taskKey = datastore.newKeyFactory()
.ancestors(PathElement.of("TaskList", "default"))
.kind("Task")
.newKey("sampleTask");
Entity task = Entity.builder(taskKey)
.set("category", "Personal")
.set("done", false)
.set("priority", 4)
.set("description", "Learn Cloud Datastore")
.build();
datastore.put(task);
}
我收到这个错误:
exception: "java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: com.google.cloud.datastore.DatastoreException: I/O error
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.output(SimpleParDoFn.java:162)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.sideOutputWindowedValue(DoFnRunnerBase.java:314)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.sideOutput(DoFnRunnerBase.java:470)
at com.google.cloud.dataflow.sdk.transforms.Partition$PartitionDoFn.processElement(Partition.java:172)
我已经尝试使用 DatastoreIO
接收器,但流式运行器目前似乎不支持它。
如何避免该错误?或者从 DataFlow 写入 DataStore 的最佳方式是什么?
按照@Sam McVeety 的建议,我尝试将我的 Datastore 代码隔离在 Dataflow 之外。我确实遇到了同样的错误!
但这也让我看到了异常的原因,这是我在 Dataflow 日志中没有看到的:
Caused by: java.net.ConnectException: Connection refused
线索在我使用的导入行中:com.google.cloud.datastore.testing.LocalDatastoreHelper
。
它是一个测试助手,负责在本地基本模拟数据存储 API。哎呀
这是我在本地调试后得到的代码:
public void processElement(ProcessContext c) {
final Datastore datastore = DatastoreOptions.defaultInstance().service();
final KeyFactory keyFactory = datastore.newKeyFactory().kind("Task");
Key key = datastore.allocateId(keyFactory.newKey());
Entity task = Entity.builder(key)
.set("description", StringValue.builder(":D").excludeFromIndexes(true).build())
.set("created", DateTime.now())
.set("done", false)
.build();
datastore.put(task);
}
主要区别是:
LocalDatastoreHelper.create(1.0).options().toBuilder().namespace("ghijklmnop").build().service()
变成
DatastoreOptions.defaultInstance().service();