如何在 windows 上使用 C++ 代码在 librdkafka 中使用 KAFKA 的生产者 API
How to use KAFKA's producer API in librdkafka with c++ code on windows
我正在尝试将客户端编写为制作人。
我按照示例创建了一个新的 win32 控制台项目。
我发现 API 对我不起作用,除非我在程序末尾添加 getline() 函数。
如果我删除 getline(),produce(..) 方法仍然是 returns 成功的结果。
但是,我在 kafka-console-consumer
的命令 window 中看不到任何响应
我有点困惑。那正确吗?
如何在不使用 getline() 的情况下发送消息?
有人知道吗?
我找到了它不起作用的原因。
删除生产者对象似乎太快了
导致生产者无法向代理发送消息。
当我在生产方法和删除生产者对象之间添加睡眠 1000 时,
生产者可以正确发送消息。
所以,问题是如何立即发送消息。
在销毁生产者对象之前,如何确保这些消息已完全发送?
如何解决这个问题,实际上我不喜欢在我的源代码中添加一些 sleep()。
win10+vs2015+kafka_2.10-0.9.0.1+zookeeper-3.4.6+librdkafka
请查看以下代码
// kafka_test_win32_nomfc.cpp
//
#include "stdafx.h"
#include <iostream>
#include "librdkafka/rdkafkacpp.h"
int static producer_1()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = NULL;
if (!topic_str.empty()) {
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
RdKafka::ErrorCode resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>("hello worlf"), 11,
NULL, NULL);
delete topic;
delete producer;
return 0;
}
int static producer_2()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
(void *)"hi", 2,
NULL, 0, 0, NULL);
std::string errs(RdKafka::err2str(resp));
std::cout << errs << std::endl;
//producer->poll(0);
delete producer;
return 0;
}
int main()
{
producer_2();
return 0;
}
librdkafka produce() API(C 和 C++)是异步的,您的消息最初只会在内部生产者队列中排队,稍后才会排队(参见 queue.buffering.max.ms
配置 属性 - 默认 1 秒) 与其他消息组合成消息批处理 (MessageSet) 并从后台线程发送到代理。
您的程序调用 produce()
然后迅速退出,早在后台生产者线程有机会向代理发送消息之前,更不用说收到来自代理的确认。
为确保已发送所有未完成的消息,请在终止您的应用程序之前致电 flush()
。
如果您的应用程序是长期存在的,您应该定期调用 poll()
来为您注册的任何交付报告回调提供服务。
我正在尝试将客户端编写为制作人。 我按照示例创建了一个新的 win32 控制台项目。 我发现 API 对我不起作用,除非我在程序末尾添加 getline() 函数。
如果我删除 getline(),produce(..) 方法仍然是 returns 成功的结果。 但是,我在 kafka-console-consumer
的命令 window 中看不到任何响应我有点困惑。那正确吗? 如何在不使用 getline() 的情况下发送消息? 有人知道吗?
我找到了它不起作用的原因。 删除生产者对象似乎太快了 导致生产者无法向代理发送消息。
当我在生产方法和删除生产者对象之间添加睡眠 1000 时, 生产者可以正确发送消息。
所以,问题是如何立即发送消息。 在销毁生产者对象之前,如何确保这些消息已完全发送?
如何解决这个问题,实际上我不喜欢在我的源代码中添加一些 sleep()。
win10+vs2015+kafka_2.10-0.9.0.1+zookeeper-3.4.6+librdkafka 请查看以下代码
// kafka_test_win32_nomfc.cpp
//
#include "stdafx.h"
#include <iostream>
#include "librdkafka/rdkafkacpp.h"
int static producer_1()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = NULL;
if (!topic_str.empty()) {
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
RdKafka::ErrorCode resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>("hello worlf"), 11,
NULL, NULL);
delete topic;
delete producer;
return 0;
}
int static producer_2()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
(void *)"hi", 2,
NULL, 0, 0, NULL);
std::string errs(RdKafka::err2str(resp));
std::cout << errs << std::endl;
//producer->poll(0);
delete producer;
return 0;
}
int main()
{
producer_2();
return 0;
}
librdkafka produce() API(C 和 C++)是异步的,您的消息最初只会在内部生产者队列中排队,稍后才会排队(参见 queue.buffering.max.ms
配置 属性 - 默认 1 秒) 与其他消息组合成消息批处理 (MessageSet) 并从后台线程发送到代理。
您的程序调用 produce()
然后迅速退出,早在后台生产者线程有机会向代理发送消息之前,更不用说收到来自代理的确认。
为确保已发送所有未完成的消息,请在终止您的应用程序之前致电 flush()
。
如果您的应用程序是长期存在的,您应该定期调用 poll()
来为您注册的任何交付报告回调提供服务。