Python AsyncIO 阻塞
Python AsyncIO blocking
我有一个基本脚本,可以使用 SQL Alchemy 将英国邮政编码导入数据库。为了提高效率,我尝试按照文档和一些 "guide" 博客文章使用 AsyncIO 来做到这一点。
下面的工作正常(没有抛出异常,正确地导入到数据库中)但是它看起来是同步的 - 当我期望所有三个文件和行时,文件和相应的行都是有序的。我不明白为什么。如何修复,以便导入给定 CSV 中的每一行都不会阻止下一行的导入?
import csv
import os
import asyncio
from db.db import Session, engine, Base
from Models.PostCode import PostCode
from sqlalchemy.exc import IntegrityError
Base.metadata.create_all(engine)
session = Session()
csv_path = os.path.dirname(os.path.realpath(__file__)) + '/postcode_lists/'
def runImport(fname):
with open(csv_path + fname + '_postcodes.csv', newline='') as csvfile:
reader = csv.DictReader(csvfile)
tasks = [asyncio.ensure_future(saveRow(row)) for row in reader]
loop = asyncio.get_event_loop()
responses = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
loop.close()
return responses
async def saveRow(row):
if ('In Use?' not in row) or (row['In Use?']=='Yes'):
await persist(row)
def persist(row):
EXISTS = len(session.query(PostCode).filter(PostCode.postcode == row['Postcode']).all())
if EXISTS == 0:
pc = PostCode(
row['Postcode'],
)
session.add(pc)
session.commit()
print(pc)
return pc
datasets = ['CM', 'CO']
for d in datasets:
runImport(d)
print(Done)
输出示例
<PostCode(postcode='CA7 5HU')>
<PostCode(postcode='CA7 5HW')>
<PostCode(postcode='CA7 5HX')>
<PostCode(postcode='CA7 5HY')>
<PostCode(postcode='CA7 5HZ')>
<PostCode(postcode='CA7 5JB')>
我希望输出有点混乱,而不是按照 CSV 的 alpha 顺序。
基本上你的问题是你的代码中没有真正的异步操作。
Async 实际上是作为一个基于回调的事件循环工作的。您的异步操作将导致切换,这意味着当前任务挂起,事件循环将切换到另一个任务。
但是由于你所有的任务都是完全阻塞的,所以你的none的任务会导致挂断和切换。这意味着您的代码片段与有序任务队列完全相同。
我有一个基本脚本,可以使用 SQL Alchemy 将英国邮政编码导入数据库。为了提高效率,我尝试按照文档和一些 "guide" 博客文章使用 AsyncIO 来做到这一点。
下面的工作正常(没有抛出异常,正确地导入到数据库中)但是它看起来是同步的 - 当我期望所有三个文件和行时,文件和相应的行都是有序的。我不明白为什么。如何修复,以便导入给定 CSV 中的每一行都不会阻止下一行的导入?
import csv
import os
import asyncio
from db.db import Session, engine, Base
from Models.PostCode import PostCode
from sqlalchemy.exc import IntegrityError
Base.metadata.create_all(engine)
session = Session()
csv_path = os.path.dirname(os.path.realpath(__file__)) + '/postcode_lists/'
def runImport(fname):
with open(csv_path + fname + '_postcodes.csv', newline='') as csvfile:
reader = csv.DictReader(csvfile)
tasks = [asyncio.ensure_future(saveRow(row)) for row in reader]
loop = asyncio.get_event_loop()
responses = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
loop.close()
return responses
async def saveRow(row):
if ('In Use?' not in row) or (row['In Use?']=='Yes'):
await persist(row)
def persist(row):
EXISTS = len(session.query(PostCode).filter(PostCode.postcode == row['Postcode']).all())
if EXISTS == 0:
pc = PostCode(
row['Postcode'],
)
session.add(pc)
session.commit()
print(pc)
return pc
datasets = ['CM', 'CO']
for d in datasets:
runImport(d)
print(Done)
输出示例
<PostCode(postcode='CA7 5HU')>
<PostCode(postcode='CA7 5HW')>
<PostCode(postcode='CA7 5HX')>
<PostCode(postcode='CA7 5HY')>
<PostCode(postcode='CA7 5HZ')>
<PostCode(postcode='CA7 5JB')>
我希望输出有点混乱,而不是按照 CSV 的 alpha 顺序。
基本上你的问题是你的代码中没有真正的异步操作。
Async 实际上是作为一个基于回调的事件循环工作的。您的异步操作将导致切换,这意味着当前任务挂起,事件循环将切换到另一个任务。
但是由于你所有的任务都是完全阻塞的,所以你的none的任务会导致挂断和切换。这意味着您的代码片段与有序任务队列完全相同。