Python 多进程(multiprocessing)处理 For 循环
Python Multiprocessing a For Loop
我的脚本无法正常工作。全部 10 个进程拿到的都是列表中的第一项,然后就停止了,输出结果是 10 条相同的(第一条)列表记录。
该如何解决这个问题?错误是出在循环里,还是说我需要使用队列(Queue)?
import finanzen_fundamentals.stocks as ff
import mysql.connector
import pandas as pd
import multiprocessing
import time
results = []
def get_list():
    """Fetch every row of the ``url_name`` table from the local MySQL database.

    Returns:
        The rows produced by ``cursor.fetchall()`` on success. If anything
        raises along the way, the exception message is returned as a string
        instead (callers that iterate the result should be aware of this).
    """
    mydb = None
    try:
        mydb = mysql.connector.connect(
            host="localhost",
            user="changed",
            password="changed",
            database="stockdata",
        )
        mycursor = mydb.cursor()
        try:
            mycursor.execute("select * from url_name")
            return mycursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            mycursor.close()
    except Exception as e:
        # Preserve the existing contract: report the failure as text.
        return str(e)
    finally:
        # Always hand the connection back; it was previously left open.
        if mydb is not None:
            mydb.close()
def create_json(record):
    """Look up the current FSE quote for rows of ``record``.

    Rows are tried in order. For each row the lookup key is ``str(row[2])``
    with its last character dropped, and the display name is ``row[0]``.
    The function returns as soon as one row succeeds, so only the first
    successfully fetched row is ever reported.

    Returns:
        ``[[name, wkn, price, currency, time]]`` for the first row whose
        lookup succeeds, or ``None`` (implicitly) if every row raised.
        Each failure's message is printed and the next row is tried.
    """
    for entry in record:
        try:
            quote = ff.get_current_value_lxml(str(entry[2])[:-1], exchange = "FSE")

            def first(column):
                # First value of the requested column in the lookup result.
                return quote[column].values[0]

            print(
                'Name:' + entry[0]
                + ' WKN:' + first('wkn')
                + ' Preis:' + str(first('price'))
                + ' Currency:' + first('currency')
                + ' Zeit:' + first('time')
            )
            return [[entry[0], first('wkn'), first('price'), first('currency'), first('time')]]
        except Exception as err:
            print(str(err))
def collect_results(result):
    """Pool callback: add a worker's rows to the module-level ``results`` list.

    Args:
        result: The value returned by the worker, i.e. a list of rows, or
            ``None`` when the worker caught an exception and returned
            nothing.
    """
    # A failed worker yields None; extending with None would raise TypeError
    # inside the callback, so skip it.
    if result is None:
        return
    results.extend(result)
if __name__ == '__main__':
    # Load all rows once in the parent process.
    record = get_list()
    start_time = time.time()

    # One worker process per CPU reported by multiprocessing.
    n_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=n_workers)

    # Ten tasks are submitted, and each one receives the complete record list.
    for i in range(10):
        pool.apply_async(create_json, (record, ), callback=collect_results)

    # Stop accepting tasks, then wait for the submitted ones to finish.
    pool.close()
    pool.join()

    column_names = ['Name', 'WKN', 'Preis', 'Currency', 'Zeit']
    df_out = pd.DataFrame(results, columns=column_names)
    print(df_out)
输出:
Name WKN Preis Currency Zeit
0 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
1 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
2 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
3 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
4 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
5 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
6 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
7 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
8 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
9 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
你的循环结构写错了。在 create_json 中,你遍历的是 record 里的每一行 row,但每次调用时传入的都是同一个完整的 record 列表,并且在第一次迭代时就执行了 return。因此,所有工作进程处理的始终都是第一行。你需要把 worker 函数改为只处理单个 row:
def create_json(row):
    """Look up the current FSE quote for a single row.

    The lookup key is ``str(row[2])`` with its last character dropped, and
    the display name is ``row[0]``. A one-line summary is printed on success.

    Returns:
        A one-row 2-D list ``[[name, wkn, price, currency, time]]`` (suited
        to ``list.extend`` in a callback), or ``None`` (implicitly) when the
        lookup raised. In that case the exception message is printed.
    """
    try:
        quote = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")

        def first(column):
            # First value of the requested column in the lookup result.
            return quote[column].values[0]

        print(
            'Name:' + row[0]
            + ' WKN:' + first('wkn')
            + ' Preis:' + str(first('price'))
            + ' Currency:' + first('currency')
            + ' Zeit:' + first('time')
        )
        return [[row[0], first('wkn'), first('price'), first('currency'), first('time')]]
    except Exception as err:
        print(str(err))
