用它自己的结果喂芹菜队列

Feeding celery queue with it's own result

我正在为我的 SPA 应用程序编写爬虫程序。由于它是一个 SPA,我不能使用 wget/curl 或任何其他基于非浏览器的解决方案进行抓取,因为我需要一个浏览器才能 运行 我的 SPA 中的 javascript 。

我使用 python 和 selenium 对此进行了编码。它将从主页开始,扫描所有 href 元素,将它们保存在 set 中,丢弃我已经访问过的元素(visitedopened with selenium and collected all the href elements 中一样),并从集合中取出下一个 URL 并访问它。然后它会一遍又一遍地重复这个过程,直到它访问了所有链接。

代码如下所示:

def main():

    ...

    # Here we will be saving all the links that we can find in the DOM of
    # each visited URL
    collected = set()
    collected.add(crawler.start_url)

    # Here we will be saving all the URLs that we have already visited
    visited = set()

    base_netloc = urlparse(crawler.start_url).netloc

    while len(collected):
        url = collected.pop()

        urls = self.collect_urls(url)
        urls = [x for x in urls if x not in visited and urlparse(x).netloc == base_netloc]

        collected = collected.union(urls)
        visited.add(url)

    crawler.links = list(visited)
    crawler.save()

def collect_urls(self, url):
    browser = Browser()
    browser.fetch(url)

    urls = set()
    elements = browser.get_xpath_elements("//a[@href]")
    for element in elements:
        link = browser.get_element_attribute(element, "href")

        if link != url:
            urls.add(link)

    browser.stop()
    return urls

我想让每次调用 collect_urls 都成为一个 Celery 任务,这样它可以在失败时重试,并使整个过程更快(使用多个 worker)。问题是 collect_urls 是从 while 内部调用的,这取决于 collected 集,该集由 collect_urls 的结果填充。

我知道我可以使用 delay() 调用 Celery 任务并使用 get() 等待结果,所以我的代码将如下所示:

    while len(collected):
        url = collected.pop()

        task = self.collect_urls.delay(url)
        urls = task.get(timeout=30)

这会将我对 collect_urls 的调用转换为 Celery 任务,如果出现问题,它将允许我重试,但我仍然无法使用多个 worker,因为我需要等待 delay() 的结果。

如何重构我的代码,使我可以为 collect_urls 使用多个 worker?

简短的回答,如果你想为了速度而分发它,你必须把已经访问过的网站的集合变成一个跨进程安全的结构。例如,您可以将其作为一个集合存储在 Redis 或数据库中 table。完成后,您可以更新代码以执行以下操作:

# kick off initial set of tasks:
result_id = uuid.uuid4()
for x in collected:
    task = self.collect_urls.delay(x, result_id)
return result_id

您可以使用 result_id 定期检查已访问的 url 集合。一旦该集合具有相同长度的 n 次调用,您就认为它已经完成。

在 collect_urls 函数中,您基本上是这样做的:

def collect_urls(self, url, result_id):
    # for example, you can use redis smember to check if the 
    # set at result_id contains url
    if url has been visited:
        return
    # you can do this in redis using sadd
    add url to the set of visited
    # collect urls as before
    ...
    # but instead of returning the urls, you kick off new tasks
    for x in urls:
        collect_urls.delay(x, result_id)

如果您使用 redis,所有收集/访问的 urls 将包含在由 result_id 标识的 redis 键中。您不必使用 redis,您可以轻松地使用数据库中的行来执行此操作,其中 result_id 作为一列,而 url 在另一列中。