无法使用aws lambda函数将消息生成到kafka主题中

Not able to produce message into kafka topic using aws lambda function

所以我正在尝试在 S3 事件上编写一个 lambda 函数,它将消息放入 kafka 主题。 我的 aws lambda 函数正在触发并且也没有收到任何错误。 但是我无法在 Kafka 主题中看到这些消息。

这是我的 lambda 函数

String srcBucket = record.getS3().getBucket().getName();

        String srcKey = record.getS3().getObject().getUrlDecodedKey();

        System.out.println("Bucket is " + srcBucket + "  and Key is " + srcKey);
        // Assign topicName to string variable
        String topicName = "AWSKafkaTutorialTopic";

        // create instance for properties to access producer configs
        Properties props = new Properties();

        props.put("bootstrap.servers",
                "b-3.205147-riskaudit.rtyrty.c5.kafka.us-east-1.amazonaws.com:9092,b-4.205147-riskaudit.rtyt.c5.kafka.us-east-1.amazonaws.com:9092,b-5.205147-tryrt.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092");
        System.out.println("bootstrap.servers successfully");
        // Set acknowledgements for producer requests.
        props.put("acks", "all");

        // If the request fails, the producer can automatically retry,
        props.put("retries", 0);

        // Specify buffer size in config
        props.put("batch.size", 16384);

        // Reduce the no of requests less than 0
        props.put("linger.ms", 1);

        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);

        System.out.println("before key.serializer successfully");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        System.out.println("after  key.serializer successfully");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        System.out.println("Inside loop successfully");
        for (int i = 0; i < 10; i++)

            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
        System.out.println("Message sent successfully");
        producer.close();

        return "Message Pushed success fully";

我的 lambda 函数是 运行 直到 for 循环,但看不到之后会发生什么。 请帮助

我觉得一切正常 只需添加 props.put("producer.type", "async"); 您可能 运行 来自启动 MSK 的 vpc 的 lmbda 函数。 另外请注意 IAM 政策。 试试这个 AWSLambdaVPCAccessExecutionRole 和安全组 .

如果您设置了所有这些代码将开始工作。