Kafka 生产者终止时仍有 1 条消息(881 字节)在队列或传输中

Kafka Producer terminating with 1 message (881 bytes) still in queue or transit

我是 Kafka 的新手,在通过生产者发送消息(value)时收到了下面这条警告:

func Produce(topic string, key string, message interface{}) {

    headers := map[string][]byte{
        tenant_kafka.MSG_HEADER_KEY_CORRELATIONID: []byte("1234"),
        tenant_kafka.MSG_HEADER_KEY_REQUESTID:     []byte(uuid.NewString()),
        tenant_kafka.MSG_HEADER_KEY_TENANTID:      []byte("456"),
        tenant_kafka.MSG_HEADER_KEY_MESSAGETYPE:   []byte("TenantLookupRequest"),
    }

    kheaders := make([]kafka.Header, 0, len(headers))
    for k, v := range headers {
        kheaders = append(kheaders, kafka.Header{Key: k, Value: v})
    }


    var err error

    servers := "XXXXXX"
    protocol := "SASL_SSL"
    mechanisms := "PLAIN"
    username := "XXXXXXX"
    password := "XXXXXXX"


    Producer, err = kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": servers,
        "security.protocol": protocol,
        "sasl.username":     username,
        "sasl.password":     password,
        "sasl.mechanism":    mechanisms,
    })

    if err != nil {
        panic(err)
    }
    defer Producer.Close()

    value, _ := json.Marshal(message)

    err = Producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Key:           []byte("12345"),
        Headers:       kheaders,
        Value:         value,
        Timestamp:     time.Now().UTC(),
        TimestampType: kafka.TimestampCreateTime,
    }, nil)

    if err != nil {
        panic(err)
    }
    Producer.Flush(30)
}

%4|1641074998.615|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 1 message (881 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

请问应该如何解决这个问题?

请在调用 Flush() 时使用更长的超时时间:该参数的单位是毫秒,30 毫秒很可能不够。或者参考此示例,使用投递通道(delivery channel)等待消息的投递结果: https://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go