使用 GCD 实现线程池

Implement a thread pool using GCD

我有一个大循环,其中包含可以并行化的计算任务。为此,我决定使用 GCD 编写一个简单的并发线程池,因为我正在 iOS.

我的线程池看起来相当简单。我将只附上 .m 个文件,足以理解我的想法:

#import "iOSThreadPool.h"

@interface iOSThreadPool()
{
    int                                     _timeout;
    int                                     _currentThreadId;
    NSMutableArray<dispatch_queue_t>        *_pool;
    NSMutableArray<dispatch_semaphore_t>    *_semaphores;
    dispatch_group_t                        _group;
}

@end

@implementation iOSThreadPool

- (instancetype)initWithSize:(int)threadsCount tasksCount:(int)tasksCount
{
    self = [super init];
    if (self) {
        _timeout = 2.0;
        _currentThreadId = 0;
        _pool = [NSMutableArray new];
        _semaphores = [NSMutableArray new];
        for (int i = 0; i < threadsCount; i++) {
            dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_BACKGROUND, 0);
            dispatch_queue_t queue = dispatch_queue_create([NSString stringWithFormat:@"com.workerQueue_%d", i].UTF8String, attr);
            [_pool addObject:queue];

            dispatch_semaphore_t sema = dispatch_semaphore_create(tasksCount);
            [_semaphores addObject:sema];
        }

        _group = dispatch_group_create();
    }

    return self;
}

- (void)async:(iOSThreadPoolBlock)block
{
    dispatch_group_enter(self->_group);

    __block dispatch_semaphore_t sema = _semaphores[_currentThreadId];
    dispatch_async(_pool[_currentThreadId], ^{

        dispatch_semaphore_wait(sema, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(self->_timeout * NSEC_PER_SEC)));
        block();
        dispatch_semaphore_signal(sema);

        dispatch_group_leave(self->_group);
    });

    _currentThreadId = (_currentThreadId + 1) % _pool.count;
}

- (void)wait {
    dispatch_group_wait(_group, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(self->_timeout * NSEC_PER_SEC)));
}

@end

所以,基本上,当我创建线程池时,我会设置线程数和信号量值。由于队列是并发的,我想限制可以并发执行的任务数,这样线程就不会被淹没。

问题是 - 无论我创建多少线程,它都不会影响性能。我想这是因为每个 Dispatch Queue 任务都在全局队列中结束,无论我有多少个队列,它们大部分时间都会将任务发送到同一个 BACKGROUND 队列。

我已经阅读了很多关于 GCD 的内容,并且在我的实践中成功地使用了很多。但是当我只想超越你可以在无数教程中找到的简单用法时,比如执行一些并行化进程以尽可能节省执行时间——我失败了。我搜索了GCD更详细的解释或更详细的高效技术,我一无所获。看起来 90% 的时间都以非常简单的方式使用它。同时听说GCD是非常非常强大的多线程框架,很明显,我只是不知道如何正确使用它。

所以我的问题是 - 这真的可以在 iOS 上启动几个并行进程吗?我应该在我的线程池中更改什么以使其高效?

注意:我下载了一个基于std::thread的C++版本的ThreadPool。如果我更改此池中的线程数,我会清楚地看到性能提升。如果某些 GCD 大师可以指出如何使用 GCD 的最大容量,我将不胜感激。

GCD 已经进行了线程池化(调度队列正在利用“工作线程”池),因此 redundant/inefficient 在其上添加另一层池化。

你说:

The thing is - no matter how much threads I'm creating, it doesn't affect the performance at all.

这可能是多种情况中的任何一种。一个常见的问题是工作单元太小。正如 Performing Loops Concurrently 所说:

You should make sure that your task code does a reasonable amount of work through each iteration. As with any block or function you dispatch to a queue, there is overhead to scheduling that code for execution. If each iteration of your loop performs only a small amount of work, the overhead of scheduling the code may outweigh the performance benefits you might achieve from dispatching it to a queue.

但是还有各种其他问题,包括低效的同步代码、缓存晃动等。没有可重现的问题示例是不可能的。虽然 QoS 也有影响,但与这些算法问题相比,它通常可以忽略不计。

你说:

Since queues are concurrent I want to limit tasks count which can be executed concurrently so thread would not be overwhelmed.

虽然您可以使用非零分派信号量或 NSOperationQueue 和一些 maxConcurrentOperationCountdispatch_apply (known as concurrentPerform for Swift users) is a “go to” solution for computationally-intensive, parallelized routines that balance workloads across CPU cores. It automatically looks at how many cores you’ve got, and distributes the loop across them, not risking an explosion in threads. And, as outlined in Improving on Loop Code 来实现此目的,但您可以尝试在平衡每个线程完成的工作量以及线程协调的固有开销。 (跨步也可以最大限度地减少缓存争用。)

我可能会建议研究 dispatch_apply 并尝试一下。如果您在这一点上仍然不清楚,只需 post 一个显示非并行例程和并行化再现的新问题,我们可以进一步提供帮助。


正如我上面所说,我认为你根本不想要这个套路。对于计算密集型例程,我更喜欢 dispatch_apply。对于我想控制并发程度的简单队列(特别是如果其中一些任务本身是异步的),我会使用 NSOperationQueue with a maxConcurrentOperationCount。但我想我会分享一些关于您的代码片段的观察结果:

  • 您实现的是队列池,而不是线程池;

  • 你说的threadsCount不是线程数,而是队列数。因此,如果您创建一个计数为 10 且 tasksCount 为 20 的池,这意味着您可能会使用 200 个线程。

  • 同样你所说的_currentThreadId也不是当前线程。这是当前队列。

  • _currentThreadId 的交互不是线程安全的。

最重要的是,GCD 有自己的线程池,因此您不应该重现该逻辑。您需要做的就是实现“不超过 threadCount”逻辑(这可以通过非零分派信号量实现)。因此,我建议将其简化为:

@interface ThreadPool()
@property (nonatomic, strong) dispatch_queue_t pool;
@property (nonatomic, strong) dispatch_queue_t scheduler;
@property (nonatomic, strong) dispatch_semaphore_t semaphore;
@end

@implementation ThreadPool

- (instancetype)initWithThreadCount:(int)threadCount {
    self = [super init];
    if (self) {
        NSString *identifier = [[NSUUID UUID] UUIDString];
        NSString *bundleIdentifier = [[NSBundle mainBundle] bundleIdentifier];

        NSString *schedulingLabel = [NSString stringWithFormat:@"%@.scheduler.%@", bundleIdentifier, identifier];
        _scheduler = dispatch_queue_create(schedulingLabel.UTF8String, DISPATCH_QUEUE_SERIAL);

        NSString *poolLabel = [NSString stringWithFormat:@"%@.pool.%@", bundleIdentifier, identifier];

        dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_BACKGROUND, 0);
        _pool = dispatch_queue_create(poolLabel.UTF8String, attr);

        _semaphore = dispatch_semaphore_create(threadCount);
    }

    return self;
}

- (void)async:(ThreadPoolBlock)block {
    dispatch_async(self.scheduler, ^{
        dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);
        dispatch_async(self.pool, ^{
            block();
            dispatch_semaphore_signal(self.semaphore);
        });
    });
}

@end

不用说,这个实现和你的一样,假设传递给 async 方法的块本身是同步的(例如,它没有启动另一个异步进程,如网络请求或其他)。我想你知道,但我只是为了完整起见才提到它。