Couchbase Java DCP 客户端未使用恢复文件从所有存储桶开始加载

Couchbase Java DCP client doesn't start the load from all bucket using Recovery File

我正在使用 Couchbase DCP java 客户端将一些数据从 Couchbase 复制到其他地方。当我开始加载数据时,它工作得很好,没有任何问题。但是我的桶现在已经很大了,如果在执行过程中出现任何问题,我会从头开始重新加载所有内容。现在这个负载需要 3 个多小时。

所以我开始每 N 分钟使用一次状态文件,它使用以下代码创建一个状态文件:

private def saveState(): String ={

    val filename = stateFilePath +
      s"couchbase-$bucket-${LocalDateTime.now.format(DateTimeFormatter.ofPattern("YYYYMMdd_HHmmss"))}"

    val state: Array[Byte] = client.sessionState.export(StateFormat.JSON)

    // Write it to a file
    new File(stateFilePath).mkdirs()
    val output: FileOutputStream = new FileOutputStream(new File(filename))
    IOUtils.write(state, output)
    output.close()
    filename
}

然后我加载文件并像这样开始流式传输:

client.connect().await()
client.recoverOrInitializeState(StateFormat.JSON, persistedFilePath, StreamFrom.BEGINNING, StreamTo.INFINITY).await()
client.startStreaming().await()

在我的代码中,我检查是否所有 Vbucket 都像这样正确连接:

logger.info(s"Number of documents processed: $objectCount")
var vbid = 0
while (vbid < client.numPartitions) {
  val open = client.streamIsOpen(vbid.toShort)
  if (!open) {
    logger.warn("Stream is not open for vBucket: {}", vbid)
  }

  vbid += 1

因此,出于某种原因,当我从状态文件中读取时,我开始有一些没有打开连接的 vBuckets。 最好的方法应该是什么?

我找到了与之相关的 Issue。

解决方案有效,但有一点。如果作业在开始读取每个 vBucket 中的至少 1 个文档之前失败,则为 0 的 vBuckets 的重新加载将失败。代码不会启动 vBucket 的使用者,因此您需要手动启动这些人。

尽管如此,当您从所有 vBucket 中读取一些数据时,它工作得很好。

我的问题是,我正在测试 DCP 的故障和恢复。我花了太多时间在这件事上,我开始复制并等待如果出现任何问题我们能够从头重新加载。一个好处是,这项工作没有失败。我们不得不重新启动机器,然后重新启动作业,一切都恢复得很好。 vBuckets 重新启动,现在复制正常。

感谢所有试图提供帮助的评论。