Plotly Dash/Apache Kafka - 我希望看到实时绘制的图表,而不是在完成绘制后出现
Plotly Dash/ Apache Kafka - I would like to see a graph plotting live rather than appearing when finished plotting
我已经在 plotly 中查看了流图的实时更新,但这并不是我想要做的。我有一个图形组件,它在单击按钮组件时被填充。在更新图形的回调中,创建了一个对象,并使用 apache kafka streaming producer 从 class returns 15 个值的流调用了一个方法。然后我在回调中有 apache kafka 消费者,它一个一个地接收值。但是,我也试图让这些值也一一出现在我的破折号图中。我尝试使用间隔组件,但这似乎只是每秒调用一次函数,这意味着脚本在调用函数时卡住了,什么也没做。
如有任何帮助,我们将不胜感激,如有必要,可以提供更多详细信息。
消费者循环是无限的,不能为同一组记录迭代同一个消费者对象两次。您不能使用列表理解,因此您需要单独存储列表
以下内容未经测试,但显示了您需要执行的操作的大致思路
x = []
y = []
LIMIT = 15
for r in consumer_obj:
if len(x) >= LIMIT:
del x[0]
if len(y) >= LIMIT:
del y[0]
x.append(r.value['day'])
y.append(r.value['biomass'])
# TODO: Update graph
data = {"data": [
{"x": x,"y": y,
"type": "lines",
}]
这里有一个项目使用 Bokeh 而不是 Plotly - https://github.com/Aakash282/kafka-bokeh-dashboard
或者您可以只使用 Kafka Connect 写入 SQLite 数据库,让您的绘图使用它并定期更新
使用 Streams 时,您需要将使用者置于其自己的线程中。你已经将生产者放在一个线程中,我也建议不要将生产者加入主线程
你想要做的是将消费者放在一个线程中并使用队列或线程安全数据结构或锁定,队列是线程安全的,因此你可以根据需要提取数据
您还想 运行 def 或 startup 中的线程,而不是回调中的线程,每次回调中您只需要请求队列中的最新项目,并且可以设置一个时间间隔100 毫秒左右取决于 pref
下面是一个简单的例子:
while 循环将是您的回调间隔 - 我正在使用 while 作为时间
您必须将消费者设置在回调之外 - 否则您会一次又一次地构建它,将消费者置于回调范围之外并 运行 在启动时将其设置,下面我使用了 main .
from time import sleep
from json import dumps
from kafka import KafkaConsumer
from multiprocessing import Process, Queue
import os
def consumer(q):
consumer_obj = KafkaConsumer('test')
for message in consumer_obj:
q.put(message.value)
if __name__ == '__main__':
q = Queue()
p = Process(target=consumer, args=(q,))
p.start()
while True:
print(q.get())
sleep(1)
我已经在 plotly 中查看了流图的实时更新,但这并不是我想要做的。我有一个图形组件,它在单击按钮组件时被填充。在更新图形的回调中,创建了一个对象,并使用 apache kafka streaming producer 从 class returns 15 个值的流调用了一个方法。然后我在回调中有 apache kafka 消费者,它一个一个地接收值。但是,我也试图让这些值也一一出现在我的破折号图中。我尝试使用间隔组件,但这似乎只是每秒调用一次函数,这意味着脚本在调用函数时卡住了,什么也没做。
如有任何帮助,我们将不胜感激,如有必要,可以提供更多详细信息。
消费者循环是无限的,不能为同一组记录迭代同一个消费者对象两次。您不能使用列表理解,因此您需要单独存储列表
以下内容未经测试,但显示了您需要执行的操作的大致思路
x = []
y = []
LIMIT = 15
for r in consumer_obj:
if len(x) >= LIMIT:
del x[0]
if len(y) >= LIMIT:
del y[0]
x.append(r.value['day'])
y.append(r.value['biomass'])
# TODO: Update graph
data = {"data": [
{"x": x,"y": y,
"type": "lines",
}]
这里有一个项目使用 Bokeh 而不是 Plotly - https://github.com/Aakash282/kafka-bokeh-dashboard
或者您可以只使用 Kafka Connect 写入 SQLite 数据库,让您的绘图使用它并定期更新
使用 Streams 时,您需要将使用者置于其自己的线程中。你已经将生产者放在一个线程中,我也建议不要将生产者加入主线程
你想要做的是将消费者放在一个线程中并使用队列或线程安全数据结构或锁定,队列是线程安全的,因此你可以根据需要提取数据
您还想 运行 def 或 startup 中的线程,而不是回调中的线程,每次回调中您只需要请求队列中的最新项目,并且可以设置一个时间间隔100 毫秒左右取决于 pref
下面是一个简单的例子:
while 循环将是您的回调间隔 - 我正在使用 while 作为时间
您必须将消费者设置在回调之外 - 否则您会一次又一次地构建它,将消费者置于回调范围之外并 运行 在启动时将其设置,下面我使用了 main .
from time import sleep
from json import dumps
from kafka import KafkaConsumer
from multiprocessing import Process, Queue
import os
def consumer(q):
consumer_obj = KafkaConsumer('test')
for message in consumer_obj:
q.put(message.value)
if __name__ == '__main__':
q = Queue()
p = Process(target=consumer, args=(q,))
p.start()
while True:
print(q.get())
sleep(1)