Why would processes in multiprocessing.Pool stop working / spawning?
For some reason, at the start of the script's execution (macOS), eight processes are spawned in my multiprocessing pool and all of them are working, but within a few minutes of starting, only one process is left doing any work.
I have this code (the real script is much bigger, but this paints the picture):
import json
import logging
import multiprocessing
import time

import gcsfs
import pandas as pd
from google.cloud import storage

def GetStatesDataset(dataset):
    df_states = pd.read_csv(dataset)
    return df_states

def UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df):
    table_name = table_name + prefix_name
    pd.DataFrame.to_gbq(df,
                        table_name,
                        project_id=project_id,
                        if_exists=if_exists)

def InitGetDataFromGCP(data, prefix):
    client = storage.Client()
    files = []
    blobs = client.list_blobs(data, prefix=prefix)
    for blob in blobs:
        files.append(f'{data}/{blob.name}')
    return files

def GetDataFromGCP(file):
    start = time.time()
    fs = gcsfs.GCSFileSystem()  # GCP's Google Cloud Storage (GCS) File System (FS)
    with fs.open(file, 'r') as f:
        # Reading newline-delimited JSON into a Pandas DataFrame
        gcs_data = [json.loads(line) for line in f]
        data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
        df = pd.DataFrame(data)
        df = pd.merge_asof(df,
                           df_states,
                           left_on="start_time",
                           right_on="state_reached_at",
                           by="car_id",
                           direction="backward")
        UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df)
    exec_time = time.time() - start
    logging.info(str(multiprocessing.current_process()) + 'Finished: execution time: ' + str(exec_time))

#######################
df_states = GetStatesDataset('gs://link-to-my.csv')

dataset_name = 'one'
prefix_name = 'two'

# config for uploading data to BigQuery
table_name = 'one-two.'
project_id = 'one-two-three'
if_exists = 'append'

def main():
    files = InitGetDataFromGCP(dataset_name, prefix_name)
    with multiprocessing.Pool(processes=8) as pool:
        pool.map(GetDataFromGCP, files)

if __name__ == '__main__':
    main()
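One aside on the module-level setup: the SpawnPoolWorker names in the logs below show the pool is using the spawn start method (the macOS default since Python 3.8), so every worker re-imports this module and the module-level GetStatesDataset call re-downloads the CSV once per worker. A minimal sketch, using the same names as above, that makes this per-worker load explicit via a Pool initializer instead:

    # Sketch: load the states DataFrame once per worker via an initializer,
    # rather than relying on module-level code re-running under "spawn".
    df_states = None

    def init_worker(dataset):
        global df_states
        df_states = GetStatesDataset(dataset)

    def main():
        files = InitGetDataFromGCP(dataset_name, prefix_name)
        with multiprocessing.Pool(processes=8,
                                  initializer=init_worker,
                                  initargs=('gs://link-to-my.csv',)) as pool:
            pool.map(GetDataFromGCP, files)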
Since I log everything, I can see all the processes at the beginning (everything works fine):
2020-08-29 15:55:13,957 <SpawnProcess name='SpawnPoolWorker-8' parent=1420 started daemon>Finished: execution time: 22.53874
2020-08-29 15:55:15,947 <SpawnProcess name='SpawnPoolWorker-7' parent=1420 started daemon>Finished: execution time: 23.259828000000002
2020-08-29 15:55:17,219 <SpawnProcess name='SpawnPoolWorker-3' parent=1420 started daemon>Finished: execution time: 8.758934000000004
2020-08-29 15:55:19,094 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 7.409976
2020-08-29 15:55:21,755 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 0.25443099999999674
But after a while I get this:
2020-08-29 16:24:28,494 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 10.398635000000013
2020-08-29 16:24:36,077 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 4.782628999999929
2020-08-29 16:24:40,220 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.1638890000000401
2020-08-29 16:24:44,032 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.519871999999964
2020-08-29 16:24:50,449 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 3.1979730000000473
I can also confirm that only one process is working by looking at my CPU activity: eight Python processes have been spawned, but only one of them is anywhere near 100%. I'm new to multiprocessing and maybe I don't know what I'm doing, but I would expect all eight workers to keep executing tasks until my "files" run out.
It was so obvious. I just needed to specify chunksize. Since I have almost 17,000 files to process, one file at a time, chunksize=1 works like a charm:
with multiprocessing.Pool(processes=8) as pool:
    result = pool.map(GetDataFromGCP, files, chunksize=1)
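For context on why the default behaves like this: when chunksize is omitted, Pool.map splits the input into large chunks and ships each chunk to a single worker as one task; CPython's heuristic is roughly len(iterable) / (workers * 4), rounded up. A quick sketch of the arithmetic for this workload (file count approximated from the numbers above):

    # CPython's default chunking heuristic for Pool.map
    # (multiprocessing/pool.py, Pool._map_async)
    files_count = 17000          # ~17,000 files, as mentioned above
    workers = 8

    chunksize, extra = divmod(files_count, workers * 4)
    if extra:
        chunksize += 1

    print(chunksize)             # 532 -> only 32 tasks for 8 workers

With just 32 chunks and per-file times ranging from fractions of a second to tens of seconds, workers that draw quick chunks finish early and go idle while one worker grinds through a slow chunk, which is exactly what the logs show. chunksize=1 trades a bit more inter-process communication for even load balancing, and since each file takes seconds to process, that overhead is negligible here.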