SQLAlchemy: switching to Python multiprocessing

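I'm currently working on a web crawler. It works fine, but I want to make the most of my resources, so I'm trying to switch to multiprocessing. As soon as I run it, though, I hit a wall of tracebacks, and I can't figure out what I'm doing wrong, since I'm still new to both SQLAlchemy and Python multiprocessing.

This is what the parent loop looks like: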

...
def crawler(url=False):
    ...
    while url:
        crawl(url.id)
        url = get_new_url()

I'm trying to turn it into a parallel function so that I don't have to wait for the previous crawl/scrape to finish:

from multiprocessing import Process
...
def crawler(url=False):
    while url:
        p = Process(target=crawl, args=(url.id,))
        p.start()
        url = get_new_url()

This is how I set up the database connection:

engine = create_engine('mysql://user:password@domain:3306/mdb01?charset=utf8mb4', pool_recycle=3600)
Session = sessionmaker(bind=engine, autoflush=True)

Base = declarative_base()

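This is the module that performs the crawler's database interactions and imports the database factory (I removed most of it, because I think the problem lies in how I interact with SQLAlchemy rather than in the rest of the code):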

from news_models.base import Base, Session, engine
database = Session()

def crawl(urlid):
    url = database.query(Url).filter_by(id=urlid).first()

    print(f"Starting to work on {url.id}: {url.url}")

    # ... scrape page ...
    scrape = scrape_url(url)

    # ... run Beautiful Soup ...

    # Retrieve all of the anchor tags
    tags = soup('a')

    for tag in tags:
        # ... validation ...
        make_url(url)

def make_url(url):
    ...
    #domain = ex. abc.com
    domain = database.query(Domain).filter_by(domain=domain).first()
    database.add(Url(url, domain, vetted))
    database.commit()

def scrape_url(url):
    scrape = Scrape(page = html, url = url)
    database.add(scrape)
    database.commit()
    return scrape

The output is as follows:

Starting to work on 179226: https://bbc.co.uk/sport/football/53891604
Starting to work on 110232: https://theweathernetwork.com/ca/weather/saskatchewan/carragana
Starting to work on 152054: https://ca.images.search.yahoo.com/search/images?p=barack+obama&fr=fp-tts&th=110.1&tw=162.6&imgurl=https%3a%2f%2fimage.cnbcfm.com%2fapi%2fv1%2fimage%2f105055178-gettyimages-680143744rr.jpg%3fv%3d1576513702%26w%3d1400%26h%3d950&rurl=https%3a%2f%2fwww.cnbc.com%2f2019%2f12%2f16%2fbarack-obama-how-women-are-better-leaders-than-men.html&size=123kb&name=barack+obama%3a+how+women+are+better+leaders+than+men&oid=1&h=950&w=1400&turl=https%3a%2f%2ftse1.mm.bing.net%2fth%3fid%3doip.btjoweh9kdcuxxcdksvoiwhafb%26amp%3bpid%3dapi%26rs%3d1%26c%3d1%26qlt%3d95%26w%3d162%26h%3d110&tt=barack+obama%3a+how+women+are+better+leaders+than+men&sigr=4nejz_6_wyyo&sigit=.iypm9cqprc9&sigi=9sv3ee5szhdl&sign=eqzxpc3ps9fm&sigt=eqzxpc3ps9fm
Exception during reset or similar
Traceback (most recent call last):
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 321, in scrape_url
    database.add(scrape)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2008, in add
    self._save_or_update_state(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2021, in _save_or_update_state
    self._save_or_update_impl(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2371, in _save_or_update_impl
    self._save_impl(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2324, in _save_impl
    to_attach = self._before_attach(state, obj)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2441, in _before_attach
    raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object '<Scrape at 0x7f4f7e1975b0>' is already attached to session '3' (this is '2')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 697, in _finalize_fairy
    fairy._reset(pool)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 893, in _reset
    pool._dialect.do_rollback(self)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
    dbapi_connection.rollback()
MySQLdb._exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
Traceback (most recent call last):
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
Process Process-3:
Process Process-1:
    self.dialect.do_execute(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./crawler.py", line 138, in <module>
    main()
  File "./crawler.py", line 49, in main
    crawler(url=url)
  File "./crawler.py", line 135, in crawler
    url = get_new_url()
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 482, in get_new_url
    url = database.query(Url).filter_by(scrape=None, error=False).order_by(sqlalchemy.func.rand()).first()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3402, in first
    ret = list(self[0:1])
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3176, in __getitem__
Traceback (most recent call last):
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 321, in scrape_url
    database.add(scrape)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2008, in add
    self._save_or_update_state(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2021, in _save_or_update_state
    self._save_or_update_impl(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2371, in _save_or_update_impl
    self._save_impl(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2324, in _save_impl
    to_attach = self._before_attach(state, obj)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2441, in _before_attach
    raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object '<Scrape at 0x7f4f7e1e3790>' is already attached to session '3' (this is '2')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
    dbapi_connection.rollback()
MySQLdb._exceptions.OperationalError: (2013, 'Lost connection to MySQL server during query')
    return list(res)

The above exception was the direct cause of the following exception:

  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3508, in __iter__
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 62, in crawl
    soup = scrape_and_soup(url)
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 331, in scrape_and_soup
    scrape = scrape_url(url)
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 325, in scrape_url
    database.rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1006, in rollback
    self.transaction.rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 574, in rollback
    util.raise_(rollback_err[1], with_traceback=rollback_err[2])
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 534, in rollback
    t[1].rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1753, in rollback
    self._do_rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1791, in _do_rollback
    self.connection._rollback_impl()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 751, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
    dbapi_connection.rollback()
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query')
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Traceback (most recent call last):
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 321, in scrape_url
    database.add(scrape)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2008, in add
    self._save_or_update_state(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2021, in _save_or_update_state
    self._save_or_update_impl(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2371, in _save_or_update_impl
    self._save_impl(state)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2324, in _save_impl
    to_attach = self._before_attach(state, obj)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2441, in _before_attach
    raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object '<Scrape at 0x7f4f7e1e3a60>' is already attached to session '3' (this is '2')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
    dbapi_connection.rollback()
MySQLdb._exceptions.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 62, in crawl
    soup = scrape_and_soup(url)
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 331, in scrape_and_soup
    scrape = scrape_url(url)
  File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 325, in scrape_url
    database.rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1006, in rollback
    self.transaction.rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 574, in rollback
    util.raise_(rollback_err[1], with_traceback=rollback_err[2])
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 534, in rollback
    t[1].rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1753, in rollback
    self._do_rollback()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1791, in _do_rollback
    self.connection._rollback_impl()
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 751, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
    dbapi_connection.rollback()
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query')
(Background on this error at: http://sqlalche.me/e/13/e3q8)
    return self._execute_and_instances(context)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3533, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: SELECT urls.id AS urls_id, urls.url AS urls_url, urls.error AS urls_error, urls.vetted AS urls_vetted, urls.useful AS urls_useful, urls.date_discovered AS urls_date_discovered, urls.last_parse AS urls_last_parse, urls.domain_id AS urls_domain_id, urls.publisher_id AS urls_publisher_id 
FROM urls 
WHERE NOT (EXISTS (SELECT 1 
FROM scrapes 
WHERE urls.id = scrapes.url_id)) AND urls.error = false ORDER BY rand() 
 LIMIT %s]
[parameters: (1,)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

I tried adding pool_size=20 and max_overflow=0 to create_engine, and setting autoflush=True/False, but without success.

Can someone point out what I'm doing wrong?

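The solution is to create a new database session in each process, at the start of the crawl function, and then pass it to make_url and scrape_url, either as a separate argument or by making all of them methods of one object. As posted, database = Session() runs once at import time, so every forked process most likely ends up sharing the parent's session and its underlying MySQL connection, which would explain the "Commands out of sync" and "Lost connection" errors. Use a with closing(...) statement to make sure the session is closed when crawl finishes.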
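A minimal sketch of that structure, under some assumptions: in the real crawler, session_factory would be the Session sessionmaker from the question. StubSession is a hypothetical stand-in that only mimics add/commit/close so the sketch runs without a database, and the two-argument scrape_url signature is illustrative, not the original one.

```python
from contextlib import closing


class StubSession:
    """Hypothetical stand-in for a SQLAlchemy session (add/commit/close only)."""

    def __init__(self):
        self.pending = []
        self.committed = []
        self.closed = False

    def add(self, obj):
        self.pending.append(obj)

    def commit(self):
        self.committed.extend(self.pending)
        self.pending.clear()

    def close(self):
        self.closed = True


def scrape_url(database, url):
    # The session is passed in explicitly instead of being a module global.
    scrape = {"url": url, "page": "<html>...</html>"}  # placeholder for Scrape(...)
    database.add(scrape)
    database.commit()
    return scrape


def crawl(urlid, session_factory=StubSession):
    # Assumption: the real code would pass the Session sessionmaker here.
    # The session is created inside crawl, so each process gets its own.
    with closing(session_factory()) as database:
        scrape_url(database, f"https://example.com/{urlid}")
        return database  # returned only so the demo below can inspect it


session = crawl(1)
print(session.closed, len(session.committed))
```

closing(...) calls close() on the session when the with block exits, including when crawl raises, so a failed scrape does not leave a session open.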

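Your code has another problem: the while url loop also needs to wait for all the crawlers to finish, in case one of them discovers additional URLs that need to be crawled.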
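Waiting for the crawlers comes down to calling join() on every process that was started. The sketch below shows that for one batch of URL ids; crawl is a placeholder for the real function, and run_batch is a hypothetical helper name, not something from the original code.

```python
from multiprocessing import Process


def crawl(urlid):
    # Placeholder for the real crawl(); it would open its own session here.
    print(f"Starting to work on {urlid}")


def run_batch(urlids):
    """Start one crawler process per URL id and wait for all of them."""
    processes = [Process(target=crawl, args=(urlid,)) for urlid in urlids]
    for p in processes:
        p.start()
    for p in processes:
        p.join()  # blocks until that crawler has exited
    # Only at this point have all crawlers committed whatever URLs they found.
    return [p.exitcode for p in processes]


if __name__ == "__main__":
    print(run_batch([1, 2, 3]))
```

After run_batch returns, the parent loop can call get_new_url() again and stop only when that comes back empty with no crawler still running.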

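As a suggested improvement, you could use multiprocessing.Pool instead of using Process directly; this lets you control how many crawlers run in parallel, which you will probably want eventually (to avoid overloading the CPU, RAM, network, and/or database). At that point you can still use a separate database session for each crawl call, or one per pool worker.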
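A rough sketch of the Pool variant with one session per crawl call follows. As before, StubSession is a hypothetical stand-in for the question's Session sessionmaker so that the example runs without a database, and crawl_all and the workers parameter are illustrative names.

```python
from contextlib import closing
from multiprocessing import Pool


class StubSession:
    """Hypothetical stand-in for a SQLAlchemy session; only close() is mimicked."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def crawl(urlid, session_factory=StubSession):
    # One session per crawl call, created inside the worker process.
    with closing(session_factory()) as database:
        # ... query, scrape and commit through `database` here ...
        return urlid


def crawl_all(urlids, workers=4):
    # processes= caps how many crawlers run at the same time.
    with Pool(processes=workers) as pool:
        # map() blocks until every URL id has been processed.
        return pool.map(crawl, urlids)


if __name__ == "__main__":
    print(crawl_all([1, 2, 3, 4, 5], workers=2))
```

Because pool.map blocks until all results are in, it also covers the waiting problem described above for a given batch of URLs.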