WaitGroup is reused before previous Wait has returned, for an unknown reason
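I am using the code below, but I don't know why it crashes with the error "WaitGroup is reused before previous Wait has returned" at these lines:
for _, proxy := range proxies {
	wgGroup.Wait()
I want to make sure that while a local source is calling proxySource.GetProxies() or proxyProvider.receivingProxyBC.In() <- proxy, the remoteSources are not allowed to call proxyProvider.receivingProxyBC.In() <- proxy.
The detailed code is here: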
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
for _, proxySource := range localSources {
	go func(proxySource *ProxySource) {
		lastTimeGet := time.Now()
		firstTimeLoad := true
		wgGroup.Done()
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			wgGroup.Add(1)
			proxies, err := proxySource.GetProxies()
			wgGroup.Done()
			LogInfo("Get proxy from source ", proxySource.Id)
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			wgGroup.Add(1)
			for _, proxy := range proxies {
				proxyProvider.receivingProxyBC.In() <- proxy
			}
			wgGroup.Done()
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second)
		}
	}(proxySource)
}
for _, proxySource := range remoteSources {
	go func(proxySource *ProxySource) {
		time.Sleep(2 * time.Second)
		lastTimeGet := time.Now()
		firstTimeLoad := true
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			proxies, err := proxySource.GetProxies()
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			for _, proxy := range proxies {
				wgGroup.Wait()
				proxyProvider.receivingProxyBC.In() <- proxy
			}
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second)
		}
	}(proxySource)
}
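Update: RWLock
With the code below I can lock the localSources, but it does not seem optimal. What I need is: while any of the localSources is fetching, block all remoteSources; when no localSources is fetching, allow all remoteSources to fetch. Currently only one of the remoteSources is allowed to fetch at a time.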
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
	go func(proxySource *ProxySource) {
		lastTimeGet := time.Now()
		firstTimeLoad := true
		wgGroup.Done()
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			LogInfo("Total proxies ", totalProxy)
			if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
			localGroupRwLock.RLock()
			proxies, err := proxySource.GetProxies()
			localGroupRwLock.RUnlock()
			LogInfo("Get proxy from source ", proxySource.Id)
			if err != nil {
				LogError("Error when get proxies from ", proxySource.Id)
				time.Sleep(5 * time.Second)
				continue
			}
			LogInfo("Add proxy from source ", proxySource.Id)
			localGroupRwLock.RLock()
			for _, proxy := range proxies {
				proxyProvider.receivingProxyBC.In() <- proxy
			}
			localGroupRwLock.RUnlock()
			LogInfo("Done add proxy from source ", proxySource.Id)
			//LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
			LogInfo("Watch for proxy source", proxySource.Id)
		}
	}(proxySource)
}
for _, proxySource := range remoteSources {
	go func(proxySource *ProxySource) {
		time.Sleep(2 * time.Second)
		lastTimeGet := time.Now()
		firstTimeLoad := true
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			LogInfo("Total proxies ", totalProxy)
			if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
			LogInfo("Get proxy from source ", proxySource.Id)
			localGroupRwLock.Lock()
			proxies, err := proxySource.GetProxies()
			localGroupRwLock.Unlock()
			if err != nil {
				LogError("Error when get proxies from ", proxySource.Id)
				time.Sleep(5 * time.Second)
				continue
			}
			LogInfo("Add proxy from source ", proxySource.Id)
			wgGroup.Wait()
			localGroupRwLock.Lock()
			for _, proxy := range proxies {
				proxyProvider.receivingProxyBC.In() <- proxy
			}
			localGroupRwLock.Unlock()
			LogInfo("Done add proxy from source ", proxySource.Id)
			//LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
			LogInfo("Watch for proxy source", proxySource.Id)
		}
	}(proxySource)
}
From the documentation:
A WaitGroup waits for a collection of goroutines to finish. The main
goroutine calls Add to set the number of goroutines to wait for. Then
each of the goroutines runs and calls Done when finished. At the same
time, Wait can be used to block until all goroutines have finished.
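For Wait():
Wait blocks until the WaitGroup counter is zero.
You can also see an example there. The point is that a WaitGroup is meant to block until the counter reaches zero. So in the original code, even assuming there were no runtime error, every goroutine in the second for loop would block only until the counter set up by the first group of goroutines drops to zero. In the first part, Add(1) and Done() do not block at all, so the data race would remain.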
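For reference, here is a minimal sketch of the usage a WaitGroup is designed for: Add before starting the goroutines, Done in each of them, and a single Wait that blocks. The function name squareAll and the work it does are made up for illustration and have nothing to do with your proxy code.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll (hypothetical helper) squares each input in its own goroutine
// and returns only after every goroutine has finished.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	wg.Add(len(inputs)) // set the counter before any goroutine starts
	for i, v := range inputs {
		go func(i, v int) {
			defer wg.Done() // decrement the counter when this goroutine ends
			results[i] = v * v // each goroutine writes to its own index
		}(i, v)
	}
	wg.Wait() // the only call that blocks: waits until the counter is zero
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3}))
}
```

Only Wait blocks here; Add and Done merely adjust the counter, which is why they cannot be used to keep another goroutine out of a critical section.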
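The error is documented under the Add() method:
Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.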
Note that calls with a positive delta that occur when the counter is
zero must happen before a Wait. Calls with a negative delta, or calls
with a positive delta that start when the counter is greater than
zero, may happen at any time. Typically this means the calls to Add
should execute before the statement creating the goroutine or other
event to be waited for. If a WaitGroup is reused to wait for several
independent sets of events, new Add calls must happen after all
previous Wait calls have returned. See the WaitGroup example.
However, you are not waiting for independent sets of events either.
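For contrast, legitimate reuse of a WaitGroup for independent sets of events looks roughly like the sketch below: each new Add happens only after the previous Wait has returned. The name runBatches and the counter are assumptions made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBatches (hypothetical) runs several independent batches of workers,
// reusing one WaitGroup, and returns how many workers ran in total.
func runBatches(batches, workers int) int64 {
	var wg sync.WaitGroup
	var done int64
	for b := 0; b < batches; b++ {
		// Safe reuse: the Wait of the previous batch has already returned.
		wg.Add(workers)
		for w := 0; w < workers; w++ {
			go func() {
				defer wg.Done()
				atomic.AddInt64(&done, 1)
			}()
		}
		wg.Wait() // finish this batch before the next Add
	}
	return done
}

func main() {
	fmt.Println(runBatches(3, 4))
}
```

In your code, by comparison, Add(1) runs in the local goroutines while remote goroutines may still be inside Wait(), which is exactly the situation the documentation forbids.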
The right tool for your code is sync.Mutex. From the documentation again:
A Mutex is a mutual exclusion lock. The zero value for a Mutex is an
unlocked mutex.
A Mutex must not be copied after first use.
type Mutex struct {
	// contains filtered or unexported fields
}
func (*Mutex) Lock
func (m *Mutex) Lock()
Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.
func (*Mutex) Unlock
func (m *Mutex) Unlock()
Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.
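So, as you described, you want to "pause the calling of proxyProvider.receivingProxyBC.In() <- proxy when proxySource.GetProxies() or for _, proxy := range proxies is called". "Pause" is better described by the term "block", and this is a textbook mutex problem: protect all three "calls" with a lock (in quotes because a for loop is not a call) and you are done.
Protecting a for loop with a mutex can be a little tricky. It should look like this: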
lock.Lock()
for ... {
	lock.Unlock()
	...
	lock.Lock()
}
lock.Unlock() // the lock is still held when the loop exits, so release it
So I changed your code; hopefully it works as expected:
lock := sync.Mutex{}
lock.Lock()
for _, proxySource := range localSources {
	lock.Unlock()
	go func(proxySource *ProxySource) {
		lock.Lock()
		lastTimeGet := time.Now()
		firstTimeLoad := true
		lock.Unlock()
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			// Hold the lock while fetching so remote sources cannot send.
			lock.Lock()
			proxies, err := proxySource.GetProxies()
			lock.Unlock()
			LogInfo("Get proxy from source ", proxySource.Id)
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			// Hold the lock for the whole batch of local sends.
			lock.Lock()
			for _, proxy := range proxies {
				proxyProvider.receivingProxyBC.In() <- proxy
			}
			lock.Unlock()
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second)
		}
	}(proxySource)
	lock.Lock()
}
lock.Unlock() // release the lock still held after the last iteration; otherwise every goroutine blocks forever
for _, proxySource := range remoteSources {
	go func(proxySource *ProxySource) {
		time.Sleep(2 * time.Second)
		lastTimeGet := time.Now()
		firstTimeLoad := true
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			proxies, err := proxySource.GetProxies()
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			for _, proxy := range proxies {
				// Lock per send, so a local source can take over between proxies.
				lock.Lock()
				proxyProvider.receivingProxyBC.In() <- proxy
				lock.Unlock()
			}
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second)
		}
	}(proxySource)
}
Note 1: You may be tempted to use defer. Don't. defer is scoped to functions, not blocks.
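If you do want defer-style unlocking inside a loop, one common workaround is to wrap the critical section in a small function literal so that defer has a function scope to run in. A minimal sketch, where the counter type and addAll are hypothetical names used only for this illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type used only for this illustration.
type counter struct {
	mu sync.Mutex
	n  int
}

// addAll takes the lock once per item. The function literal gives defer a
// function to return from, so Unlock runs at the end of every iteration
// instead of at the end of addAll.
func (c *counter) addAll(items []int) {
	for _, v := range items {
		func() {
			c.mu.Lock()
			defer c.mu.Unlock() // runs when this function literal returns
			c.n += v
		}()
	}
}

func main() {
	c := &counter{}
	c.addAll([]int{1, 2, 3})
	fmt.Println(c.n)
}
```

A plain defer placed directly in the loop body would instead postpone every Unlock until the enclosing function returns, so the second iteration would block on Lock.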
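Note 2: When a mutex shows up in Go code, it often points to a design problem. One should always check whether channels would be a better fit and refactor the code, although in many cases a mutex is not a bad idea. I cannot see anything about the overall design here, so I will let it go.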
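To give an idea of what a channel-based design could look like, here is a rough sketch in which a single forwarding function drains two channels and always prefers the local one. It is only one possible direction, not a drop-in replacement: the names forward, local and remote and the string payload are assumptions invented for this example.

```go
package main

import "fmt"

// forward (hypothetical) collects n items, taking from local whenever
// something is ready there and falling back to remote otherwise.
func forward(local, remote <-chan string, n int) []string {
	out := make([]string, 0, n)
	for len(out) < n {
		// First, a non-blocking attempt on the high-priority channel.
		select {
		case p := <-local:
			out = append(out, p)
			continue
		default:
		}
		// Nothing local is ready: wait for whichever channel delivers first.
		select {
		case p := <-local:
			out = append(out, p)
		case p := <-remote:
			out = append(out, p)
		}
	}
	return out
}

func main() {
	local := make(chan string, 2)
	remote := make(chan string, 2)
	remote <- "r1"
	remote <- "r2"
	local <- "l1"
	local <- "l2"
	fmt.Println(forward(local, remote, 4)) // local items come out first
}
```

With this shape, the source goroutines only ever send on their own channel, and the ordering policy lives in one place instead of being spread across lock calls.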
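Note 3: As written, the code also blocks proxySource.GetProxies() and the local for loop while a remote source is calling proxyProvider.receivingProxyBC.In() <- proxy. Whether that is desired depends on your needs. If it is not, you should look at sync.RWMutex and adapt the code accordingly. I will leave that to you.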
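As a starting point for the RWMutex variant, here is a hedged sketch of the idea: remote senders take the read lock, so any number of them can proceed together, while a local source takes the write lock, which keeps every remote sender out. The gate type, its methods and the simulate function are assumptions made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gate (hypothetical) wraps an RWMutex: remote sections share it,
// a local section owns it exclusively.
type gate struct {
	rw sync.RWMutex
}

func (g *gate) localSection(fn func()) {
	g.rw.Lock() // exclusive: waits for all remote sections to leave
	defer g.rw.Unlock()
	fn()
}

func (g *gate) remoteSection(fn func()) {
	g.rw.RLock() // shared: many remote sections may run at once
	defer g.rw.RUnlock()
	fn()
}

// simulate counts how often a remote section observed an active local one.
func simulate(locals, remotes, rounds int) int64 {
	var g gate
	var wg sync.WaitGroup
	var localActive, violations int64
	for i := 0; i < locals; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := 0; r < rounds; r++ {
				g.localSection(func() {
					atomic.AddInt64(&localActive, 1)
					atomic.AddInt64(&localActive, -1)
				})
			}
		}()
	}
	for i := 0; i < remotes; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := 0; r < rounds; r++ {
				g.remoteSection(func() {
					if atomic.LoadInt64(&localActive) != 0 {
						atomic.AddInt64(&violations, 1)
					}
				})
			}
		}()
	}
	wg.Wait()
	return violations
}

func main() {
	fmt.Println(simulate(2, 4, 100))
}
```

The trade-off is that local sources also exclude each other under the write lock. Note that this is the reverse of the assignment in the question's update, where the local sources take RLock and the remote sources take Lock, which is why only one remote source can run at a time there.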