对同一域具有有限并发请求的反应式网络爬虫

Reactive web-crawler with limited concurrent request to the same domain

我正在开发开源 web crawling project。我注意到该应用程序偶尔会用请求淹没它正在抓取的网站(我回来了 429 Too Many Requests)。因此,我想将并发请求数限制为一个,同一域的请求之间有一秒的延迟。

我想出了这个代码来做到这一点:

Flux.generate(downloaderQueueConsumer)
    .doFirst(this::initializeProcessing)
    .flatMap(this::evaluateDocumentLocation)
    .groupBy(this::parseDocumentDomain, 100000)
    .flatMap(documentSourceItem1 -> documentSourceItem1
            .delayElements(Duration.ofSeconds(1))
            .doOnNext(this::incrementProcessedCount)
            .flatMap(this::downloadDocument)
            .flatMap(this::archiveDocument)
            .doOnNext(this::incrementArchivedCount)
    )
    .doFinally(this::finishProcessing)
    .subscribe();

我对这段代码的问题是它没有将一个域的并行请求数限制为一个。有办法实现吗?

如果您想这样做,您可能需要在 Flux 外部维护某种状态 - 没有明显的方法来存储和更改 Flux 本身内的此类可变数据。

话虽这么说,这不是我推荐的速率限制方法 - 我已经做了类似于以下的事情,这是一个更好、更稳健的解决方案:

  • 将 429 状态代码映射到 "rate limit" 异常(您可能需要自己定义此异常类型)
  • 引入 reactor-extra,然后使用 Retry 使用带抖动的指数退避(或您喜欢的任何退避策略)。

这将使您能够更好地控制特定的重试策略,并可能使您的代码更具可读性。