然后在主代码中用每一行调用它:
if __name__ == '__main__':
    ...  # code before the loop is left out of this snippet
    # Submit one task per row so every call works on a different entry.
    for row in record:
        pool.apply_async(create_json, (row, ), callback=collect_results)
    ...  # code after the loop is left out of this snippet
请注意,在这种情况下,你不必在循环中反复调用 apply_async,直接使用 map 即可。map 会直接返回结果列表,所以你连 callback 都不需要了,例如:
def create_json(row):
    """Look up the current FSE quote for a single row (for use with ``map``).

    The lookup key is ``str(row[2])`` with its last character dropped, and
    the display name is ``row[0]``. A one-line summary is printed on success.

    Returns:
        A flat (1-D) list ``[name, wkn, price, currency, time]``, or
        ``None`` (implicitly) when the lookup raised. In that case the
        exception message is printed.
    """
    try:
        quote = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")

        def first(column):
            # First value of the requested column in the lookup result.
            return quote[column].values[0]

        print(
            'Name:' + row[0]
            + ' WKN:' + first('wkn')
            + ' Preis:' + str(first('price'))
            + ' Currency:' + first('currency')
            + ' Zeit:' + first('time')
        )
        # The result is one flat row here, not a list of rows.
        return [row[0], first('wkn'), first('price'), first('currency'), first('time')]
    except Exception as err:
        print(str(err))
if __name__ == '__main__':
    # Load all rows once in the parent process.
    record = get_list()
    start_time = time.time()

    # map() hands each row to a worker and returns the results in input order.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(create_json, record)

    # create_json returns None for a row whose lookup failed; drop those
    # entries so the DataFrame is built from complete rows only.
    results = [entry for entry in results if entry is not None]

    df_out = pd.DataFrame(results, columns=['Name', 'WKN', 'Preis', 'Currency', 'Zeit'])
    print(df_out)
我的脚本无法正常工作。全部 10 个进程拿到的都是列表中的第一项,然后就停止了,输出结果是 10 条相同的(第一条)列表记录。该如何解决这个问题?错误是出在循环里,还是说我需要使用队列(Queue)?
import finanzen_fundamentals.stocks as ff
import mysql.connector
import pandas as pd
import multiprocessing
import time
results = []
def get_list():
    """Fetch every row of the ``url_name`` table from the local MySQL database.

    Returns:
        The rows produced by ``cursor.fetchall()`` on success. If anything
        raises along the way, the exception message is returned as a string
        instead (callers that iterate the result should be aware of this).
    """
    mydb = None
    try:
        mydb = mysql.connector.connect(
            host="localhost",
            user="changed",
            password="changed",
            database="stockdata",
        )
        mycursor = mydb.cursor()
        try:
            mycursor.execute("select * from url_name")
            return mycursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            mycursor.close()
    except Exception as e:
        # Preserve the existing contract: report the failure as text.
        return str(e)
    finally:
        # Always hand the connection back; it was previously left open.
        if mydb is not None:
            mydb.close()
def create_json(record):
    """Look up the current FSE quote for rows of ``record``.

    Rows are tried in order. For each row the lookup key is ``str(row[2])``
    with its last character dropped, and the display name is ``row[0]``.
    The function returns as soon as one row succeeds, so only the first
    successfully fetched row is ever reported.

    Returns:
        ``[[name, wkn, price, currency, time]]`` for the first row whose
        lookup succeeds, or ``None`` (implicitly) if every row raised.
        Each failure's message is printed and the next row is tried.
    """
    for entry in record:
        try:
            quote = ff.get_current_value_lxml(str(entry[2])[:-1], exchange = "FSE")

            def first(column):
                # First value of the requested column in the lookup result.
                return quote[column].values[0]

            print(
                'Name:' + entry[0]
                + ' WKN:' + first('wkn')
                + ' Preis:' + str(first('price'))
                + ' Currency:' + first('currency')
                + ' Zeit:' + first('time')
            )
            return [[entry[0], first('wkn'), first('price'), first('currency'), first('time')]]
        except Exception as err:
            print(str(err))
