如何使用 python asyncio 从 EC2 实例调用 AWS Lambda 函数

How to invoke an AWS Lambda function from EC2 instances with python asyncio

我最近 post 提出了一个关于 的问题。 我设法通过将具有 "AWS lambda role" 策略的 IAM 角色附加到 EC2 实例来使其工作,现在我可以使用 boto3.

调用 lambda 函数

现在,我想使用 asyncio await 语法异步调用 lambda 函数。我读到 lambda 函数通过设置 InvokeType='Event' 提供了一个异步版本,但实际上在没有得到函数结果的情况下立即调用 return。

由于该函数需要一些时间,而且我想同时启动多个函数,因此我想避免在等待函数 return.

时阻塞执行

我尝试使用 aiobotocore,但它只支持基本的 's3' 服务功能。

解决此问题的最佳方法(依愚见)是使用 AWS API 网关服务通过 GET/POST 请求调用 lambda 函数,该请求可以使用 aiohttp 轻松处理。

尽管如此,我还是没能成功。

我向 EC2 IAM 角色添加了策略 "AmazonAPIGatewayInvokeFullAccess" 但每次我尝试:

import requests
r = requests.get('https://url_to_api_gateway_for_function')

我收到禁止回复 <Response [403]>

我直接使用 lambda 函数中的触发器创建了 API 网关。

我还尝试编辑 API 网关设置,方法是将 post 方法添加到函数路径并设置 "AWS_IAM" 身份验证,然后将其部署为 "prod" 部署...没有运气。仍然是同样的禁止回应。当我通过 "test screen on the API gateway, it works fine".

测试它时

知道如何解决这个问题吗?我错过了一些步骤吗?

经过一番努力,我设法解决了我的问题。

问题在于 curl 和 python 模块(如 python 的请求)不会使用它们所在的 EC2 机器的 IAM 凭据对 http 请求进行签名 运行。必须使用 AWS v4 登录协议对 AWS GATEWAY API 的 http 请求进行签名。

这里有一个例子: http://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html

幸运的是,为了简单起见,有一些辅助模块,例如 requests-aws-sign: https://github.com/jmenga/requests-aws-sign

代码最后可能类似于:

import aiohttp
import asyncio
from requests_aws_sign import AWSV4Sign
from boto3 import session

session = session.Session()
credentials = session.get_credentials()
region = session.region_name or 'ap-southeast-2'
service = 'execute-api'
url = "get_it_from_api->stages->your_deployment->invoke_url"
auth=AWSV4Sign(credentials, region, service)

async def invoke_func(loop):
    async with aiohttp.request('GET', url, auth=auth, loop=loop) as resp:
        html = await resp.text()
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

希望这会为其他人节省时间!

编辑:

为了完整和帮助他人,我不得不说上面的代码不起作用,因为 requests_aws_sign 与 aiohttp 不兼容。我得到了一些 "auth field error".

我设法通过使用解决了它:

async with session.get(url, headers=update_headers()) as resp:

其中 update_headers() 是一个简单的函数,它模拟了 requests_aws_sign 对 header 所做的事情(这样我就可以使用header 参数)。 它看起来像这样:

def update_headers(sim_id):
    url = urlparse("get_it_from_api->stages->your_deployment->invoke_url")
    path = url.path or '/'
    querystring = ''
    if url.query:
        querystring = '?' + urlencode(parse_qs(url.query), doseq=True)
    safe_url = url.scheme + '://' + url.netloc.split(':')[0] + path + querystring
    request = AWSRequest(method='GET', url=safe_url)
    SigV4Auth(credentials, service, region).add_auth(request)
    return dict(request.headers.items())