侦听器仅收到第一条消息
Only the first message is being received by the listener
我在 Python 中编写了一个多进程脚本,主要基于这个问题的回答:Python multiprocessing safely writing to a file。虽然各个工作进程都被正常触发,并且所有任务都返回了正确的结果,但侦听器只收到了第一条消息,就好像 while 循环被完全忽略了一样。我是 Python 新手,非常希望得到一些专家建议来解决这个问题。
代码:
import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare
fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"
def worker(files, q):
    """Process one batch of files and publish the result on the shared queue."""
    result = prepare(files)
    # Log the result before queueing it so we can see every worker finish.
    print("MESSAGE ABOUT TO BE LOGGED: ", result)
    q.put(result)
    return result
def listener(q):
    """Listen for worker messages on q and append result rows to the CSV file.

    Runs until a message containing a truthy "kill" key arrives.
    """
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        while True:
            m = q.get()
            print("MESSAGE RECEIVED BY LISTENER: ", m)
            if not m:
                print("Message is empty. Skipping to the next message")
            # BUG FIX: m["kill"] raised KeyError on every ordinary worker
            # message (they carry no "kill" key), crashing the listener right
            # after the first message -- this is why only one message was
            # ever received. dict.get() returns None for missing keys.
            elif m.get("kill"):
                print("Completed processing all jobs")
                break
            elif m.get("error"):
                print("Error in job id: " + m["processId"] + ". Error is: " + m["error"])
            else:
                if m.get("warn"):
                    print("Job id: " + m["processId"] + " has the following warnings: " + m["warn"])
                row = [m["key1"], m["key2"], m["key3"], m["key4"], m["key5"]]
                # BUG FIX: str + list raises TypeError; stringify the row first.
                print("Row to be written is: " + str(row))
                writer.writerow(row)
def getFilenamesFromCSV(csvFile):
    """Read the input CSV and return a list of filename lists.

    The first cell of each non-empty row is treated as a comma-separated
    list of filenames. Returns None when csvFile is empty/None.
    """
    if not csvFile:
        return None
    filenames = list()
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            # BUG FIX: csv.reader yields [] for completely blank lines, so a
            # bare row[0] raised IndexError; guard on the row itself first.
            if row and row[0]:
                filenames.append(row[0].split(","))
    print(filenames)
    return filenames
def main():
    """Fan the input files out to worker processes; a listener logs results."""
    # Must use a Manager queue here: a plain mp.Queue cannot be shipped to
    # pool workers via apply_async.
    manager = mp.Manager()
    q = manager.Queue()
    # +2 slots so the long-running listener does not starve the workers.
    pool = mp.Pool(mp.cpu_count() + 2)
    # Put the listener to work first so messages are consumed as they arrive.
    watcher = pool.apply_async(listener, (q,))
    # Fire off the workers.
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = [pool.apply_async(worker, (files, q)) for files in filesToProcess]
    # Wait for every worker; re-raises any worker exception here.
    for job in jobs:
        job.get()
    # Now we are done -- tell the listener to shut down.
    killCommand = {"kill": "KILL"}
    q.put(killCommand)
    # BUG FIX: without this, an exception inside listener() is silently
    # swallowed by the pool (AsyncResult errors only surface on .get()),
    # which is what hid the listener's KeyError crash.
    watcher.get()
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
最终我重写了代码,不再使用队列,这个版本运行良好。基于队列的实现只有在工作进程快速返回时才有效(这与超时无关——我没有收到任何错误,而且我也尝试过显式设置超时)。
import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare
fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"
# This is where the actual processing is done
def worker(files):
    """Thin wrapper so the pool can run prepare() on one batch of files."""
    result = prepare(files)
    return result
# Read filenames from input CSV for processing
def getFilenamesFromCSV(csvFile):
    """Read the input CSV and return a list of filename lists.

    The first cell of each non-empty row is treated as a comma-separated
    list of filenames. Returns None when csvFile is empty/None.
    """
    if not csvFile:
        return None
    filenames = list()
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            # BUG FIX: csv.reader yields [] for completely blank lines, so a
            # bare row[0] raised IndexError; guard on the row itself first.
            if row and row[0]:
                filenames.append(row[0].split(","))
    print(filenames)
    return filenames
# Decides what, if any, needs to be written to the output file
def whatShouldIWriteToOutputCSV(audioPrepareResult):
    """Map one worker result onto a CSV row; None means write nothing."""
    if not audioPrepareResult:
        return None
    if audioPrepareResult.get("error", None):
        print("Error in job id: " + audioPrepareResult["processId"] + ". Error is: " + audioPrepareResult["error"])
        return None
    # Fixed column order matching the output header row.
    return [audioPrepareResult[k] for k in ("key1", "key2", "key3", "key4", "key5")]
