Kafka 流:"TopicAuthorizationException: Not authorized to access topics" 用于内部状态存储

Kafka stream: "TopicAuthorizationException: Not authorized to access topics" for an internal state store

Java:OpenJdk 11 卡夫卡:2.2.0 Kafka 流库:2.3.0

我正在尝试在 docker 容器中部署我的 Kafka 流应用程序,但在尝试使用 TopicAuthorizationException 创建内部状态存储时它失败了。 它在当地运作良好。本地和服务器上的主要区别在于它连接到部署了 Kafka 的服务器并使用通常的 Kerberos 身份验证进行身份验证。 我无法理解身份验证与 本地商店 之间的 link。

我的流看起来像这样:

StreamsBuilder builder = new StreamsBuilder();

        //We stream from the source topic
        KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
                .with(Serdes.serdeFrom(String.class), INPUT_SERDE));

        //We group per room and window
        TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
                .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

        //We make them a list
        KStream<Windowed<String>, WindowedMessages> grouped = windowed
                .aggregate(WindowedMessages::new,
                        (key, value, aggregate) -> aggregate.add(value),
                        Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream();

        //Filter
        KStream<Windowed<String>, FilterResult> filtered = grouped
                .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));

        //Re map to its original form
        KStream<String, OutputPayload> reduced = filtered
                .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
                        .getMessages()
                        .stream().map(payload -> new KeyValue<>(key.key(), payload))
                        .collect(toList()));


        //Target topic
        reduced.to(sinkTopic, Produced
                .with(Serdes.serdeFrom(String.class), SERDE));

        return builder.build();

它接收消息流,windows 它聚合每个 window 的所有消息,只保留带有 'Suppressed' 的列表的最后一个版本,然后 flatMaps 整个将其转发到另一个主题。

每次我遇到这种异常:

Error message was: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.] 2019-10-09 06:44:03.255 +0000 ERROR [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - [] org.apache.kafka.streams.errors.StreamsException: Could not create topic filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212) at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:226) at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:104) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:971) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:618) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access00(AbstractCoordinator.java:107) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958) at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]

不是"authentication"而是"authorization"。查看您的日志消息,它显示 "Not authorized to access topics"。据我所知,您无权创建支持本地抑制状态存储的内部主题 'filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog'。 Kafka Streams 中包含的状态存储默认由 Kafka 代理上的主题支持。此内部主题在故障转移期间用于恢复本地状态存储。这些内部主题由 Kafka Streams 应用程序自动创建,因此应用程序需要具有适当的权限才能创建它们。

有关详细信息,请参阅 https://kafka.apache.org/23/documentation/streams/developer-guide/security.html#id1。上面写着 "the principal running the application must have the ACL set so that the application has the permissions to create, read and write internal topics."