AIOKafka:以前的工作代码现在在 send_and_wait 处失败
AIOKafka: Formerly working code now fails at send_and_wait
我已经使用 AIOKafka 一段时间了,直到今天我都没有遇到任何问题。
St运行ge TypeError
在我尝试使用 AIOKafkaProducer.send_and_wait
发送消息时出现。我还在 AIOKafka 的 github 存储库中将此问题作为问题发布,但看起来它们有点不活跃。也许这里有人可以帮助我。
代码如下:
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers="localhost:9092")
async def _initialize(prod, future):
await prod.start()
await prod.send_and_wait("main_topic", str.encode("hello!!"))
future = asyncio.Future()
task = asyncio.ensure_future(_initialize(producer, future))
loop.run_until_complete(task)
print("loop ended!")
loop.close()
这是我收到的错误消息:
yilmazali@yilmazali:~$ python3 aiokafkatest.py
Unexpected error in sender routine
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
task.result()
File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 418, in _send_produce_req
response = yield from self.client.send(node_id, request)
File "/usr/local/lib/python3.6/dist-packages/aiokafka/client.py", line 415, in send
request, expect_response=expect_response)
File "/usr/local/lib/python3.6/dist-packages/aiokafka/conn.py", line 165, in send
message = header.encode() + request.encode()
File "/usr/local/lib/python3.6/dist-packages/kafka/util.py", line 159, in __call__
return self.method()(self.target(), *args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/struct.py", line 42, in _encode_self
[self.__dict__[name] for name in self.SCHEMA.names]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 93, in encode
return Int32.encode(len(value)) + value
TypeError: object of type '_io.BytesIO' has no len()
我没有更改我的 kafka 结构或库。我的卡夫卡经纪人看起来不错。我可以使用 shell 脚本 produce/consume 消息。
在过去的 2-3 个月里,我对 AIOKafka 没有任何问题,上面的代码运行良好。莫名其妙地出现了这个错误,我想知道问题出在哪里。
任何帮助将不胜感激。
谨致问候,
阿里
--
更新:我们运行朋友电脑上的这段代码,运行良好。我对外宣传我的kafka,她用上面的代码成功写到我本地的kafka主题。两台机器上的 AIOKafka 库版本都是 0.4.0。两台机器上的 asyncio 版本都是 3.4.3。简而言之,问题不在于我的 kafka 或图书馆。我的机器出了点问题,但天知道具体是什么原因造成的。
终于在我的机器上工作了。我只是卸载并安装了 aiokafka
模块。
虽然我对这个解决方案并不满意,我想更深入地探讨问题的核心,但我很高兴我现在可以继续我的工作。
希望这对遇到同样问题的陌生人有所帮助。
可能是您已经将 kafka-python 更新到高于 1.3.5 的版本。我这样做了,aiokafka 开始失败了。我在 kafka-python 1.3.5 返回,看起来还可以
我已经使用 AIOKafka 一段时间了,直到今天我都没有遇到任何问题。
St运行ge TypeError
在我尝试使用 AIOKafkaProducer.send_and_wait
发送消息时出现。我还在 AIOKafka 的 github 存储库中将此问题作为问题发布,但看起来它们有点不活跃。也许这里有人可以帮助我。
代码如下:
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers="localhost:9092")
async def _initialize(prod, future):
await prod.start()
await prod.send_and_wait("main_topic", str.encode("hello!!"))
future = asyncio.Future()
task = asyncio.ensure_future(_initialize(producer, future))
loop.run_until_complete(task)
print("loop ended!")
loop.close()
这是我收到的错误消息:
yilmazali@yilmazali:~$ python3 aiokafkatest.py
Unexpected error in sender routine
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
task.result()
File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 418, in _send_produce_req
response = yield from self.client.send(node_id, request)
File "/usr/local/lib/python3.6/dist-packages/aiokafka/client.py", line 415, in send
request, expect_response=expect_response)
File "/usr/local/lib/python3.6/dist-packages/aiokafka/conn.py", line 165, in send
message = header.encode() + request.encode()
File "/usr/local/lib/python3.6/dist-packages/kafka/util.py", line 159, in __call__
return self.method()(self.target(), *args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/struct.py", line 42, in _encode_self
[self.__dict__[name] for name in self.SCHEMA.names]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
[self.array_of.encode(item) for item in items]
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
for i, field in enumerate(self.fields)
File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 93, in encode
return Int32.encode(len(value)) + value
TypeError: object of type '_io.BytesIO' has no len()
我没有更改我的 kafka 结构或库。我的卡夫卡经纪人看起来不错。我可以使用 shell 脚本 produce/consume 消息。
在过去的 2-3 个月里,我对 AIOKafka 没有任何问题,上面的代码运行良好。莫名其妙地出现了这个错误,我想知道问题出在哪里。
任何帮助将不胜感激。
谨致问候,
阿里
--
更新:我们运行朋友电脑上的这段代码,运行良好。我对外宣传我的kafka,她用上面的代码成功写到我本地的kafka主题。两台机器上的 AIOKafka 库版本都是 0.4.0。两台机器上的 asyncio 版本都是 3.4.3。简而言之,问题不在于我的 kafka 或图书馆。我的机器出了点问题,但天知道具体是什么原因造成的。
终于在我的机器上工作了。我只是卸载并安装了 aiokafka
模块。
虽然我对这个解决方案并不满意,我想更深入地探讨问题的核心,但我很高兴我现在可以继续我的工作。
希望这对遇到同样问题的陌生人有所帮助。
可能是您已经将 kafka-python 更新到高于 1.3.5 的版本。我这样做了,aiokafka 开始失败了。我在 kafka-python 1.3.5 返回,看起来还可以