How do I write a big message into Kafka with the Kafka producer API?

I'm trying to write a large message (about 15 MB) into Kafka, but it never shows up: the program completes as if everything were fine, yet there is no message in the topic.

Small messages do get written.

Here is the code:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    private final static String TOPIC = "rpdc_21596_in2";
    private final static String BOOTSTRAP_SERVERS = "host:port";

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // Raise the client-side limit from the 1 MB default to 20 MB
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // ProducerRecord(topic, partition, timestamp, key, value)
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC,
                        0,
                        123L,
                        "fdsfdsdsdssss",
                        new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))
                );
        KafkaProducer<String, String> producer = createProducer();
        RecordMetadata recordMetadata = producer.send(record).get();
        producer.flush();
        producer.close();

        System.out.println(recordMetadata);
    }
}

The topic is configured to accept large messages, and I have already been able to write to it from Python. Here is the Python code:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['host:port'], max_request_size=20971520, request_timeout_ms=100000)

with open('/Users/user/Desktop/value1.json', 'rb') as f:
    lines = f.read()
    print(type(lines))

    # produce keyed messages to enable hashed partitioning
    future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)

    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=50)
    except KafkaError:
        # Decide what to do if the produce request failed...
        raise

    # Successful result returns assigned partition and offset
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


    producer.flush()

But the Java version does not work.

You need to configure your topic appropriately when creating it: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes

$ kafka-topics.sh --create --bootstrap-server ... --topic rpdc_21596_in2 --config max.message.bytes=20971520
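
If the topic already exists (as it does in the question), max.message.bytes can also be raised after the fact. Below is a minimal sketch using the Java AdminClient from kafka-clients (2.3+ for incrementalAlterConfigs); the class name is made up for illustration, and host:port is the same placeholder the question uses:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class RaiseTopicMessageLimit {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host:port");

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic =
                        new ConfigResource(ConfigResource.Type.TOPIC, "rpdc_21596_in2");
                // SET overwrites the topic's current max.message.bytes
                AlterConfigOp raiseLimit = new AlterConfigOp(
                        new ConfigEntry("max.message.bytes", "20971520"),
                        AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> updates =
                        Map.of(topic, List.of(raiseLimit));
                admin.incrementalAlterConfigs(updates).all().get();
            }
        }
    }

Depending on the client version, consumers of this topic may also need fetch.max.bytes / max.partition.fetch.bytes raised before they can read records this large.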

Update:

You could perhaps add a few more properties; I've been using these to push large base64 blobs:

    // Only one in-flight message per Kafka broker connection
    // - max.in.flight.requests.per.connection (default 5)
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    // Set the number of retries - retries
    props.put(ProducerConfig.RETRIES_CONFIG, "3");

    // Request timeout - request.timeout.ms
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");

    // Only retry after one second.
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
   
    // set max block to one minute by default
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
   
    // set transaction timeout to one minute
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
   
    // set delivery timeout to two minutes
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

    // time to wait before sending messages out to Kafka; should not be too high
    props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
    // maximum amount of data to collect before sending a batch; a multi-megabyte record always exceeds this
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");

    // these are not necessary, but useful for this use case
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
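
For completeness, here is how those properties can slot into the question's producer, with a send callback so a rejected record is logged instead of disappearing silently. This is a sketch rather than the original code: the topic, file path, and host:port placeholder are reused from the question, the key is arbitrary, and only the size-related and compression properties are shown:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Properties;

    public class BigMessageProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:port");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            // client-side cap; must cover the record plus protocol overhead
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
            // gzip shrinks text payloads such as JSON considerably
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

            String value = new String(
                    Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")));

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("rpdc_21596_in2", "key1", value),
                        (metadata, exception) -> {
                            if (exception != null) {
                                // e.g. RecordTooLargeException if a broker or
                                // topic limit is still below the record size
                                exception.printStackTrace();
                            } else {
                                System.out.println(metadata);
                            }
                        });
                producer.flush();
            }
        }
    }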