侦听器仅收到第一条消息

Only the first message is being received by the listener

我在 Python 中编写了一个多处理脚本,主要基于对这个问题的回答:Python multiprocessing safely writing to a file。当工作人员被很好地触发并且所有工作都返回正确的响应时,侦听器只收到第一条消息,几乎就像 while 循环被完全忽略一样。我是 Python 的新手,我绝对可以使用一些专家建议来解决这个问题。

代码:

import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare

fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"

def worker(files, q):
    res = prepare(files)
    # This line is printed successfully for every response
    print("MESSAGE ABOUT TO BE LOGGED: ", res)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        while True:
            m = q.get()
            # This line is printed only once!!!!
            print("MESSAGE RECEIVED BY LISTENER: ", m)
            if not m:
                print("Message is empty. Skipping to the next message")
            elif m["kill"]:
                print("Completed processing all jobs")
                break
            elif m["error"]:
                print("Error in job id: " + m["processId"] + ". Error is: " + m["error"])
            else:
                if m["warn"]:
                    print("Job id: " + m["processId"] + " has the following warnings: " + m["warn"])
                row = list()
                row.append(m["key1"])
                row.append(m["key2"])
                row.append(m["key3"])
                row.append(m["key4"])
                row.append(m["key5"])
                print("Row to be written is: " + row)
                writer.writerow(row)

def getFilenamesFromCSV(csvFile):
    filenames = list()
    if not csvFile:
        return None
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0]:
                filenames.append(list(row[0].split(",")))
    print(filenames)
    return filenames

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = []
    for files in filesToProcess:
        job = pool.apply_async(worker, (files, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    killCommand = dict()
    killCommand["kill"] = "KILL"
    q.put(killCommand)
    pool.close()
    pool.join()


if __name__ == "__main__":
   main()

我最终在没有使用队列的情况下重写了代码,这个版本运行良好。队列实现仅在工作进程快速响应时才有效(并且它与超时无关,我没有收到任何错误并且我也尝试设置显式超时)。

import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare

fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"

# This is where the actual processing is done
def worker(files):
    return prepare(files)

# Read filenames from input CSV for processing
def getFilenamesFromCSV(csvFile):
    filenames = list()
    if not csvFile:
        return None
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0]:
                filenames.append(list(row[0].split(",")))
    print(filenames)
    return filenames

# Decides what, if any, needs to be written to the output file
def whatShouldIWriteToOutputCSV(audioPrepareResult):
    if not audioPrepareResult:
        return None
    elif audioPrepareResult.get("error", None):
        print("Error in job id: " + audioPrepareResult["processId"] + ". Error is: " + audioPrepareResult["error"])
        return None
    else:
        row = list()
        row.append(audioPrepareResult["key1"])
        row.append(audioPrepareResult["key2"])
        row.append(audioPrepareResult["key3"])
        row.append(audioPrepareResult["key4"])
        row.append(audioPrepareResult["key5"])
        return row

def main():

    pool = mp.Pool(mp.cpu_count())

    #fire off workers
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = []
    for files in filesToProcess:
        jobs.append(pool.apply_async(worker, ([files])))

    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        for job in jobs:
            result = whatShouldIWriteToOutputCSV(job.get())
            if(result):
                writer.writerow(result)

    pool.close()
    pool.join()


if __name__ == "__main__":
   main()