def main():
    """Run the workers over the input CSV and write their results to the output file."""
    pool = mp.Pool(mp.cpu_count())
    # Fire off the workers. NOTE: the original ([files]) is not a tuple --
    # args=[files] unpacks to worker(files); (files,) is the equivalent,
    # explicit spelling.
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = [pool.apply_async(worker, (files,)) for files in filesToProcess]
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        # Collect results in submission order and write the usable ones.
        for job in jobs:
            row = whatShouldIWriteToOutputCSV(job.get())
            if row:
                writer.writerow(row)
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
我在 Python 中编写了一个多进程脚本,主要基于这个问题的回答:Python multiprocessing safely writing to a file。虽然各个工作进程都被正常触发,并且所有任务都返回了正确的结果,但侦听器只收到了第一条消息,就好像 while 循环被完全忽略了一样。我是 Python 新手,非常希望得到一些专家建议来解决这个问题。
代码:
import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare
fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"
def worker(files, q):
    """Process one batch of files and publish the result on the shared queue."""
    result = prepare(files)
    # Log the result before queueing it so we can see every worker finish.
    print("MESSAGE ABOUT TO BE LOGGED: ", result)
    q.put(result)
    return result
def listener(q):
    """Listen for worker messages on q and append result rows to the CSV file.

    Runs until a message containing a truthy "kill" key arrives.
    """
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        while True:
            m = q.get()
            print("MESSAGE RECEIVED BY LISTENER: ", m)
            if not m:
                print("Message is empty. Skipping to the next message")
            # BUG FIX: m["kill"] raised KeyError on every ordinary worker
            # message (they carry no "kill" key), crashing the listener right
            # after the first message -- this is why only one message was
            # ever received. dict.get() returns None for missing keys.
            elif m.get("kill"):
                print("Completed processing all jobs")
                break
            elif m.get("error"):
                print("Error in job id: " + m["processId"] + ". Error is: " + m["error"])
            else:
                if m.get("warn"):
                    print("Job id: " + m["processId"] + " has the following warnings: " + m["warn"])
                row = [m["key1"], m["key2"], m["key3"], m["key4"], m["key5"]]
                # BUG FIX: str + list raises TypeError; stringify the row first.
                print("Row to be written is: " + str(row))
                writer.writerow(row)
def getFilenamesFromCSV(csvFile):
    """Read the input CSV and return a list of filename lists.

    The first cell of each non-empty row is treated as a comma-separated
    list of filenames. Returns None when csvFile is empty/None.
    """
    if not csvFile:
        return None
    filenames = list()
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            # BUG FIX: csv.reader yields [] for completely blank lines, so a
            # bare row[0] raised IndexError; guard on the row itself first.
            if row and row[0]:
                filenames.append(row[0].split(","))
    print(filenames)
    return filenames
def main():
    """Fan the input files out to worker processes; a listener logs results."""
    # Must use a Manager queue here: a plain mp.Queue cannot be shipped to
    # pool workers via apply_async.
    manager = mp.Manager()
    q = manager.Queue()
    # +2 slots so the long-running listener does not starve the workers.
    pool = mp.Pool(mp.cpu_count() + 2)
    # Put the listener to work first so messages are consumed as they arrive.
    watcher = pool.apply_async(listener, (q,))
    # Fire off the workers.
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = [pool.apply_async(worker, (files, q)) for files in filesToProcess]
    # Wait for every worker; re-raises any worker exception here.
    for job in jobs:
        job.get()
    # Now we are done -- tell the listener to shut down.
    killCommand = {"kill": "KILL"}
    q.put(killCommand)
    # BUG FIX: without this, an exception inside listener() is silently
    # swallowed by the pool (AsyncResult errors only surface on .get()),
    # which is what hid the listener's KeyError crash.
    watcher.get()
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
最终我重写了代码,不再使用队列,这个版本运行良好。基于队列的实现只有在工作进程快速返回时才有效(这与超时无关——我没有收到任何错误,而且我也尝试过显式设置超时)。
import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare
fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"
# This is where the actual processing is done
def worker(files):
    """Thin wrapper so the pool can run prepare() on one batch of files."""
    result = prepare(files)
    return result
# Read filenames from input CSV for processing
def getFilenamesFromCSV(csvFile):
    """Read the input CSV and return a list of filename lists.

    The first cell of each non-empty row is treated as a comma-separated
    list of filenames. Returns None when csvFile is empty/None.
    """
    if not csvFile:
        return None
    filenames = list()
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            # BUG FIX: csv.reader yields [] for completely blank lines, so a
            # bare row[0] raised IndexError; guard on the row itself first.
            if row and row[0]:
                filenames.append(row[0].split(","))
    print(filenames)
    return filenames
# Decides what, if any, needs to be written to the output file
def whatShouldIWriteToOutputCSV(audioPrepareResult):
    """Map one worker result onto a CSV row; None means write nothing."""
    if not audioPrepareResult:
        return None
    if audioPrepareResult.get("error", None):
        print("Error in job id: " + audioPrepareResult["processId"] + ". Error is: " + audioPrepareResult["error"])
        return None
    # Fixed column order matching the output header row.
    return [audioPrepareResult[k] for k in ("key1", "key2", "key3", "key4", "key5")]
def main():
    """Run the workers over the input CSV and write their results to the output file."""
    pool = mp.Pool(mp.cpu_count())
    # Fire off the workers. NOTE: the original ([files]) is not a tuple --
    # args=[files] unpacks to worker(files); (files,) is the equivalent,
    # explicit spelling.
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = [pool.apply_async(worker, (files,)) for files in filesToProcess]
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        # Collect results in submission order and write the usable ones.
        for job in jobs:
            row = whatShouldIWriteToOutputCSV(job.get())
            if row:
                writer.writerow(row)
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()