试图理解作为 api 网关 websockets 连接逻辑一部分的 lambda 函数

Trying to understand lambda function that is part of the logic of api gateway websockets connection

TLDR:我如何通过 api 网关将短负载从 mqtt 请求发送到 aws iot 到具有开放连接的 aws lambda 到 [=45] 中的本地电子应用程序 运行 =].

我有一个带有以下代码的 esp8266 init.js 此代码成功将其消息发送到 aws iot,并设置了一个规则来触发名为 sendmessage 的 lambda。现在这个 sendmessage lambda 通过 websockets 连接到我 linux 机器上的本地 Electon 应用程序。我能够通过 websockets 从 Electron 应用程序发送消息到 api 网关 wss url。我按照这个例子 here 设置了所有带有 api 网关和 aws lambdas(一个是 sendmessage lambda)的 websockets。

load("api_config.js");
load("api_gpio.js");
load("api_mqtt.js");
load("api_sys.js");
load("api_timer.js");

let pin = 0;
GPIO.set_button_handler(
  pin,
  GPIO.PULL_UP,
  GPIO.INT_EDGE_NEG,
  50,
  function (x) {
    let res = MQTT.pub(
      "mOS/topic1",
      JSON.stringify({ action: "sendmessage", data: "pushed" }),
      1
    );

    print(res);
    print("Published:", res ? "yes" : "no");
    let connected = MQTT.isConnected();

    print(connected);
  },
  true
);
print("Flash button is configured on GPIO pin", pin);
print("Press the flash button now!");

我知道从 iot 到 sendmessage lambda 的消息需要是 websockets 消息,但它只有 {"action":"sendmessage","data":"hello world"} 的最小对象,它缺少 websocket 需要的一堆信息。但是我不需要 aws iot 和 sendmessage lambda 之间的 websocket 连接,我需要它从 IOT -> 具有最小有效载荷的 sendmessage lambda -> electron app 通过带有来自 IOT 的有效载荷的 websockets。

发送消息 LAMBDA

// Copyright 2018-2020Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

const { TABLE_NAME } = process.env;

exports.handler = async event => {
  let connectionData;
  
  try {
    connectionData = await ddb.scan({ TableName: TABLE_NAME, ProjectionExpression: 'connectionId' }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }
  
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });
  
  const postData = JSON.parse(event.body).data;
  
  const postCalls = connectionData.Items.map(async ({ connectionId }) => {
    try {
      await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: postData }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await ddb.delete({ TableName: TABLE_NAME, Key: { connectionId } }).promise();
      } else {
        throw e;
      }
    }
  });
  
  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  return { statusCode: 200, body: 'Data sent.' };
};

onconnect lambda

// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async event => {
  const putParams = {
    TableName: process.env.TABLE_NAME,
    Item: {
      connectionId: event.requestContext.connectionId
    }
  };

  try {
    await ddb.put(putParams).promise();
  } catch (err) {
    return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Connected.' };
};

ondisconnect lambda

// Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

