How do I write a big message into Kafka with the Kafka producer API?
I'm trying to write a big message (around 15 MB) into Kafka, but it never gets written: the program finishes as if everything were fine, yet there is no message in the topic.
Small messages do get written.
Here is the code:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    private final static String TOPIC = "rpdc_21596_in2";
    private final static String BOOTSTRAP_SERVERS = "host:port";

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        props.put("test.whatever", "fdsfdsf");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(TOPIC,
                        0,
                        123L,
                        "fdsfdsdsdssss",
                        new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))
                );
        KafkaProducer<String, String> producer = createProducer();
        RecordMetadata recordMetadata = producer.send(record).get();
        producer.flush();
        producer.close();
        System.out.println(recordMetadata);
    }
}
The topic has been configured to accept large messages, and I've been able to write to it from Python. Here is the Python code:
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['host:port'], max_request_size=20971520, request_timeout_ms=100000)

with open('/Users/user/Desktop/value1.json', 'rb') as f:
    lines = f.read()

print(type(lines))

# produce keyed messages to enable hashed partitioning
future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=50)
except KafkaError:
    # Decide what to do if produce request failed...
    pass

# Successful result returns assigned partition and offset
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

producer.flush()
But the Java version doesn't work.
You need to configure your topic appropriately when you create it:
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes
$ kafka-topics.sh --create --bootstrap-server ... --topic rpdc_21596_in2 --config max.message.bytes=20971520
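If the topic already exists, you don't have to recreate it; the same limit can be raised programmatically. Here is a minimal sketch using the Java AdminClient, assuming the broker address and topic name from your question (the class name is made up):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class RaiseTopicMessageLimit {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: same broker address as in the question
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host:port");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "rpdc_21596_in2");
            // Set max.message.bytes to 20 MiB, matching the producer's max.request.size
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("max.message.bytes", "20971520"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, Set.of(op))).all().get();
        }
    }
}

You can verify the change afterwards with kafka-configs.sh --describe --entity-type topics --entity-name rpdc_21596_in2.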
Update:
You could also add a few more properties; I've been using these to push large base64 blobs through:
// Only one in-flight request per Kafka broker connection
// - max.in.flight.requests.per.connection (default 5)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Set the number of retries - retries
props.put(ProducerConfig.RETRIES_CONFIG, "3");
// Request timeout - request.timeout.ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
// Only retry after one second - retry.backoff.ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
// Set max block to one minute - max.block.ms
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
// Set transaction timeout to one minute - transaction.timeout.ms
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
// Set delivery timeout to two minutes - delivery.timeout.ms
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
// Time to wait before sending messages out to Kafka; should not be too high
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// Maximum amount of data to be collected before sending the batch; you will always hit that
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
// These are not necessary, but useful for your use case
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
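Independently of these settings, it helps to surface the error instead of letting the send fail quietly. A minimal sketch, assuming the producer and record from your question, that attaches a callback to the send:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Both the client-side check (max.request.size) and the broker-side
        // check (max.message.bytes) surface here as a RecordTooLargeException
        exception.printStackTrace();
    } else {
        System.out.println("written to " + metadata.topic()
                + "-" + metadata.partition() + " @ offset " + metadata.offset());
    }
});
producer.flush();

That way a rejected record prints the exact limit that was exceeded instead of disappearing.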