Running multiple instances of ChangeFeedProcessor for Azure Cosmos

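I am trying to run 2 instances of ChangeFeedProcessor, both pointing to the same collection and using the same lease collection in the Cosmos account. I have specified a unique hostName in each instance.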

My intention is to distribute the feed load between the instances based on logical partitions (as per the Microsoft documentation).

When I try to start the second instance, I get the following exception in the console.

Is there a different way to achieve this?

Exception in thread "pool-23-thread-3" java.lang.NullPointerException
    at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
    at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl.run(PartitionSupervisorImpl.java:89)
    at java.lang.Thread.run(Thread.java:748)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-19-thread-3" java.lang.NullPointerException
    at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
    at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl.run(PartitionSupervisorImpl.java:89)
    at java.lang.Thread.run(Thread.java:748)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-25-thread-3" java.lang.NullPointerException
    at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
    at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
    ...etc

I am using the following Maven dependency:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.0.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

Code snippets

  1. Create a list of ChangeFeedProcessors (one for each container found in the database)

        //FEED DATABASE
        CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);

        //LEASE DATABASE
        CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);

        //List of Containers in Feed Database
        List<CosmosContainerProperties> containerPropertiesList = null;
        try {
            Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
            // Flatten every page of the response; list.get(0) only returned the first page
            containerPropertiesList = containers
                    .flatMapIterable(FeedResponse::results)
                    .collectList()
                    .block();
        }
        catch (Exception e) {
            System.out.println("Fail to query Containers");
            throw new ServiceException("Fail to query Containers");
        }

        containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
            //FEED CONTAINER
            String containerName = cosmosContainerProperties.getString("id");
            CosmosContainer feedContainer = feedDatabase.getContainer(containerName);

            //LEASE CONTAINER
            String leaseContainerName = containerName + "-leases";
            CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);

            //Building ChangeFeedProcessor for current Container
            ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
            changeFeedProcessorOptions.startTime(OffsetDateTime.now());

            ChangeFeedProcessor changeFeedProcessor = null;
            try {
                ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()
                        .hostName("Host1")//used Host2 in the other Host
                        .feedContainer(feedContainer)
                        .leaseContainer(leaseContainer)
                        .options(changeFeedProcessorOptions)
                        .handleChanges(docs -> {
                            documentChangeHandler.processChanges(containerName, docs);
                        });
                changeFeedProcessor = builderDefinition.build();
            }
            catch (Exception e) {
                System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName + ": " + e);
            }
            // resultList is shared across parallel threads, so it must be a thread-safe list;
            // skip processors that failed to build so start() is never called on null
            if (changeFeedProcessor != null) {
                resultList.add(changeFeedProcessor);
            }

            System.out.println("processed:  " + leaseContainerName);
        });
  2. Then return resultList and start the ChangeFeedProcessors in the method below:
    public void startChangeFeed() {
        if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
            changeFeedProcessors.parallelStream().forEach(processor -> processor.start().block());
        }
        else {
            System.out.println("changeFeedProcessors list is empty; the ChangeFeedProcessors have probably not been set up yet");
        }
    }
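The builder loop above runs inside parallelStream() and adds to a shared resultList, so that list has to be thread-safe, and any processor that fails to build has to be filtered out before startChangeFeed() calls start() on it. A minimal sketch of an alternative, assuming you are free to restructure the loop: map each container name to a processor and let the stream collect the results, which avoids shared mutable state entirely. `ProcessorCollector` and `buildAll` are hypothetical names, and the generic `factory` stands in for the `ChangeFeedProcessor.Builder()` chain so the example runs without the Cosmos SDK.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class ProcessorCollector {

    private ProcessorCollector() {}

    /**
     * Hypothetical helper: builds one item per container name in parallel and
     * collects the results without mutating a shared list. Names whose factory
     * call throws are logged and skipped instead of being stored as null.
     */
    public static <T> List<T> buildAll(List<String> containerNames, Function<String, T> factory) {
        return containerNames.parallelStream()
                .map(name -> {
                    try {
                        return factory.apply(name);
                    } catch (RuntimeException e) {
                        System.out.println("Fail to initialize ChangeFeedProcessor for " + name + ": " + e);
                        return null; // removed by the filter below
                    }
                })
                .filter(Objects::nonNull)
                // toList() on an ordered source keeps the input order, even in parallel
                .collect(Collectors.toList());
    }
}
```

In the question's code, the factory lambda would hold the builder chain and return `builderDefinition.build()`; if `build()` declares checked exceptions, catch them inside that lambda.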

Based on the comments, the issue was related to a VPN/proxy or something else blocking the required port range.

Direct mode requires a certain port range to be opened and configured in the VPN/proxy/firewall.

If that cannot be configured, you can switch to Gateway/HTTP mode.
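One quick, SDK-independent way to check whether a port is blocked from the machine running the processor is to try opening a plain TCP connection to it. `PortProbe.isReachable` below is a hypothetical helper built only on `java.net.Socket`; the host and the ports to probe are inputs you have to supply (the port range used by direct mode is described in the Azure Cosmos DB documentation, not here).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public final class PortProbe {

    private PortProbe() {}

    /**
     * Hypothetical helper: returns true if a TCP connection to host:port can be
     * opened within timeoutMillis, false on timeout, refusal, or unknown host.
     */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers SocketTimeoutException, ConnectException and UnknownHostException
            return false;
        }
    }
}
```

A `false` result only means the connection could not be opened within the timeout; it does not distinguish a firewall block from a host that is simply not listening on that port.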

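The Change Feed Processor uses a second collection, the leases collection, to store its state (this is explained mainly at https://docs.microsoft.com/azure/cosmos-db/change-feed-processor#components-of-the-change-feed-processor with .NET samples, but the concepts are the same). The current model creates 1 lease per physical partition (I say "current model" because this implementation could be improved in the future for better distribution), and each lease can only be owned by 1 instance. So if you have 2 leases and 2 instances, each instance will own 1 lease.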

Each instance will process the changes from the partition(s) whose lease(s) it owns.
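The lease model described above can be illustrated with a small, self-contained sketch: one lease per physical partition, and every lease owned by exactly one host. For illustration only, it assumes leases are spread round-robin across hosts; the SDK's actual balancing algorithm is not shown here and may differ. `LeaseAssignment.assign` is a hypothetical helper that maps each host name to the leases it owns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class LeaseAssignment {

    private LeaseAssignment() {}

    /**
     * Simplified model (not the SDK's algorithm): each lease gets exactly one
     * owner, and leases are dealt out round-robin so the split is as even as possible.
     */
    public static Map<String, List<String>> assign(List<String> leaseIds, List<String> hostNames) {
        if (hostNames.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (String host : hostNames) {
            owned.put(host, new ArrayList<>()); // hosts with no lease stay empty (idle)
        }
        for (int i = 0; i < leaseIds.size(); i++) {
            String owner = hostNames.get(i % hostNames.size());
            owned.get(owner).add(leaseIds.get(i));
        }
        return owned;
    }
}
```

With leases `lease-0` and `lease-1` and hosts `Host1` and `Host2`, each host owns one lease. With a single lease (a container with one physical partition), `Host2` ends up with an empty list, so under this model the second instance would have nothing to process.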

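A 90/10 load distribution means that the changes in your collection seem to happen mostly in one partition (a hot partition) instead of being evenly distributed.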
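To see why a hot partition gives an uneven split even when each instance owns exactly one lease, the sketch below computes each host's share of the changes from lease ownership and per-lease change counts. `LoadShare.percentPerHost` is a hypothetical helper, and the 90/10 counts in the test are illustrative numbers chosen to match the split described above, not measurements.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class LoadShare {

    private LoadShare() {}

    /**
     * Given the leases each host owns and how many changes arrived on each
     * lease's partition, returns each host's percentage of all changes.
     */
    public static Map<String, Double> percentPerHost(Map<String, List<String>> leasesByHost,
                                                     Map<String, Long> changesByLease) {
        long total = changesByLease.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Double> share = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : leasesByHost.entrySet()) {
            // A host handles exactly the changes of the partitions whose leases it owns
            long handled = entry.getValue().stream()
                    .mapToLong(lease -> changesByLease.getOrDefault(lease, 0L))
                    .sum();
            share.put(entry.getKey(), total == 0 ? 0.0 : 100.0 * handled / total);
        }
        return share;
    }
}
```

With `Host1` owning `lease-0` (90 changes) and `Host2` owning `lease-1` (10 changes), the result is `{Host1=90.0, Host2=10.0}`. Moving leases between instances cannot even this out, because the hot partition's lease can only be owned by one instance at a time.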