Why is Google PubSub always creating 60 permanent threads?
My application publishes a small number of messages (at most one every few seconds). It has no subscriptions.
On first use, PubSub creates 60 threads that stay alive permanently, such as:
"grpc-default-worker-ELG-1-1 Id=115 RUNNABLE (in native)...
"grpc-default-worker-ELG-1-10 Id=160 RUNNABLE (in native)":...
....
"Gax-16 Id=141 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24f8d334": ...
"Gax-17 Id=142 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24f8d334":....
We are using Java code like this sample. The Publisher object is retained for the lifetime of the process, as recommended here.
60 is a very high default.
Moreover, even if I setExecutorThreadCount
to 4 (code below), I still get 26 extra permanent threads. Setting this thread count to 1 or 2 still produces roughly 20 extra threads.
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
Publisher.Builder builder = Publisher.newBuilder(ProjectTopicName.of(proj, topic)).setExecutorProvider(executorProvider);
Our application is already thread-heavy and cannot tolerate more than one or two extra threads reserved for PubSub.
How can this be fixed? Is there any documentation on this?
Seeing 60-65 threads for a single subscriber is normal behavior,
because each subscriber needs a certain number of gRPC streams, and for each stream there is:
- a dedicated thread that receives messages from the stream
- a thread pool that handles callbacks
- a thread pool that sends ack/deadline-extension messages
The thread pools can be controlled, but there has been a persistent deadlock issue:
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder()
        .setExecutorThreadCount(1)
        .build();
Publisher publisher =
    Publisher.defaultBuilder(ProjectTopicName.of(proj, topic))
        .setExecutorProvider(executorProvider)
        .build();
Subscriber subscriber =
    Subscriber.defaultBuilder(ProjectTopicName.of(proj, topic))
        .setExecutorProvider(executorProvider)
        .build();
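The deadlock risk comes from sharing one very small executor between code that blocks and the tasks it is blocking on. A plain-JDK sketch of that failure mode (the class name and timeouts are mine, not part of the Pub/Sub client):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleThreadDeadlockDemo {
    // Returns true if a task running on a single-thread executor fails to
    // observe completion of a second task submitted to the SAME executor:
    // the only worker thread is occupied, so the inner task never starts.
    static boolean selfDeadlocks() throws InterruptedException {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = ex.submit(() -> {
                Future<?> inner = ex.submit(() -> {});
                try {
                    // Blocks the sole worker thread while waiting for work
                    // that needs that same thread.
                    inner.get(500, TimeUnit.MILLISECONDS);
                    return false; // inner ran: no deadlock
                } catch (TimeoutException e) {
                    return true;  // inner never scheduled: deadlock
                }
            });
            try {
                return outer.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException | TimeoutException e) {
                return false;
            }
        } finally {
            ex.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("self-deadlock observed: " + selfDeadlocks());
    }
}
```

With more threads the inner task would be picked up by an idle worker, which is why shrinking the pool to 1 while any code path blocks on the same pool is dangerous.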
The PubSub team provided an answer on GitHub here. It lets PubSub use only one extra thread, instead of the 20 or 60 described above.
final ExecutorProvider fixedExecutorProvider =
    FixedExecutorProvider.create(new ScheduledThreadPoolExecutor(1)); // adjust the size
Publisher publisher =
    Publisher.newBuilder("topic_name")
        .setExecutorProvider(fixedExecutorProvider)
        .setChannelProvider(
            PublisherStubSettings.defaultGrpcTransportProviderBuilder()
                .setExecutorProvider(fixedExecutorProvider)
                .setChannelConfigurator(
                    new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
                      @Override
                      public ManagedChannelBuilder apply(
                          ManagedChannelBuilder managedChannelBuilder) {
                        NettyChannelBuilder nettyChannelBuilder =
                            (NettyChannelBuilder)
                                managedChannelBuilder.executor(
                                    fixedExecutorProvider.getExecutor());
                        nettyChannelBuilder.eventLoopGroup(
                            new NioEventLoopGroup(1, fixedExecutorProvider.getExecutor()));
                        // Use Epoll if available; if so, change the line above
                        // to use EpollEventLoopGroup
                        nettyChannelBuilder.channelType(NioSocketChannel.class);
                        return nettyChannelBuilder;
                      }
                    })
                .build())
        .build();
publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("testdata")).build()).get();
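The key idea above is that every component (publisher callbacks, gRPC transport, Netty event loop) is handed the *same* fixed executor, so the total thread count is bounded by that one pool's size. A plain-JDK sketch of the principle (class name and task count are mine):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SharedExecutorDemo {
    // Submits `tasks` jobs from independent call sites to ONE shared
    // single-thread scheduled executor (analogous to passing the same
    // FixedExecutorProvider everywhere) and returns how many distinct
    // worker threads actually ran them.
    static int distinctWorkerThreads(int tasks) throws InterruptedException {
        ScheduledThreadPoolExecutor shared = new ScheduledThreadPoolExecutor(1);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            shared.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await(5, TimeUnit.SECONDS);
        shared.shutdown();
        return names.size(); // all tasks share the single worker thread
    }

    public static void main(String[] args) throws Exception {
        System.out.println("distinct worker threads: " + distinctWorkerThreads(10));
    }
}
```

Note that this revives the concern from the deadlock example: with a pool of size 1, nothing running on the shared executor may ever block on other work queued to it (for example, calling `.get()` on a publish future from inside a callback).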