使用 acks=all 和 flush 的 Kafka 生产者消息传递
Kakfa Producer Message Delivery with acks=all and flush
正在使用 "acks=all" 配置创建 Kafka Producer。
用上面的配置调用 flush 有什么意义吗?
是否会在发送到代理之前等待刷新被调用。
AS
acks=all This means the leader will wait for the full set of in-sync
replicas to acknowledge the record. This guarantees that the record
will not be lost as long as at least one in-sync replica remains
alive. This is the strongest available guarantee. This is equivalent
to the acks=-1 setting.
冲洗():
Invoking this method makes all buffered records immediately available
to send (even if linger_ms is greater than 0) and blocks on the
completion of the requests associated with these records. The
post-condition of flush() is that any previously sent record will have
completed (e.g. Future.is_done() == True). A request is considered
completed when either it is successfully acknowledged according to the
‘acks’ configuration for the producer, or it results in an error.
Other threads can continue sending messages while one thread is
blocked waiting for a flush call to complete; however, no guarantee is
made about the completion of messages sent after the flush call
begins.
flush() 仍然会阻塞客户端应用程序,直到所有消息发送完毕,即使 ack=0。唯一的就是它不会等待ack,block只会等到buffer被发送出去。
带有 ack=all 的 flush() 保证消息已发送并已在具有所需复制因子的集群上复制。
最后,回答你的问题:它会在发送到代理之前等待刷新被调用吗?
答:不一定。生产者以 的间隔或 按批量大小 (The buffer.memory controls the total amount of memory available to the producer for buffering) 持续发送消息。但是,flush() 总是好的,以确保您发送所有消息。
有关详细信息,请参阅此 link。
让我先试着说出 flush()
和 acks
之间的区别,然后再回答这两个问题。
flush()
- 这是一种在生产者中调用的方法,用于将消息从生产者端维护的缓冲区(可配置)推送到代理。您可以调用此方法或 close()
将消息从生产者缓冲区发送到代理。如果生产者可用的缓冲区内存已满(如 Manoj 在其回答中所述),则会自动调用此方法。
然而,acks=ALL
是代理的责任,即在消息根据生产者中请求的设置同步复制到其他代理后,将确认发送回生产者。您将使用此设置来调整您的消息传递语义。在这种情况下,一旦消息被复制到指定的 in-sync 个副本,代理就会向生产者发送确认消息 - "I got your messages".
现在,关于您的问题,即使用 acks 设置调用 flush 是否有任何意义,以及生产者是否会在发送到代理之前等待调用 flush。
好吧,生产者的异步特性将确保生产者不等待。但是,如果您显式调用 flush()
或者如果它自己被调用,那么任何进一步的发送都将被阻止,直到生产者从代理获得确认。所以,这两者之间的关系非常微妙。
希望对您有所帮助!
正在使用 "acks=all" 配置创建 Kafka Producer。
用上面的配置调用 flush 有什么意义吗?
是否会在发送到代理之前等待刷新被调用。
AS
acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
冲洗():
Invoking this method makes all buffered records immediately available to send (even if linger_ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.is_done() == True). A request is considered completed when either it is successfully acknowledged according to the ‘acks’ configuration for the producer, or it results in an error.
Other threads can continue sending messages while one thread is blocked waiting for a flush call to complete; however, no guarantee is made about the completion of messages sent after the flush call begins.
flush() 仍然会阻塞客户端应用程序,直到所有消息发送完毕,即使 ack=0。唯一的就是它不会等待ack,block只会等到buffer被发送出去。
带有 ack=all 的 flush() 保证消息已发送并已在具有所需复制因子的集群上复制。
最后,回答你的问题:它会在发送到代理之前等待刷新被调用吗?
答:不一定。生产者以 的间隔或 按批量大小 (The buffer.memory controls the total amount of memory available to the producer for buffering) 持续发送消息。但是,flush() 总是好的,以确保您发送所有消息。
有关详细信息,请参阅此 link。
让我先试着说出 flush()
和 acks
之间的区别,然后再回答这两个问题。
flush()
- 这是一种在生产者中调用的方法,用于将消息从生产者端维护的缓冲区(可配置)推送到代理。您可以调用此方法或 close()
将消息从生产者缓冲区发送到代理。如果生产者可用的缓冲区内存已满(如 Manoj 在其回答中所述),则会自动调用此方法。
acks=ALL
是代理的责任,即在消息根据生产者中请求的设置同步复制到其他代理后,将确认发送回生产者。您将使用此设置来调整您的消息传递语义。在这种情况下,一旦消息被复制到指定的 in-sync 个副本,代理就会向生产者发送确认消息 - "I got your messages".
现在,关于您的问题,即使用 acks 设置调用 flush 是否有任何意义,以及生产者是否会在发送到代理之前等待调用 flush。
好吧,生产者的异步特性将确保生产者不等待。但是,如果您显式调用 flush()
或者如果它自己被调用,那么任何进一步的发送都将被阻止,直到生产者从代理获得确认。所以,这两者之间的关系非常微妙。
希望对您有所帮助!