为什么 Google PubSub 总是创建 60 个永久线程?

Why is Google PubSub always creating 60 permanent threads?

我的应用程序发布少量消息(最多每隔几秒发布 1 条)。它没有订阅。

首次使用时,PubSub 会创建 60 个永久保持活动状态的线程,如下所示:

"grpc-default-worker-ELG-1-1 Id\u003d115 RUNNABLE (in native)...
"grpc-default-worker-ELG-1-10 Id\u003d160 RUNNABLE (in native)":...
....
"Gax-16 Id\u003d141 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24f8d334": ...
"Gax-17 Id\u003d142 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24f8d334":....

我们正在使用 Java 中的代码,如 this sample. The Publisher object is retained for the lifetime of the process, as recommended here 中那样。

60 是一个非常高的默认值。

此外,如果我 setExecutorThreadCount 到 4(代码如下),我仍然可以获得额外的 26 个永久线程。将此线程数设置为 1 或 2 仍会产生大约 20 个额外的线程。

ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
Publisher.Builder builder = Publisher.newBuilder(ProjectTopicName.of(proj, topic)).setExecutorProvider(executorProvider);

我们的应用程序已经是线程繁重的,不能容忍为 PubSub 预留的超过一两个额外的线程。

如何解决?有这方面的文件吗?

对于单个订阅者,看到 60-65 个线程是正常行为。

因为每个订阅者都需要一定数量的 gRPC 流并且对于每个流我们有:

  1. 一个单独的线程从流中接收消息
  2. 一个线程池来处理回调
  3. 一个线程池发送ack/delay条消息

可以控制线程池,但一直存在死锁问题:

ExecutorProvider executorProvider =
   InstantiatingExecutorProvider.newBuilder()
  .setExecutorThreadCount(1)
  .build();

Publisher publisher =
    Publisher.defaultBuilder(ProjectTopicName.of(proj, topic)).setExecutorProvider(executorProvider).build();

Subscriber subscriber =
    Subscriber.defaultBuilder(ProjectTopicName.of(proj, topic)).setExecutorProvider(executorProvider).build();

PubSub 团队在 GitHub here 上提供了答案。这允许 PubSub 仅使用 一个 额外线程,而不是如上所述的 20 或 60。

  final ExecutorProvider fixedExecutorProvider = FixedExecutorProvider.create(
        new ScheduledThreadPoolExecutor(1)); //adjust the size
  Publisher publisher =
        Publisher.newBuilder("topic_name")
            .setExecutorProvider(fixedExecutorProvider)
            .setChannelProvider(
                PublisherStubSettings.defaultGrpcTransportProviderBuilder()
                    .setExecutorProvider(fixedExecutorProvider)
                    .setChannelConfigurator(
                        new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
                          @Override
                          public ManagedChannelBuilder apply(
                              ManagedChannelBuilder managedChannelBuilder) {
                            NettyChannelBuilder nettyChannelBuilder =
                                (NettyChannelBuilder)
                                    managedChannelBuilder.executor(
                                        fixedExecutorProvider.getExecutor());
                            nettyChannelBuilder.eventLoopGroup(
                                new NioEventLoopGroup(1, fixedExecutorProvider.getExecutor()));
                              nettyChannelBuilder.channelType(
                                  NioSocketChannel.class); // Use EPoll if available, if using EPoll update above line to use EPollEventLoopGroup

                            return nettyChannelBuilder;
                          }
                        })
                    .build())
            .build();
    publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("testdata")).build()).get();