AWS Lambda 函数如何获取并行函数的结果?

How can the AWS Lambda function get the results of parallel functions?

在我的 AWS Lambda 函数 (Python 3.8 运行时) 中,我正在尝试运行三个互不相关的函数。这三个函数各自返回不同的结果,且数据类型各不相同。我需要把这三个并行运行的函数的结果返回给主函数 lambda_handler。您认为最有效的方法是什么?

现在我正在考虑使用一个全局变量来记录三个函数的结果。但在我看来,我无法在主函数 lambda_handler 中获得这些结果,因为它们在不同的进程中工作。不是吗?

我认为使用Pipe会更合适。我发现 Queue 在 AWS Lambda 中不受支持。你不这么认为吗?

from multiprocessing import Process


# Module-level store that the worker functions below fill in, keyed by the
# worker's name.
# NOTE(review): a process started through multiprocessing works on its own
# copy of this dict, so entries written in a child do not show up here.
results = {}

def first_function(event):
    """Store ``True`` under the "first_function" key of the module-level ``results``.

    Args:
        event: Accepted but not read by the visible body.
    """
    # Placeholder for the real work.
    results.update({"first_function": True})

def second_function():
    """Store ``30`` under the "second_function" key of the module-level ``results``."""
    # Placeholder for the real work.
    results.update({"second_function": 30})


def third_function():
    """Store a list of five letters under the "third_function" key of ``results``."""
    # Placeholder for the real work.
    letters = list("qwert")
    results.update({"third_function": letters})


def execute_parallel_processes(*functions):
    """Run every given target in its own process and wait for all of them.

    Args:
        *functions: Objects handed to ``Process`` as ``target``; each one is
            invoked without arguments.

    Returns:
        None. Whatever the targets return is not collected.
    """
    # One process per target, in the order the targets were given.
    workers = [Process(target=candidate) for candidate in functions]

    # Launch everything before waiting on anything so the workers overlap.
    for worker in workers:
        worker.start()

    # Block until the last worker has exited.
    for worker in workers:
        worker.join()


def lambda_handler(event, context):
    """Lambda entry point: run the three workers, then print ``results``.

    Args:
        event: Invocation payload; passed to ``first_function``.
        context: Second handler argument; not used here.

    Returns:
        None.
    """
    # NOTE(review): ``first_function(event)`` is evaluated right here in the
    # calling process, so its return value (not the function itself) is what
    # gets handed over as the first target.
    targets = (first_function(event), second_function, third_function)
    execute_parallel_processes(*targets)

    print(results)

我用 Pipe 解决了这个问题。下面是一段可以正常运行的代码片段:

import json
from multiprocessing import Process, Pipe
from typing import *
from functools import wraps

# The import block above omits ``logging``; import it here so that creating
# the logger does not fail with NameError when the module is loaded.
import logging

# Module-level logger; records below ERROR severity are ignored by it.
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)


def first_function(*args, **kwargs):
    """Announce itself on stdout and report its result through the given pipe.

    Args:
        *args: Accepted and ignored.
        **kwargs: Must contain ``pipe``, the connection to send the result
            through; any other keyword is ignored.

    Raises:
        KeyError: If no ``pipe`` keyword argument is supplied.
    """
    print("first_function")
    connection = kwargs["pipe"]
    payload = {"first_function": True}
    connection.send(payload)
    # This worker sends nothing further, so release its end of the pipe.
    connection.close()


def second_function(*args, **kwargs):
    """Announce itself on stdout and report its result through the given pipe.

    Args:
        *args: Accepted and ignored.
        **kwargs: Must contain ``pipe``, the connection to send the result
            through; any other keyword (such as ``event``) is ignored.

    Raises:
        KeyError: If no ``pipe`` keyword argument is supplied.
    """
    print("second_function")
    connection = kwargs["pipe"]
    payload = {"second_function": 30}
    connection.send(payload)
    # This worker sends nothing further, so release its end of the pipe.
    connection.close()


def third_function(*args, **kwargs):
    """Announce itself on stdout and report its result through the given pipe.

    Args:
        *args: Accepted and ignored.
        **kwargs: Must contain ``pipe``, the connection to send the result
            through; any other keyword is ignored.

    Raises:
        KeyError: If no ``pipe`` keyword argument is supplied.
    """
    print("third_function")
    connection = kwargs["pipe"]
    payload = {"third_function": ["item1", "item2"]}
    connection.send(payload)
    # This worker sends nothing further, so release its end of the pipe.
    connection.close()


def execute_parallel_processes(functions):
    """Run each described function in its own process and merge their results.

    Every worker is called with its keyword arguments plus an extra ``pipe``
    keyword holding the sending end of a dedicated ``Pipe``. It is expected to
    ``send`` one dict through that connection.

    Args:
        functions: Iterable of dicts, each with the keys ``"object"`` (the
            callable to run) and ``"arguments"`` (a dict of keyword arguments
            for it). The ``"arguments"`` dicts are left unmodified.

    Returns:
        dict: The union of the dicts sent by the workers; on duplicate keys
        the worker listed later wins.

    Raises:
        Exception: If an entry lacks the ``"object"`` or ``"arguments"`` key.
        RuntimeError: If a worker exits without sending a result.
    """
    # Validate every entry up front so that a malformed one is rejected
    # before any pipe or process has been created.
    validated = []
    for function in functions:
        try:
            target = function["object"]
            arguments = function["arguments"]
        except KeyError as error:
            logger.error(error)
            raise Exception(error) from error
        validated.append((target, arguments))

    processes = []
    receivers = []
    for target, arguments in validated:
        receiver, sender = Pipe()
        # Build a fresh kwargs dict rather than writing "pipe" into the
        # caller's own "arguments" dict.
        process = Process(target=target, kwargs={**arguments, "pipe": sender})
        process.start()
        # The child owns its own handle now. Dropping ours lets recv() below
        # raise EOFError instead of blocking forever if the child exits
        # without sending anything.
        sender.close()
        processes.append(process)
        receivers.append(receiver)

    results = {}
    silent = []
    for (target, _), receiver in zip(validated, receivers):
        try:
            # Read before joining: a worker sending a large payload blocks in
            # send() until the data is consumed, so joining first could hang.
            results.update(receiver.recv())
        except EOFError:
            silent.append(getattr(target, "__name__", repr(target)))
        finally:
            receiver.close()

    # All results are in (or known to be missing); now reap the workers.
    for process in processes:
        process.join()

    if silent:
        message = "no result received from: " + ", ".join(silent)
        logger.error(message)
        raise RuntimeError(message)

    return results


def lambda_handler(event, context):
    """Lambda entry point: run the three workers in parallel and print their results.

    Args:
        event: Invocation payload; forwarded to ``second_function`` as the
            ``event`` keyword argument.
        context: Second handler argument; not used here.

    Returns:
        dict: A fixed response with ``statusCode`` 200 and a JSON-encoded
        greeting as ``body``. The collected results are printed, not returned.
    """
    # (callable, keyword arguments) for each worker, in launch order.
    workers = (
        (first_function, {}),
        (second_function, {"event": event}),
        (third_function, {}),
    )
    specs = [
        {"object": target, "arguments": arguments}
        for target, arguments in workers
    ]

    outcome = execute_parallel_processes(functions=specs)
    print(outcome)

    response_body = json.dumps('Hello from Lambda!')
    return {'statusCode': 200, 'body': response_body}