对同一域具有有限并发请求的反应式网络爬虫
Reactive web-crawler with limited concurrent request to the same domain
我正在开发开源 web crawling project。我注意到该应用程序偶尔会用请求淹没它正在抓取的网站(我回来了 429 Too Many Requests
)。因此,我想将并发请求数限制为一个,同一域的请求之间有一秒的延迟。
我想出了这个代码来做到这一点:
Flux.generate(downloaderQueueConsumer)
.doFirst(this::initializeProcessing)
.flatMap(this::evaluateDocumentLocation)
.groupBy(this::parseDocumentDomain, 100000)
.flatMap(documentSourceItem1 -> documentSourceItem1
.delayElements(Duration.ofSeconds(1))
.doOnNext(this::incrementProcessedCount)
.flatMap(this::downloadDocument)
.flatMap(this::archiveDocument)
.doOnNext(this::incrementArchivedCount)
)
.doFinally(this::finishProcessing)
.subscribe();
我对这段代码的问题是它没有将一个域的并行请求数限制为一个。有办法实现吗?
如果您想这样做,您可能需要在 Flux 外部维护某种状态 - 没有明显的方法来存储和更改 Flux 本身内的此类可变数据。
话虽这么说,这不是我推荐的速率限制方法 - 我已经做了类似于以下的事情,这是一个更好、更稳健的解决方案:
- 将 429 状态代码映射到 "rate limit" 异常(您可能需要自己定义此异常类型)
- 引入 reactor-extra,然后使用
Retry
使用带抖动的指数退避(或您喜欢的任何退避策略)。
这将使您能够更好地控制特定的重试策略,并可能使您的代码更具可读性。
我正在开发开源 web crawling project。我注意到该应用程序偶尔会用请求淹没它正在抓取的网站(我回来了 429 Too Many Requests
)。因此,我想将并发请求数限制为一个,同一域的请求之间有一秒的延迟。
我想出了这个代码来做到这一点:
Flux.generate(downloaderQueueConsumer)
.doFirst(this::initializeProcessing)
.flatMap(this::evaluateDocumentLocation)
.groupBy(this::parseDocumentDomain, 100000)
.flatMap(documentSourceItem1 -> documentSourceItem1
.delayElements(Duration.ofSeconds(1))
.doOnNext(this::incrementProcessedCount)
.flatMap(this::downloadDocument)
.flatMap(this::archiveDocument)
.doOnNext(this::incrementArchivedCount)
)
.doFinally(this::finishProcessing)
.subscribe();
我对这段代码的问题是它没有将一个域的并行请求数限制为一个。有办法实现吗?
如果您想这样做,您可能需要在 Flux 外部维护某种状态 - 没有明显的方法来存储和更改 Flux 本身内的此类可变数据。
话虽这么说,这不是我推荐的速率限制方法 - 我已经做了类似于以下的事情,这是一个更好、更稳健的解决方案:
- 将 429 状态代码映射到 "rate limit" 异常(您可能需要自己定义此异常类型)
- 引入 reactor-extra,然后使用
Retry
使用带抖动的指数退避(或您喜欢的任何退避策略)。
这将使您能够更好地控制特定的重试策略,并可能使您的代码更具可读性。