Python AsyncIO 阻塞

Python AsyncIO blocking

我有一个基本脚本,可以使用 SQL Alchemy 将英国邮政编码导入数据库。为了提高效率,我尝试按照文档和一些 "guide" 博客文章使用 AsyncIO 来做到这一点。

下面的工作正常(没有抛出异常,正确地导入到数据库中)但是它看起来是同步的 - 当我期望所有三个文件和行时,文件和相应的行都是有序的。我不明白为什么。如何修复,以便导入给定 CSV 中的每一行都不会阻止下一行的导入?

import csv
import os
import asyncio
from db.db import Session, engine, Base
from Models.PostCode import PostCode
from sqlalchemy.exc import IntegrityError

Base.metadata.create_all(engine)

session = Session()
csv_path = os.path.dirname(os.path.realpath(__file__)) + '/postcode_lists/'

def runImport(fname):
    with open(csv_path + fname + '_postcodes.csv', newline='') as csvfile:
        reader = csv.DictReader(csvfile)

        tasks = [asyncio.ensure_future(saveRow(row)) for row in reader]

        loop = asyncio.get_event_loop()

        responses = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()

        return responses


async def saveRow(row):
    if ('In Use?' not in row) or (row['In Use?']=='Yes'):
        await persist(row)

def persist(row):
    EXISTS = len(session.query(PostCode).filter(PostCode.postcode == row['Postcode']).all())
    if EXISTS == 0:
        pc = PostCode(
            row['Postcode'],
            )

        session.add(pc)
        session.commit()
        print(pc)
        return pc


datasets = ['CM', 'CO']
for d in datasets:
    runImport(d)

print(Done)

输出示例

<PostCode(postcode='CA7 5HU')>
<PostCode(postcode='CA7 5HW')>
<PostCode(postcode='CA7 5HX')>
<PostCode(postcode='CA7 5HY')>
<PostCode(postcode='CA7 5HZ')>
<PostCode(postcode='CA7 5JB')>

我希望输出有点混乱,而不是按照 CSV 的 alpha 顺序。

基本上你的问题是你的代码中没有真正的异步操作。

Async 实际上是作为一个基于回调的事件循环工作的。您的异步操作将导致切换,这意味着当前任务挂起,事件循环将切换到另一个任务。

但是由于你所有的任务都是完全阻塞的,所以你的none的任务会导致挂断和切换。这意味着您的代码片段与有序任务队列完全相同。