Kafka - 如何监视新消息
Kafka - how to watch for new messages
我有 PHP 个使用 Kafka 消息的应用程序。
问题是如何知道Kafka中有新消息?
第一个解决方案是在 PHP 中创建消费者,然后在 运行 中循环检查新消息。像这样
<?php
namespace MyAppBundle\Command;
use MyAppBundle\EventSourcing\EventSerializer\JSONEventSerializer;
use MyAppBundle\Service\EventProjectorService;
use MyAppBundle\Service\KafkaService;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Exception\RuntimeException;
class EventCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('events:fetch');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var KafkaService $kafkaService */
$kafkaService = $this->getContainer()->get('store_locator.kafka_service');
/** @var EventProjectorService $eventProjector */
$eventProjector = $this->getContainer()->get('store_locator.event_projector');
while(1){
$messages = $kafkaService->fetchEvents();
foreach ($messages as $message) {
$eventProjector->aggregate($message);
}
}
$output->writeln("Finish");
}
}
但是我不喜欢...有没有其他办法?
如果没有更好的办法,如何保持呢运行ning?例如当某事失败时。
据我所知,没有比无限循环并不断检查新消息更好的方法了。一种常见的方法是让任务在一定时间或迭代次数后自行终止,然后使用 supervisord
之类的方法检测死亡并复活消费者,以防止它耗尽所有资源。
我有 PHP 个使用 Kafka 消息的应用程序。 问题是如何知道Kafka中有新消息? 第一个解决方案是在 PHP 中创建消费者,然后在 运行 中循环检查新消息。像这样
<?php
namespace MyAppBundle\Command;
use MyAppBundle\EventSourcing\EventSerializer\JSONEventSerializer;
use MyAppBundle\Service\EventProjectorService;
use MyAppBundle\Service\KafkaService;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Exception\RuntimeException;
class EventCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('events:fetch');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var KafkaService $kafkaService */
$kafkaService = $this->getContainer()->get('store_locator.kafka_service');
/** @var EventProjectorService $eventProjector */
$eventProjector = $this->getContainer()->get('store_locator.event_projector');
while(1){
$messages = $kafkaService->fetchEvents();
foreach ($messages as $message) {
$eventProjector->aggregate($message);
}
}
$output->writeln("Finish");
}
}
但是我不喜欢...有没有其他办法?
如果没有更好的办法,如何保持呢运行ning?例如当某事失败时。
据我所知,没有比无限循环并不断检查新消息更好的方法了。一种常见的方法是让任务在一定时间或迭代次数后自行终止,然后使用 supervisord
之类的方法检测死亡并复活消费者,以防止它耗尽所有资源。