WaitGroup 在之前的 Wait 未知原因之前被重用

WaitGroup is reused before previous Wait unknown reason

我使用以下代码,但不知道为什么会崩溃并出现错误 (WaitGroup 在上一个 Wait 之前被重用) 在行:

for _, proxy := range proxies {
                    wgGroup.Wait()

我想确保在调用 proxySource.GetProxies()proxyProvider.receivingProxyBC.In() <- proxy 不允许 remoteSources 调用 proxyProvider.receivingProxyBC.In() <- proxy

详细代码在这里:

    wgGroup := sync.WaitGroup{}
    wgGroup.Add(len(localSources))
    for _, proxySource := range localSources {
        go func(proxySource *ProxySource) {
            lastTimeGet := time.Now()
            firstTimeLoad := true
            wgGroup.Done()
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                wgGroup.Add(1)
                proxies, err := proxySource.GetProxies()
                wgGroup.Done()
                LogInfo("Get proxy from source ", proxySource.Id)
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                wgGroup.Add(1)
                for _, proxy := range proxies {
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                wgGroup.Done()
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }
    for _, proxySource := range remoteSources {
        go func(proxySource *ProxySource) {
            time.Sleep(2 * time.Second)
            lastTimeGet := time.Now()
            firstTimeLoad := true
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                proxies, err := proxySource.GetProxies()
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                for _, proxy := range proxies {
                    wgGroup.Wait()
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }

更新 RWLOCK

使用这些代码我可以锁定 localSources 但它似乎没有优化;我需要当任何 localSources 得到然后锁定所有 remoteSources;当没有 localSources 获取时,允许所有 remoteSources 获取。目前只允许同时获得一个remoteSources

wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
  go func(proxySource *ProxySource) {
    lastTimeGet := time.Now()
    firstTimeLoad := true
    wgGroup.Done()
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      localGroupRwLock.RLock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.RUnlock()
      LogInfo("Get proxy from source ", proxySource.Id)
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      localGroupRwLock.RLock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.RUnlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}
for _, proxySource := range remoteSources {
  go func(proxySource *ProxySource) {
    time.Sleep(2 * time.Second)
    lastTimeGet := time.Now()
    firstTimeLoad := true
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      LogInfo("Get proxy from source ", proxySource.Id)
      localGroupRwLock.Lock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.Unlock()
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      wgGroup.Wait()
      localGroupRwLock.Lock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.Unlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}

来自 document:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

对于Wait()

Wait blocks until the WaitGroup counter is zero.

你也可以在那里看到例子。问题是,WaitGroup 用于阻塞,直到计数器变为零。所以在原始代码中,假设没有运行时错误,第二个 for 循环中的每个 goroutine 都会阻塞,直到第一个 goroutines 完成。在第一部分,Add(1)Done() 根本不会阻塞。数据竞争将继续存在。

错误记录在 Add() 方法中: 添加向 WaitGroup 计数器添加可能为负的增量。如果计数器变为零,则释放所有阻塞在 Wait 上的 goroutine。如果计数器变为负值,添加恐慌。

Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned. See the WaitGroup example.

但是,您也不是在等待 独立 组组。

适合您的代码的工具是 sync.Mutex。文件,再次:

A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.

A Mutex must not be copied after first use.

type Mutex struct { // contains filtered or unexported fields }

func (*Mutex) Lock

func (m *Mutex) Lock()

Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available. func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.

所以正如您所描述的,您想要 "pause the calling of proxyProvider.receivingProxyBC.In() <- proxy when proxySource.GetProxies() or for _, proxy := range proxies is called"。暂停用术语 block 更好地描述,这是教科书上的互斥锁问题:用锁保护所有三个 "calls"(因为 for 循环不是调用)并且它是完成。

关于如何用互斥量保护 for 循环可能有点棘手,它应该是这样的:

lock.Lock
for ... {
    lock.Unlock()
    ...
    lock.Lock()
}

所以我更改了您的代码,希望它能按预期工作:

lock := sync.Mutex{}
lock.Lock()
for _, proxySource := range localSources {
    lock.Unlock()
    go func(proxySource *ProxySource) {
        lock.Lock()
        lastTimeGet := time.Now()
        firstTimeLoad := true
        lock.Unlock()
        for {
            currentTimeGet := time.Now()
            totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
            if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                time.Sleep(proxySource.WatchWait)
                continue
            }
            firstTimeLoad = false
            lock.Lock()
            proxies, err := proxySource.GetProxies()
            lock.Unlock()
            LogInfo("Get proxy from source ", proxySource.Id)
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            lock.Lock()
            for _, proxy := range proxies {
                proxyProvider.receivingProxyBC.In() <- proxy
            }
            lock.Unlock()
            lastTimeGet = time.Now()
            time.Sleep(20 * time.Second)
        }
    }(proxySource)
    lock.Lock()
}
for _, proxySource := range remoteSources {
    go func(proxySource *ProxySource) {
        time.Sleep(2 * time.Second)
        lastTimeGet := time.Now()
        firstTimeLoad := true
        for {
            currentTimeGet := time.Now()
            totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
            if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                time.Sleep(proxySource.WatchWait)
                continue
            }
            firstTimeLoad = false
            proxies, err := proxySource.GetProxies()
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            for _, proxy := range proxies {
                lock.Lock()
                proxyProvider.receivingProxyBC.In() <- proxy
                lock.Unlock()
            }
            lastTimeGet = time.Now()
            time.Sleep(20 * time.Second)
        }
    }(proxySource)
}

注意 1:您可能会想使用 defer。不。 defer 用于函数,而不是块。

注2:在golang中使用mutex时,经常会出现设计上的问题。人们应该总是看看使用通道是否更好并重构代码,尽管在许多情况下互斥锁并不是一个坏主意。但在这里我看不到关于设计的任何信息,所以我就让它去吧。

注3:代码实际上存在暂停proxySource.GetProxies()和调用proxyProvider.receivingProxyBC.In() <- proxy时的for循环的问题。这是否需要取决于。如果不需要,你应该看看sync.RWMutex,并根据它进行更改。就交给你了。