Alpakka 消费者不使用来自 Kafka 运行 通过 Docker 撰写的消息
Alpakka Consumer not consuming messages from Kafka running via Docker compose
我通过 Docker 编写了 Kafka 和 Zookeeper 运行。我能够使用 Kafka 终端向主题发送 send/consume 消息,并且能够通过 Conduktor 监控所有内容。但不幸的是,我无法使用 Alpakka 连接器通过我的 Scala 应用程序使用消息。该应用程序连接到该主题,但每当我向该主题发送消息时,都没有任何反应。
只有 Kafka 和 Zookeeper 是 运行 通过 docker-compose。我直接在主机中 运行 Scala 消费者应用程序。
Docker撰写
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $}'"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
Scala 应用程序
object Main extends App {
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withGroupId("new_id")
.withCommitRefreshInterval(1.seconds)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withBootstrapServers("localhost:9092")
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
.map(msg => msg.value())
.runWith(Sink.foreach(println)).onComplete {
case Failure(exception) => exception.printStackTrace()
case Success(value) => println("done")
}
}
应用程序 - 控制台输出
16:58:33.877 INFO [akka.event.slf4j.Slf4jLogger] Slf4jLogger started
16:58:34.470 INFO [akka.kafka.internal.SingleSourceLogic] [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-591284224]
16:58:34.516 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = novo_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:58:34.701 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.4.0
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1585256314699
16:58:34.715 INFO [org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG
Describes how the host name that is advertised and can be reached by
clients. The value is published to ZooKeeper for clients to use.
If using the SSL
or SASL
protocol, the endpoint value must specify the
protocols in the following formats:
SSL: SSL://
or SASL_SSL://
SASL: SASL_PLAINTEXT://
or SASL_SSL://
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092
现在您的消费者可以使用端口 29092
:
.withBootstrapServers("localhost:29092")
我通过 Docker 编写了 Kafka 和 Zookeeper 运行。我能够使用 Kafka 终端向主题发送 send/consume 消息,并且能够通过 Conduktor 监控所有内容。但不幸的是,我无法使用 Alpakka 连接器通过我的 Scala 应用程序使用消息。该应用程序连接到该主题,但每当我向该主题发送消息时,都没有任何反应。
只有 Kafka 和 Zookeeper 是 运行 通过 docker-compose。我直接在主机中 运行 Scala 消费者应用程序。
Docker撰写
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $}'"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
Scala 应用程序
object Main extends App {
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withGroupId("new_id")
.withCommitRefreshInterval(1.seconds)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withBootstrapServers("localhost:9092")
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
.map(msg => msg.value())
.runWith(Sink.foreach(println)).onComplete {
case Failure(exception) => exception.printStackTrace()
case Success(value) => println("done")
}
}
应用程序 - 控制台输出
16:58:33.877 INFO [akka.event.slf4j.Slf4jLogger] Slf4jLogger started
16:58:34.470 INFO [akka.kafka.internal.SingleSourceLogic] [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-591284224]
16:58:34.516 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = novo_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:58:34.701 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.4.0
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1585256314699
16:58:34.715 INFO [org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG
Describes how the host name that is advertised and can be reached by clients. The value is published to ZooKeeper for clients to use.
If using the
SSL
orSASL
protocol, the endpoint value must specify the protocols in the following formats:
SSL:
SSL://
orSASL_SSL://
SASL:
SASL_PLAINTEXT://
orSASL_SSL://
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092
现在您的消费者可以使用端口 29092
:
.withBootstrapServers("localhost:29092")