用它自己的结果喂芹菜队列
Feeding celery queue with it's own result
我正在为我的 SPA 应用程序编写爬虫程序。由于它是一个 SPA,我不能使用 wget/curl 或任何其他基于非浏览器的解决方案进行抓取,因为我需要一个浏览器才能 运行 我的 SPA 中的 javascript 。
我使用 python 和 selenium 对此进行了编码。它将从主页开始,扫描所有 href
元素,将它们保存在 set
中,丢弃我已经访问过的元素(visited
与 opened with selenium and collected all the href elements
中一样),并从集合中取出下一个 URL 并访问它。然后它会一遍又一遍地重复这个过程,直到它访问了所有链接。
代码如下所示:
def main():
...
# Here we will be saving all the links that we can find in the DOM of
# each visited URL
collected = set()
collected.add(crawler.start_url)
# Here we will be saving all the URLs that we have already visited
visited = set()
base_netloc = urlparse(crawler.start_url).netloc
while len(collected):
url = collected.pop()
urls = self.collect_urls(url)
urls = [x for x in urls if x not in visited and urlparse(x).netloc == base_netloc]
collected = collected.union(urls)
visited.add(url)
crawler.links = list(visited)
crawler.save()
def collect_urls(self, url):
browser = Browser()
browser.fetch(url)
urls = set()
elements = browser.get_xpath_elements("//a[@href]")
for element in elements:
link = browser.get_element_attribute(element, "href")
if link != url:
urls.add(link)
browser.stop()
return urls
我想让每次调用 collect_urls
都成为一个 Celery 任务,这样它可以在失败时重试,并使整个过程更快(使用多个 worker)。问题是 collect_urls
是从 while
内部调用的,这取决于 collected
集,该集由 collect_urls
的结果填充。
我知道我可以使用 delay()
调用 Celery 任务并使用 get()
等待结果,所以我的代码将如下所示:
while len(collected):
url = collected.pop()
task = self.collect_urls.delay(url)
urls = task.get(timeout=30)
这会将我对 collect_urls
的调用转换为 Celery 任务,如果出现问题,它将允许我重试,但我仍然无法使用多个 worker,因为我需要等待 delay()
的结果。
如何重构我的代码,使我可以为 collect_urls
使用多个 worker?
简短的回答,如果你想为了速度而分发它,你必须把已经访问过的网站的集合变成一个跨进程安全的结构。例如,您可以将其作为一个集合存储在 Redis 或数据库中 table。完成后,您可以更新代码以执行以下操作:
# kick off initial set of tasks:
result_id = uuid.uuid4()
for x in collected:
task = self.collect_urls.delay(x, result_id)
return result_id
您可以使用 result_id 定期检查已访问的 url 集合。一旦该集合具有相同长度的 n
次调用,您就认为它已经完成。
在 collect_urls 函数中,您基本上是这样做的:
def collect_urls(self, url, result_id):
# for example, you can use redis smember to check if the
# set at result_id contains url
if url has been visited:
return
# you can do this in redis using sadd
add url to the set of visited
# collect urls as before
...
# but instead of returning the urls, you kick off new tasks
for x in urls:
collect_urls.delay(x, result_id)
如果您使用 redis,所有收集/访问的 urls 将包含在由 result_id 标识的 redis 键中。您不必使用 redis,您可以轻松地使用数据库中的行来执行此操作,其中 result_id 作为一列,而 url 在另一列中。
我正在为我的 SPA 应用程序编写爬虫程序。由于它是一个 SPA,我不能使用 wget/curl 或任何其他基于非浏览器的解决方案进行抓取,因为我需要一个浏览器才能 运行 我的 SPA 中的 javascript 。
我使用 python 和 selenium 对此进行了编码。它将从主页开始,扫描所有 href
元素,将它们保存在 set
中,丢弃我已经访问过的元素(visited
与 opened with selenium and collected all the href elements
中一样),并从集合中取出下一个 URL 并访问它。然后它会一遍又一遍地重复这个过程,直到它访问了所有链接。
代码如下所示:
def main():
...
# Here we will be saving all the links that we can find in the DOM of
# each visited URL
collected = set()
collected.add(crawler.start_url)
# Here we will be saving all the URLs that we have already visited
visited = set()
base_netloc = urlparse(crawler.start_url).netloc
while len(collected):
url = collected.pop()
urls = self.collect_urls(url)
urls = [x for x in urls if x not in visited and urlparse(x).netloc == base_netloc]
collected = collected.union(urls)
visited.add(url)
crawler.links = list(visited)
crawler.save()
def collect_urls(self, url):
browser = Browser()
browser.fetch(url)
urls = set()
elements = browser.get_xpath_elements("//a[@href]")
for element in elements:
link = browser.get_element_attribute(element, "href")
if link != url:
urls.add(link)
browser.stop()
return urls
我想让每次调用 collect_urls
都成为一个 Celery 任务,这样它可以在失败时重试,并使整个过程更快(使用多个 worker)。问题是 collect_urls
是从 while
内部调用的,这取决于 collected
集,该集由 collect_urls
的结果填充。
我知道我可以使用 delay()
调用 Celery 任务并使用 get()
等待结果,所以我的代码将如下所示:
while len(collected):
url = collected.pop()
task = self.collect_urls.delay(url)
urls = task.get(timeout=30)
这会将我对 collect_urls
的调用转换为 Celery 任务,如果出现问题,它将允许我重试,但我仍然无法使用多个 worker,因为我需要等待 delay()
的结果。
如何重构我的代码,使我可以为 collect_urls
使用多个 worker?
简短的回答,如果你想为了速度而分发它,你必须把已经访问过的网站的集合变成一个跨进程安全的结构。例如,您可以将其作为一个集合存储在 Redis 或数据库中 table。完成后,您可以更新代码以执行以下操作:
# kick off initial set of tasks:
result_id = uuid.uuid4()
for x in collected:
task = self.collect_urls.delay(x, result_id)
return result_id
您可以使用 result_id 定期检查已访问的 url 集合。一旦该集合具有相同长度的 n
次调用,您就认为它已经完成。
在 collect_urls 函数中,您基本上是这样做的:
def collect_urls(self, url, result_id):
# for example, you can use redis smember to check if the
# set at result_id contains url
if url has been visited:
return
# you can do this in redis using sadd
add url to the set of visited
# collect urls as before
...
# but instead of returning the urls, you kick off new tasks
for x in urls:
collect_urls.delay(x, result_id)
如果您使用 redis,所有收集/访问的 urls 将包含在由 result_id 标识的 redis 键中。您不必使用 redis,您可以轻松地使用数据库中的行来执行此操作,其中 result_id 作为一列,而 url 在另一列中。