def collect_results(result):
    """Pool callback: add a worker's rows to the module-level ``results`` list.

    Args:
        result: The value returned by the worker, i.e. a list of rows, or
            ``None`` when the worker caught an exception and returned
            nothing.
    """
    # A failed worker yields None; extending with None would raise TypeError
    # inside the callback, so skip it.
    if result is None:
        return
    results.extend(result)
if __name__ == '__main__':
    # Load all rows once in the parent process.
    record = get_list()
    start_time = time.time()

    # One worker process per CPU reported by multiprocessing.
    n_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=n_workers)

    # Ten tasks are submitted, and each one receives the complete record list.
    for i in range(10):
        pool.apply_async(create_json, (record, ), callback=collect_results)

    # Stop accepting tasks, then wait for the submitted ones to finish.
    pool.close()
    pool.join()

    column_names = ['Name', 'WKN', 'Preis', 'Currency', 'Zeit']
    df_out = pd.DataFrame(results, columns=column_names)
    print(df_out)
输出:
Name WKN Preis Currency Zeit
0 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
1 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
2 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
3 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
4 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
5 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
6 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
7 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
8 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
9 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
你的循环结构写错了。在 create_json 中,你遍历的是 record 里的每一行 row,但每次调用时传入的都是同一个完整的 record 列表,并且在第一次迭代时就执行了 return。因此,所有工作进程处理的始终都是第一行。你需要把 worker 函数改为只处理单个 row:
def create_json(row):
    """Look up the current FSE quote for a single row.

    The lookup key is ``str(row[2])`` with its last character dropped, and
    the display name is ``row[0]``. A one-line summary is printed on success.

    Returns:
        A one-row 2-D list ``[[name, wkn, price, currency, time]]`` (suited
        to ``list.extend`` in a callback), or ``None`` (implicitly) when the
        lookup raised. In that case the exception message is printed.
    """
    try:
        quote = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")

        def first(column):
            # First value of the requested column in the lookup result.
            return quote[column].values[0]

        print(
            'Name:' + row[0]
            + ' WKN:' + first('wkn')
            + ' Preis:' + str(first('price'))
            + ' Currency:' + first('currency')
            + ' Zeit:' + first('time')
        )
        return [[row[0], first('wkn'), first('price'), first('currency'), first('time')]]
    except Exception as err:
        print(str(err))
然后在主代码中用每一行调用它:
if __name__ == '__main__':
    ...  # code before the loop is left out of this snippet
    # Submit one task per row so every call works on a different entry.
    for row in record:
        pool.apply_async(create_json, (row, ), callback=collect_results)
    ...  # code after the loop is left out of this snippet
请注意,在这种情况下,你不必在循环中反复调用 apply_async,直接使用 map 即可。map 会直接返回结果列表,所以你连 callback 都不需要了,例如:
def create_json(row):
    """Look up the current FSE quote for a single row (for use with ``map``).

    The lookup key is ``str(row[2])`` with its last character dropped, and
    the display name is ``row[0]``. A one-line summary is printed on success.

    Returns:
        A flat (1-D) list ``[name, wkn, price, currency, time]``, or
        ``None`` (implicitly) when the lookup raised. In that case the
        exception message is printed.
    """
    try:
        quote = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")

        def first(column):
            # First value of the requested column in the lookup result.
            return quote[column].values[0]

        print(
            'Name:' + row[0]
            + ' WKN:' + first('wkn')
            + ' Preis:' + str(first('price'))
            + ' Currency:' + first('currency')
            + ' Zeit:' + first('time')
        )
        # The result is one flat row here, not a list of rows.
        return [row[0], first('wkn'), first('price'), first('currency'), first('time')]
    except Exception as err:
        print(str(err))
if __name__ == '__main__':
    # Load all rows once in the parent process.
    record = get_list()
    start_time = time.time()

    # map() hands each row to a worker and returns the results in input order.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(create_json, record)

    # create_json returns None for a row whose lookup failed; drop those
    # entries so the DataFrame is built from complete rows only.
    results = [entry for entry in results if entry is not None]

    df_out = pd.DataFrame(results, columns=['Name', 'WKN', 'Preis', 'Currency', 'Zeit'])
    print(df_out)