并发地在 Goroutine 之间分配任务

Dividing tasks among Goroutines concurrently

此代码的作用

代码从 Postgresql 数据库获取数据。从所有数据中,只有两个字段(会话和文本)被添加到 Task Struct

我的数据库中只有 2 个(每个)提到的数据,这意味着在执行 len(task) 时将 return 我 2 作为输出。

从现在开始就是问题所在了:

我创建了一个 buffered channel ch,其长度等于任务结构的长度(在本例中为 2)。

我指定了允许的最大工人(线程)数,这里是 20。

下面的代码所做的是当我将任务发送到通道时发送 Task struct 中的所有元素(此处为 2),任务结构中的示例代码将打印所有元素两次(=任务结构的长度)。示例在最后显示。

我需要这个程序做什么

例如通道len(task) = 100中有100条数据。我想将这100个数据分成20个Goroutines,每个Goroutines处理5个数据(我不知道这是否可行,如果无效请提供其他解决方案)。

所以这 100 个数据将提供给 20 个工作人员,他们每个人将接收 5 个数据,并与他们一起 运行 任务,最后通道将关闭,仅此而已。

当数据库变大时这将很有帮助,目前也是如此。

每个 20 个 Worker 执行任务或使 Worker 数量等于通道中的数据数量哪个更好?

var wg sync.WaitGroup

type Task struct {
    FetchedSession string
    FetchedText    string
}

func FetchAllData() {

    var task []Task

    //Fetch Session from DB
    var sess []database.UserSession
    database.DB.Find(&sess)
    //Fetch CommentText from DB
    var cmt []database.CommentReq
    database.DB.Find(&cmt)

    if len(sess) == len(cmt) {
        for i := range sess {
            task = append(task, Task{FetchedSession: sess[i].Session, FetchedText: cmt[i].CommentText})
        }
    }

    //making the Task Channel
    ch := make(chan []Task, len(task))

    MAX_WORKERS := 20

    wg.Add(MAX_WORKERS)

    for i := 0; i < MAX_WORKERS; i++ {
        go func() {
            for {
                t, ok := <-ch
                if !ok {
                    wg.Done()
                    return
                }
                DoTasks(t)
            }
        }()
    }

    for i := 0; i < len(task); i++ {
        ch <- task
    }

    close(ch)
    wg.Wait()
}

//Since Total number of data in Database is 2 (rows)
//Currently this function takes all data from the channel and runs Twice
func DoTasks(t []Task) {

    //Total tasks (data) = 100
    //If Max Workers = 20, then this function will run 5 times
    //Each Goroutine will get 4 tasks from the channel
    // Get the FetchedSession and FetchedTask and do tasks

    fmt.Println(t) // This prints all data twice

    //Finish one task and continue with the second
}

示例:

Example Data:
Task{FetchedSession: "EncodedString",  FetchedText: "Hello"}
Task{FetchedSession: "ExampleString",  FetchedText: "Hi"}
//Output
EncodedString
Hello
ExampleString
Hi
EncodedString
Hello
ExampleString
Hi
  • 更改任务频道类型。
ch := make(chan Task, len(task))

这意味着通道上传递的每个值代表一个单个任务。

  • 简化您的渠道迭代
    for i := 0; i < MAX_WORKERS; i++ {
        go func() {
            defer wg.Done()
            for t := range ch {
                DoTask(t)
            }
        }()
    }
当工人退出时,

wg.Done() 现在将是 运行。 range ch通道关闭,所有任务消耗完后停止

  • 更改“执行”功能以匹配
func DoTask(t Task) {

关于如何选择工人人数:

Run some benchmarks 用于您的 FetchAllData 函数,并尝试更改 MAX_WORKERS(或将其作为参数传递)。最佳价值将取决于任务,以及 运行 启用该功能时的可用资源,这意味着今天您机器上的最佳价值可能不是其他人机器上的最佳价值,也可能不是您明天机器上的最佳价值。基准应该可以帮助您找到合适的近似范围来放入值。