Apache Beam:为什么它在 REPORT_FAILURES 模式下两次写入 Spanner?
Apache Beam: Why does it write to Spanner twice on REPORT_FAILURES mode?
在看SpannerIO的时候发现写操作代码很有意思,想了解一下原因。
在写入(WriteToSpannerFn
)和REPORT_FAILURES
失败模式下,似乎尝试写入失败的突变两次。
我认为这是为了记录每个突变的异常。这是一个正确的假设吗?有什么解决方法吗?
下面,为了简单起见,我删除了一些行。
public void processElement(ProcessContext c) {
Iterable<MutationGroup> mutations = c.element();
boolean tryIndividual = false;
try {
Iterable<Mutation> batch = Iterables.concat(mutations);
spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
} catch (SpannerException e) {
if (failureMode == FailureMode.REPORT_FAILURES) {
tryIndividual = true;
} else {
...
}
}
if (tryIndividual) {
for (MutationGroup mg : mutations) {
try {
spannerAccessor.getDatabaseClient().writeAtLeastOnce(mg);
} catch (SpannerException e) {
LOG.warn("Failed to submit the mutation group", e);
c.output(failedTag, mg);
}
}
}
}
因此,SpannerIO.write() 连接器尝试在单个事务中写入一批 Mutation 以提高效率,而不是将每个 Mutation 单独写入数据库。
如果批处理中只有一个 Mutation 失败,则整个事务都会失败,因此在 REPORT_FAILURES 模式下,将单独重新尝试 mutation 以找出哪些 Mutation 是有问题的。 ..
在看SpannerIO的时候发现写操作代码很有意思,想了解一下原因。
在写入(WriteToSpannerFn
)和REPORT_FAILURES
失败模式下,似乎尝试写入失败的突变两次。
我认为这是为了记录每个突变的异常。这是一个正确的假设吗?有什么解决方法吗?
下面,为了简单起见,我删除了一些行。
public void processElement(ProcessContext c) {
Iterable<MutationGroup> mutations = c.element();
boolean tryIndividual = false;
try {
Iterable<Mutation> batch = Iterables.concat(mutations);
spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
} catch (SpannerException e) {
if (failureMode == FailureMode.REPORT_FAILURES) {
tryIndividual = true;
} else {
...
}
}
if (tryIndividual) {
for (MutationGroup mg : mutations) {
try {
spannerAccessor.getDatabaseClient().writeAtLeastOnce(mg);
} catch (SpannerException e) {
LOG.warn("Failed to submit the mutation group", e);
c.output(failedTag, mg);
}
}
}
}
因此,SpannerIO.write() 连接器尝试在单个事务中写入一批 Mutation 以提高效率,而不是将每个 Mutation 单独写入数据库。
如果批处理中只有一个 Mutation 失败,则整个事务都会失败,因此在 REPORT_FAILURES 模式下,将单独重新尝试 mutation 以找出哪些 Mutation 是有问题的。 ..