Alpakka Consumer not consuming messages from Kafka running via Docker Compose

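I have Kafka and Zookeeper running via Docker Compose. I can send and consume messages on a topic using the Kafka terminal tools, and I can monitor everything through Conduktor. Unfortunately, I cannot consume messages from my Scala application using the Alpakka connector. The application connects to the topic, but whenever I send a message to the topic, nothing happens.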

Only Kafka and Zookeeper are running via docker-compose. I run the Scala consumer application directly on the host machine.

Docker Compose

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper

Scala application

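import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._
import scala.util.{Failure, Success}

object Main extends App {
  // Assumes Akka 2.6+, where the implicit ActorSystem also provides the stream materializer
  implicit val actorSystem: ActorSystem = ActorSystem()

  // Execution context for the onComplete callback below
  import actorSystem.dispatcher

  val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    .withGroupId("novo_id")
    .withCommitRefreshInterval(1.seconds)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withBootstrapServers("localhost:9092")

  // Print the value of every record received from the topic
  Consumer
    .plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
    .map(msg => msg.value())
    .runWith(Sink.foreach(println)).onComplete {
    case Failure(exception) => exception.printStackTrace()
    case Success(_) => println("done")
  }
}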

Application - console output

16:58:33.877 INFO  [akka.event.slf4j.Slf4jLogger]                     Slf4jLogger started
16:58:34.470 INFO  [akka.kafka.internal.SingleSourceLogic]            [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-591284224]
16:58:34.516 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = novo_id
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

16:58:34.701 INFO  [org.apache.kafka.common.utils.AppInfoParser]      Kafka version: 2.4.0
16:58:34.702 INFO  [org.apache.kafka.common.utils.AppInfoParser]      Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO  [org.apache.kafka.common.utils.AppInfoParser]      Kafka startTimeMs: 1585256314699
16:58:34.715 INFO  [org.apache.kafka.clients.consumer.KafkaConsumer]  [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO  [org.apache.kafka.clients.Metadata]                [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG

Export KAFKA_ADVERTISED_LISTENERS

Describes the host name that is advertised and can be reached by clients. The value is published to ZooKeeper for clients to use.

If using the SSL or SASL protocol, the endpoint value must specify the protocols in the following formats:

  • SSL: SSL:// or SASL_SSL://

  • SASL: SASL_PLAINTEXT:// or SASL_SSL://

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092

Now your consumer can use port 29092:

.withBootstrapServers("localhost:29092")
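
For the advertised address to be reachable from the host, the broker also has to listen on that port and Docker has to publish it. A minimal sketch of how the kafka service from the question might look with this change is below. It assumes the wurstmeister/kafka image passes KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS through to the broker configuration; the KAFKA_LISTENERS value and the 29092 port mapping are assumptions added for illustration and are not part of the original setup.

```yaml
  kafka:
    image: wurstmeister/kafka
    ports:
      # Assumption: publish the same port that is advertised to clients
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Assumption: bind the broker to all interfaces inside the container
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092
      # Address returned to clients in metadata responses; must resolve from the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
    depends_on:
      - zookeeper
```

With a configuration like this, HOSTNAME_COMMAND is left out because the advertised address is set explicitly. A consumer running on the host would bootstrap against localhost:29092, and the address the broker hands back in its metadata would point at the same published port.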