How to save data to MongoDB by pipeline when using multiple spiders in Scrapy?

I use two spiders to get data from web pages, and I run them at the same time with CrawlerProcess(). The spider code:

from scrapy import Spider, Request
from scrapy.selector import Selector
from scrapy.crawler import CrawlerProcess
# GenDis and EPGD are this project's Item classes, imported from the project's items module


class GDSpider(Spider):
    name = "GenDis"
    allowed_domains = ["gold.jgi.doe.gov"]
    base_url = "https://gold.jgi.doe.gov/projects"
    stmp = []
    term = "man"
    for i in range(1, 1000):
        url = "https://gold.jgi.doe.gov/projects?page=" + str(i) + "&Project.Project+Name=" + term + "&count=25"
        stmp.append(url)

    start_urls = stmp

    def parse(self, response):
        sel = Selector(response)
        sites = sel.xpath('//tr[@class="odd"]|//tr[@class="even"]')

        for site in sites:
            item = GenDis()
            item['Id'] = site.xpath('td/a/text()').extract()
            item['Link'] = site.xpath('td/a/@href').extract()
            item['Name'] = map(unicode.strip, site.xpath('td[2]/text()').extract())
            item['Status'] = map(unicode.strip, site.xpath('td[3]/text()').extract())
            item['Add_Date'] = map(unicode.strip, site.xpath('td[4]/text()').extract())
            yield item



class EPGD_spider(Spider):
    name = "EPGD"
    allowed_domains = ["epgd.biosino.org"]
    term = "man"
    start_urls = ["http://epgd.biosino.org/EPGD/search/textsearch.jsp?textquery="+term+"&submit=Feeling+Lucky"]
    MONGODB_DB = name + "_" + term
    MONGODB_COLLECTION = name + "_" + term

    def parse(self, response):
        sel = Selector(response)
        sites = sel.xpath('//tr[@class="odd"]|//tr[@class="even"]')
        url_list = []
        base_url = "http://epgd.biosino.org/EPGD"

        for site in sites:
            item = EPGD()
            item['genID'] = map(unicode.strip, site.xpath('td[1]/a/text()').extract())
            item['genID_url'] = base_url + map(unicode.strip, site.xpath('td[1]/a/@href').extract())[0][2:]
            item['taxID'] = map(unicode.strip, site.xpath('td[2]/a/text()').extract())
            item['taxID_url'] = map(unicode.strip, site.xpath('td[2]/a/@href').extract())
            item['familyID'] = map(unicode.strip, site.xpath('td[3]/a/text()').extract())
            item['familyID_url'] = base_url + map(unicode.strip, site.xpath('td[3]/a/@href').extract())[0][2:]
            item['chromosome'] = map(unicode.strip, site.xpath('td[4]/text()').extract())
            item['symbol'] = map(unicode.strip, site.xpath('td[5]/text()').extract())
            item['description'] = map(unicode.strip, site.xpath('td[6]/text()').extract())
            yield item

        sel_tmp = Selector(response)
        link = sel_tmp.xpath('//span[@id="quickPage"]')

        for site in link:
            url_list.append(site.xpath('a/@href').extract())

        for i in range(len(url_list[0])):
            if cmp(url_list[0][i], "#") == 0:
                if i + 1 < len(url_list[0]):
                    print url_list[0][i+1]
                    actual_url = "http://epgd.biosino.org/EPGD/search/" + url_list[0][i+1]
                    yield Request(actual_url, callback=self.parse)
                    break
                else:
                    print "The index is out of range!"

process = CrawlerProcess()
process.crawl(EPGD_spider)
process.crawl(GDSpider)
process.start() # the script will block here until all crawling jobs are finished

I want to save the data to a MongoDB database. Here is my pipeline code:

import pymongo

from scrapy import log
from scrapy.conf import settings
from scrapy.exceptions import DropItem


class EPGD_pipeline(object):
    def __init__(self):
        connection = pymongo.MongoClient(
            settings['MONGODB_SERVER'],
            settings['MONGODB_PORT']
        )
        db = connection[settings['MONGODB_DB']]
        self.collection = db[settings['MONGODB_COLLECTION']]

    def process_item(self, item, spider):
        valid = True
        for data in item:
            if not data:
                valid = False
                raise DropItem("Missing {0}!".format(data))
        if valid:
            self.collection.insert(dict(item))
            log.msg("Item wrote to MongoDB database {}, collection {}, at host {}, port {}".format(
            settings['MONGODB_DB'],
            settings['MONGODB_COLLECTION'],
            settings['MONGODB_SERVER'],
            settings['MONGODB_PORT']))
        return item
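
The pipeline reads its connection details from the project settings; the corresponding keys in settings.py look roughly like this (the values and the pipeline's module path are placeholders):

MONGODB_SERVER = "localhost"
MONGODB_PORT = 27017
MONGODB_DB = "EPGD_man"
MONGODB_COLLECTION = "EPGD_man"

# module path is a placeholder for this project's pipelines module
ITEM_PIPELINES = {'myproject.pipelines.EPGD_pipeline': 100}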

It works fine when I use one spider at a time. But when I run them together, the pipeline no longer seems to work: neither the database nor the collections get created. I have read the CrawlerProcess() section of the Scrapy documentation many times, but it says nothing about pipelines. Can anyone tell me what is wrong with my code?

This should do the trick. CrawlerProcess() created with no arguments does not load your project settings, so ITEM_PIPELINES (and with it your pipeline) is never enabled; pass the project settings in explicitly:

from scrapy.utils.project import get_project_settings
process = CrawlerProcess(get_project_settings())
process.crawl(EPGD_spider)
process.crawl(GDSpider)
process.start()

You may also want to refactor your pipeline code so that it opens one connection/collection per spider (this example uses "Bonus Tip 2" below):

# In your pipeline

class EPGD_pipeline(object):
    def __init__(self):
        self.collections = {
            spider_name: self.setup_db_connection(dj_mongo_database_url.parse(url))
            for spider_name, url in settings['MONGODB_PIPELINE_SETTINGS'].items()
        }

    def process_item(self, item, spider):
        collection = self.collections[spider.name]
        ...


# In settings.py

MONGODB_PIPELINE_SETTINGS = {
    "GenDis": "mongodb://myhost:29297/test_db/collection",
    "EPGD": "mongodb://myhost:29297/test_db/collection2",
}
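
The setup_db_connection() helper is not shown above. Purely as an illustration (not the answer's actual helper), here is a minimal sketch of a complete pipeline along those lines; it skips the dj_mongo_database_url dependency and instead splits the collection name off the end of the URL itself:

# Hypothetical sketch -- not the answer's actual helper. It parses the
# "mongodb://host:port/db/collection" URLs from MONGODB_PIPELINE_SETTINGS
# itself instead of going through dj_mongo_database_url.
import pymongo
from scrapy.conf import settings

class EPGD_pipeline(object):
    def __init__(self):
        self.collections = {
            spider_name: self.setup_db_connection(url)
            for spider_name, url in settings['MONGODB_PIPELINE_SETTINGS'].items()
        }

    def setup_db_connection(self, url):
        # "mongodb://myhost:29297/test_db/collection" ->
        # base "mongodb://myhost:29297/test_db" and collection "collection"
        base_uri, collection_name = url.rsplit('/', 1)
        client = pymongo.MongoClient(base_uri)
        return client.get_default_database()[collection_name]

    def process_item(self, item, spider):
        # route each item to the collection configured for the spider that produced it
        self.collections[spider.name].insert(dict(item))
        return item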

Bonus Tip 1: Use txmongo instead of pymongo, otherwise you will likely get very bad performance, since pymongo's calls block Scrapy's reactor.
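
For illustration only, a non-blocking pipeline built on txmongo might look roughly like this. It is an untested sketch: the URI, the test_db database name and the per-spider collection naming are placeholders, and the calls follow txmongo's pymongo-like API as shown in its README. Scrapy accepts Deferreds returned from process_item (and from open_spider), so database writes never block the reactor:

# Untested sketch of a txmongo-based pipeline; URI and db name are placeholders.
from twisted.internet import defer
from txmongo.connection import ConnectionPool

class TxMongoPipeline(object):

    @defer.inlineCallbacks
    def open_spider(self, spider):
        # ConnectionPool takes a MongoDB URI; each spider writes to its own collection
        self.connection = yield ConnectionPool("mongodb://127.0.0.1:27017")
        self.collection = self.connection.test_db[spider.name]

    @defer.inlineCallbacks
    def close_spider(self, spider):
        yield self.connection.disconnect()

    @defer.inlineCallbacks
    def process_item(self, item, spider):
        # insert_one returns a Deferred instead of blocking the reactor
        yield self.collection.insert_one(dict(item))
        defer.returnValue(item)

It would be enabled through ITEM_PIPELINES like any other pipeline.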

Bonus Tip 2: All those settings become difficult to manage. Consider using something like django-mongo-database-url to "pack" them all into a single URL and keep them more manageable (it would be even cleaner if the collection were also in the URL).

Bonus Tip 3: You are probably doing too many writes/transactions. If your use case allows it, save the results to .jl file(s) and use mongoimport to bulk-import them once crawling finishes. Here is how to do that in more detail.

Assuming a project named tutorial and a spider named example that creates 100 items, you create an extension in tutorial/extensions.py:

import logging
import subprocess

from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)


class MyBulkExtension(object):

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def __init__(self, crawler):
        settings = crawler.settings

        self._feed_uri = settings.get('FEED_URI', None)
        if self._feed_uri is None:
            raise NotConfigured('Missing FEED_URI')
        self._db = settings.get('BULK_MONGO_DB', None)
        if self._db is None:
            raise NotConfigured('Missing BULK_MONGO_DB')
        self._collection = settings.get('BULK_MONGO_COLLECTION', None)
        if self._collection is None:
            raise NotConfigured('Missing BULK_MONGO_COLLECTION')

        crawler.signals.connect(self._closed, signal=signals.spider_closed)

    def _closed(self, spider, reason, signal, sender):
        logger.info("writting file %s to db %s, colleciton %s" %
                    (self._feed_uri, self._db, self._collection))
        command = ("mongoimport --db %s --collection %s --drop --file %s" %
                   (self._db, self._collection, self._feed_uri))

        p = subprocess.Popen(command.split())
        p.communicate()

        logger.info('Import done')

In your tutorial/settings.py, you enable the extension and set the two settings:

EXTENSIONS = {
    'tutorial.extensions.MyBulkExtension': 500
}

BULK_MONGO_DB = "test"
BULK_MONGO_COLLECTION = "foobar"

Then you can run your crawl like this:

$ scrapy crawl -L INFO example -o foobar.jl
...
[tutorial.extensions] INFO: writing file foobar.jl to db test, collection foobar
connected to: 127.0.0.1
dropping: test.foobar
check 9 100
imported 100 objects
[tutorial.extensions] INFO: Import done
...