Running multiple instances of ChangeFeedProcessor for Azure Cosmos
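I'm trying to run 2 instances of ChangeFeedProcessor, both pointing to the same collection and using the same lease collection in the Cosmos account. I have specified a unique hostName in each instance.
My intention is to distribute the feed load between the instances based on logical partitions (as described in the Microsoft documentation).
When I try to start the second instance, I get the following exception in the console.
Is there a different way to achieve this?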
Exception in thread "pool-23-thread-3" java.lang.NullPointerException
    at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
    at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl.run(PartitionSupervisorImpl.java:89)
    at java.lang.Thread.run(Thread.java:748)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-19-thread-3" java.lang.NullPointerException
    at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
    at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl.run(PartitionSupervisorImpl.java:89)
    at java.lang.Thread.run(Thread.java:748)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-25-thread-3" java.lang.NullPointerException
    at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
    ...etc
I used the following Maven dependency:
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.0.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>
Code snippet:
- Create a list of ChangeFeedProcessors (one for each container found in the database):
import com.azure.data.cosmos.ChangeFeedProcessor;
import com.azure.data.cosmos.ChangeFeedProcessorOptions;
import com.azure.data.cosmos.CosmosContainer;
import com.azure.data.cosmos.CosmosContainerProperties;
import com.azure.data.cosmos.CosmosDatabase;
import com.azure.data.cosmos.FeedResponse;
import reactor.core.publisher.Flux;

import java.time.OffsetDateTime;
import java.util.List;
import java.util.stream.Collectors;

// FEED DATABASE
CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);
// LEASE DATABASE
CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);

// List of containers in the feed database
List<CosmosContainerProperties> containerPropertiesList;
try {
    Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
    // Flatten every page of results; list.get(0).results() would only see the first page
    containerPropertiesList = containers.toStream()
            .flatMap(page -> page.results().stream())
            .collect(Collectors.toList());
}
catch (Exception e) {
    System.out.println("Failed to query containers");
    throw new ServiceException("Failed to query containers");
}

containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
    // FEED CONTAINER
    String containerName = cosmosContainerProperties.getString("id");
    CosmosContainer feedContainer = feedDatabase.getContainer(containerName);
    // LEASE CONTAINER
    String leaseContainerName = containerName + "-leases";
    CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);

    // Build a ChangeFeedProcessor for the current container
    ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
    changeFeedProcessorOptions.startTime(OffsetDateTime.now());
    ChangeFeedProcessor changeFeedProcessor = null;
    try {
        ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()
                .hostName("Host1") // "Host2" is used on the other host
                .feedContainer(feedContainer)
                .leaseContainer(leaseContainer)
                .options(changeFeedProcessorOptions)
                .handleChanges(docs -> {
                    documentChangeHandler.processChanges(containerName, docs);
                });
        changeFeedProcessor = builderDefinition.build();
    }
    catch (Exception e) {
        System.out.println("Failed to initialize ChangeFeedProcessor for " + containerName + ": " + e.getMessage());
    }
    resultList.add(changeFeedProcessor);
    System.out.println("processed: " + leaseContainerName);
});
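The builder above hard-codes `hostName("Host1")` and relies on manually changing it to `Host2` for the second instance. One way to make sure every instance sharing a lease container gets a distinct name is to derive it at startup. A minimal sketch, assuming any process-unique string is acceptable as a host name; `HostNames`, `uniqueHostName` and the `feed-worker` prefix are hypothetical names, not part of the SDK:

```java
import java.util.UUID;

public class HostNames {

    // Hypothetical helper: appends a random UUID to a prefix so that two
    // processes started from the same code never share a host name.
    static String uniqueHostName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String a = uniqueHostName("feed-worker");
        String b = uniqueHostName("feed-worker");
        System.out.println(a);
        System.out.println(b);
        System.out.println("distinct: " + !a.equals(b));
    }
}
```

In the builder it would be used as `.hostName(HostNames.uniqueHostName("feed-worker"))` instead of the literal string.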
- Then resultList is returned, and the ChangeFeedProcessors are started in the method below:
public void startChangeFeed() {
    if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
        changeFeedProcessors.parallelStream().forEach(processor -> processor.start().block());
    }
    else {
        System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
    }
}
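In the snippet that builds the processors, `resultList.add(...)` runs inside `parallelStream().forEach`, so if `resultList` is a plain `ArrayList` (its declaration isn't shown) the concurrent adds are not thread-safe; and when `build()` throws, `null` is added, which `startChangeFeed` would then call `start()` on. A minimal, SDK-free sketch of letting the stream assemble the list instead; `Processor` and `buildProcessor` are hypothetical stand-ins for `ChangeFeedProcessor` and the builder code, not SDK types:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class CollectProcessors {

    // Hypothetical stand-in for ChangeFeedProcessor.
    static final class Processor {
        final String containerName;

        Processor(String containerName) {
            this.containerName = containerName;
        }
    }

    // Hypothetical builder: returns null when initialization fails,
    // mirroring the catch block that leaves changeFeedProcessor == null.
    static Processor buildProcessor(String containerName) {
        if (containerName.isEmpty()) {
            return null;
        }
        return new Processor(containerName);
    }

    // The collector gathers results safely from the parallel threads,
    // and failed builds are filtered out before anyone calls start().
    static List<Processor> buildAll(List<String> containerNames) {
        return containerNames.parallelStream()
                .map(CollectProcessors::buildProcessor)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Processor> processors = buildAll(Arrays.asList("orders", "", "customers"));
        processors.forEach(p -> System.out.println(p.containerName));
    }
}
```

Because the source list is ordered, `Collectors.toList()` keeps the container order even though the builds run in parallel.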
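Based on the comments, the problem was related to a VPN/proxy or something else blocking the required port range.
In Direct mode, a certain port range must be open and configured in the VPN/proxy/firewall.
If that cannot be configured, you can switch to Gateway (HTTP) mode.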
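The Change Feed Processor uses a second collection, the leases collection, to store its state (this is explained here https://docs.microsoft.com/azure/cosmos-db/change-feed-processor#components-of-the-change-feed-processor, mostly with .NET examples, but the concepts are the same). The current model creates 1 lease per physical partition (I say "current model" because this implementation may be improved in the future for better distribution), and each lease can be owned by only 1 instance. So if you have 2 leases and 2 instances, each instance will own 1 lease.
Each instance processes the changes in the partition(s) that correspond to the lease(s) it owns.
A 90/10 load distribution means that the changes in your collection seem to be happening mostly in one partition (a hot partition) rather than being evenly distributed.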
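The ownership rule described above (one lease per physical partition, exactly one owner per lease) can be illustrated with a small, SDK-free simulation. This is a sketch under assumptions: it hands out leases round-robin in a single pass, whereas the actual processor balances by having instances acquire available leases over time; `LeaseAssignment`, `partition-0` and `Host1` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeaseAssignment {

    // Gives each lease exactly one owner, round-robin, so the number of
    // leases per instance differs by at most one.
    static Map<String, List<String>> assign(List<String> leases, List<String> instances) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (String instance : instances) {
            owned.put(instance, new ArrayList<>());
        }
        for (int i = 0; i < leases.size(); i++) {
            String owner = instances.get(i % instances.size());
            owned.get(owner).add(leases.get(i));
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> leases = Arrays.asList("partition-0", "partition-1");
        List<String> instances = Arrays.asList("Host1", "Host2");
        // Two leases, two instances: each instance owns one lease.
        System.out.println(assign(leases, instances)); // prints {Host1=[partition-0], Host2=[partition-1]}
    }
}
```

With one lease and two instances, `Host2` ends up with an empty list: under this model an instance beyond the number of leases has nothing to process.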
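To see why a hot partition skews the split no matter how evenly leases are distributed, this sketch computes each instance's share of the changes from per-lease change counts and lease owners. The counts (900 and 100) and names are hypothetical, chosen only to reproduce a 90/10 split; nothing here calls the SDK:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LoadShare {

    // Returns each instance's fraction of all changes, given the number of
    // changes seen per lease and the instance that owns each lease.
    static Map<String, Double> shareByInstance(Map<String, Long> changesPerLease,
                                               Map<String, String> ownerOfLease) {
        long total = 0;
        for (long count : changesPerLease.values()) {
            total += count;
        }
        Map<String, Double> share = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : changesPerLease.entrySet()) {
            String owner = ownerOfLease.get(e.getKey());
            double fraction = total == 0 ? 0.0 : e.getValue().doubleValue() / total;
            // An instance that owns several leases accumulates their fractions.
            share.merge(owner, fraction, Double::sum);
        }
        return share;
    }

    public static void main(String[] args) {
        Map<String, Long> changes = new LinkedHashMap<>();
        changes.put("partition-0", 900L); // hypothetical hot partition
        changes.put("partition-1", 100L);
        Map<String, String> owners = new LinkedHashMap<>();
        owners.put("partition-0", "Host1");
        owners.put("partition-1", "Host2");
        System.out.println(shareByInstance(changes, owners)); // prints {Host1=0.9, Host2=0.1}
    }
}
```

Each instance owns exactly one lease here, yet the split is still 90/10, because the imbalance comes from where the writes land, not from lease ownership.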