// https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-route-keys-connect-disconnect.html
// The $disconnect route is executed after the connection is closed.
// The connection can be closed by the server or by the client. As the connection is already closed when it is executed, 
// $disconnect is a best-effort event. 
// API Gateway will try its best to deliver the $disconnect event to your integration, but it cannot guarantee delivery.

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async event => {
  const deleteParams = {
    TableName: process.env.TABLE_NAME,
    Key: {
      connectionId: event.requestContext.connectionId
    }
  };

  try {
    await ddb.delete(deleteParams).promise();
  } catch (err) {
    return { statusCode: 500, body: 'Failed to disconnect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Disconnected.' };
};

在我的 electron 应用程序中,我有以下代码来测试 websocket,但我收到了一个禁止的错误。然而与 wscat 一起工作...

"use strict";
const { app, BrowserWindow } = require("electron");
const { Notification } = require("electron");
const WebSocket = require("ws");

function createWindow() {
  const win = new BrowserWindow({
    width: 800,
    height: 600,
    webPreferences: {
      nodeIntegration: true,
    },
  });

  win.loadFile("index.html");
  win.webContents.openDevTools();
}

app.whenReady().then(createWindow);

app.on("window-all-closed", () => {
  if (process.platform !== "darwin") {
    app.quit();
  }
});

app.on("activate", () => {
  if (BrowserWindow.getAllWindows().length === 0) {
    createWindow();
  }
});

// Tell express to use the body-parser middleware and to not parse extended bodies

const url = "wss://random.execute-api.us-east-1.amazonaws.com/Prod";
const connection = new WebSocket(url);

connection.onopen = () => {
  connection.send("hello world");
};

connection.onmessage = (e) => {
  console.log(e.data);
};

connection.onerror = (error) => {
  console.log(`WebSocket error: ${error}`);
};

function showNotification() {
  const notification = {
    title: "Basic Notification",
    body: `notification`,
  };

  new Notification(notification).show();
}

app.whenReady().then(createWindow).then(showNotification);

我现在设置我的 mqtt 事件以将相同的数据发送到 lambda 但我在 lambda 中收到以下错误

{
    "errorType": "TypeError",
    "errorMessage": "Cannot read property 'domainName' of undefined",
    "stack": [
        "TypeError: Cannot read property 'domainName' of undefined",
        "    at Runtime.exports.handler (/var/task/app.js:29:28)",
        "    at processTicksAndRejections (internal/process/task_queues.js:97:5)"
    ]
}

更新: 这是我的最后一个 lambda,在收到来自 IOT 的事件后,我向 wss 地址发送了一条消息,但它不起作用,控制台记录了事件,但没有触发任何 ws.on 函数

// const axios = require('axios')
// const url = 'http://checkip.amazonaws.com/';
const WebSocket = require("ws");
let response;

/**
 *
 * Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
 * @param {Object} event - API Gateway Lambda Proxy Input Format
 *
 * Context doc: https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
 * @param {Object} context
 *
 * Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
 * @returns {Object} object - API Gateway Lambda Proxy Output Format
 *
 */
exports.lambdaHandler = async (event, context) => {
  try {
    // const ret = await axios(url);

    console.log(event);

    const url = "wss://obsf.execute-api.us-east-1.amazonaws.com/Prod";
    const ws = new WebSocket(url);

    var test = { action: "sendmessage", data: "hello world from button" };

    ws.on("open", function open() {
      ws.send(JSON.stringify(test));
    });

    ws.on("message", function incoming(data) {
      console.log(data);
    });

    response = {
      statusCode: 200,
      body: JSON.stringify({
        message: "hello world",
        // location: ret.data.trim()
      }),
    };
  } catch (err) {
    console.log(err);
    return err;
  }

  return response;
};

更新:最后我试过了,我什至没有得到错误,我知道 ws 在那里,因为如果我控制它 returns 一个有很多功能的大对象

    console.log(ws); this returns a large object

    ws.on("error", console.error); this does nothing

您好像设置了 1 个 lambda 来处理 2 个触发源,一个是 IoT 服务,另一个是 API Gateway Websocket。由于您使用 1 个 lambda,因此您必须处理请求来自以下来源的情况:

  1. 当请求从 API 网关触发时 event.requestContext 可用,当请求从 IoT 服务触发时它不可用(在此处检查 IoT 事件对象 https://docs.aws.amazon.com/lambda/latest/dg/services-iotevents.html ).所以你遇到的错误(Cannot read property 'domainName' of undefined")就是关于这个的。您应该关闭来自 IoT 服务的 lambda 触发器或处理来自 IoT 服务的请求。
  2. 我不确定禁止的错误,但更像是你向 API 网关 WS 发送了非结构化消息,它应该是 connection.send(JSON.stringify({ action: "sendmessage", data: "hello world" })); 而不是 connection.send("hello world");

编辑基于post更新:

I know ws is there because if I console it it returns a big object with a bunch of functions

Lambda 函数并不是真正的服务器,它是一个实例 Node 环境(这就是它被称为 FUNCTION 的原因),Lambda 函数并不像您认为的普通 Nodejs 应用程序那样工作,它的容器(节点环境)通常在其工作完成时停止(或冻结),因此您不能像普通服务器一样保持其容器活动。 这就是为什么你可以控制台记录 Websocket 对象,但你不能让它保持活动状态,每当你 return/response.

时,NodeJS 容器已经停止

由于您不能使用 Websocket 对象在 Lambda 中打开 WS 连接,Amazon 提供了一种通过 API 网关来实现的方法。我们使用 API Gateway Websocket 的方式也不同于普通服务器,它类似于:

  • 用户 -> 请求 API 网关连接到 websocket -> 调用 Lambda 1(onconnect 函数)
  • 用户 -> 请求 API 网关通过 Websocket 发送消息 -> 调用 Lambda 2(发送消息函数)
  • 用户 -> 请求 API 网关关闭连接 -> 调用 Lambda 3(ondisconnect 函数)

上面的3个设置是在API网关(https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-integrations.html)中配置的,3个功能的逻辑onconnectsendmessageondisconnect可以在ondisconnect中处理1 个 lambda 或 3 个 lambda 函数取决于我们的设计方式,我检查了你的 3 个 lambda 函数,看起来没问题。

我看到你想使用 IoT,但我不确定为什么。您应该首先测试您的 Websocket API,不要与 IoT 相关。如果您能在这里说出您想要实现的目标会更好,因为物联网更像是一个 publish/subscribe/messaging 通道,我认为没有必要在这里使用它。

我发布这个只是为了展示我是如何解决它的。

相反,我创建了一个 node express ec2 服务器来处理来自物联网设备的请求,然后将其传递到我的 wss 服务器,这是它的主要工作部分

这就是我在这里所做的

这是有效代码的主要部分

app.post('/doorbell', (req, res) => {
  var hmm1 = { action: 'sendmessage', data: 'hello world from post request' };

  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify(hmm1));
    res.send('heya hey');
  } else {
    res.send('The WebSocket connection is not open');
  }
});