从非缓冲通道读取
Reading from non buffered channels
我试图了解非缓冲通道,因此我编写了一个小应用程序,它循环访问用户输入数组,做一些工作,将信息放在非缓冲通道上,然后读取它。但是,我无法从频道中读取。
这是我的代码
toProcess := os.Args[1:]
var wg sync.WaitGroup
results := make(chan string)
errs := make(chan error)
for _, t := range toProcess {
wg.Add(1)
go Worker(t, "text", results, errs, &wg)
}
go func() {
for err := range errs {
if err != nil {
fmt.Println(err)
}
}
}()
go func() {
for res := range results {
fmt.Println(res)
}
}()
我对非缓冲通道有什么不了解的地方?我想我应该把信息放在上面,然后再去例行阅读它。
编辑:使用两个 goroutine 解决了问题,但是当出现错误时它仍然给我以下信息:
open /Users/roosingh/go/src/github.com/nonbuff/files/22.txt: no such file or directory
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42001416c)
/usr/local/Cellar/go/1.10.2/libexec/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc420014160)
/usr/local/Cellar/go/1.10.2/libexec/src/sync/waitgroup.go:129 +0x72
main.main()
/Users/roosingh/go/src/github.com/nonbuff/main.go:39 +0x207
goroutine 6 [chan receive]:
main.main.func1(0xc4200780c0)
/Users/roosingh/go/src/github.com/nonbuff/main.go:25 +0x41
created by main.main
/Users/roosingh/go/src/github.com/nonbuff/main.go:24 +0x1d4
goroutine 7 [chan receive]:
main.main.func2(0xc420078060)
/Users/roosingh/go/src/github.com/nonbuff/main.go:34 +0xb2
created by main.main
/Users/roosingh/go/src/github.com/nonbuff/main.go:33 +0x1f6
所以它能够打印出错误信息。
我的worker代码如下;
func Worker(fn string, text string, results chan string, errs chan error, wg *sync.WaitGroup) {
file, err := os.Open(fn)
if err != nil {
errs <- err
return
}
defer func() {
file.Close()
wg.Done()
}()
reader := bufio.NewReader(file)
for {
var buffer bytes.Buffer
var l []byte
var isPrefix bool
for {
l, isPrefix, err = reader.ReadLine()
buffer.Write(l)
if !isPrefix {
break
}
if err != nil {
errs <- err
return
}
}
if err == io.EOF {
return
}
line := buffer.String()
results <- fmt. Sprintf("%s, %s", line, text)
}
if err != io.EOF {
errs <- err
return
}
return
}
至于无缓冲通道,你似乎理解这个概念,意思是它用于在goroutines之间传递消息但不能容纳任何消息。因此,对无缓冲通道的写入将阻塞,直到另一个 goroutine 从该通道读取,从一个通道读取将阻塞,直到另一个 goroutine 写入该通道。
在你的例子中,你似乎想在同一个 goroutine 中同时读取 2 个通道。由于通道的工作方式,您不能在非封闭通道上进行范围,而在另一个通道上的相同 goroutine 范围内进一步向下。除非第一个通道关闭,否则您将无法到达第二个范围。
但是,这并不意味着不可能!这就是 select
语句的用武之地。
select
语句允许您有选择地从多个频道读取,这意味着它将读取第一个有可读内容的频道。
考虑到这一点,您可以将 for
与 select
结合使用,并以这种方式重写您的例程:
go func() {
for {
select {
case err := <- errs: // you got an error
fmt.Println(err)
case res := <- results: // you got a result
fmt.Println(res)
}
}
}()
另外,你在这里不需要等待组,因为你知道你开始了多少个工人,你可以只计算你得到了多少错误和结果,并在达到工人数量时停止。
示例:
go func() {
var i int
for {
select {
case err := <- errs: // you got an error
fmt.Println(err)
i++
case res := <- results: // you got a result
fmt.Println(res)
i++
}
// all our workers are done
if i == len(toProcess) {
return
}
}
}()
我试图了解非缓冲通道,因此我编写了一个小应用程序,它循环访问用户输入数组,做一些工作,将信息放在非缓冲通道上,然后读取它。但是,我无法从频道中读取。 这是我的代码
toProcess := os.Args[1:]
var wg sync.WaitGroup
results := make(chan string)
errs := make(chan error)
for _, t := range toProcess {
wg.Add(1)
go Worker(t, "text", results, errs, &wg)
}
go func() {
for err := range errs {
if err != nil {
fmt.Println(err)
}
}
}()
go func() {
for res := range results {
fmt.Println(res)
}
}()
我对非缓冲通道有什么不了解的地方?我想我应该把信息放在上面,然后再去例行阅读它。
编辑:使用两个 goroutine 解决了问题,但是当出现错误时它仍然给我以下信息:
open /Users/roosingh/go/src/github.com/nonbuff/files/22.txt: no such file or directory
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42001416c)
/usr/local/Cellar/go/1.10.2/libexec/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc420014160)
/usr/local/Cellar/go/1.10.2/libexec/src/sync/waitgroup.go:129 +0x72
main.main()
/Users/roosingh/go/src/github.com/nonbuff/main.go:39 +0x207
goroutine 6 [chan receive]:
main.main.func1(0xc4200780c0)
/Users/roosingh/go/src/github.com/nonbuff/main.go:25 +0x41
created by main.main
/Users/roosingh/go/src/github.com/nonbuff/main.go:24 +0x1d4
goroutine 7 [chan receive]:
main.main.func2(0xc420078060)
/Users/roosingh/go/src/github.com/nonbuff/main.go:34 +0xb2
created by main.main
/Users/roosingh/go/src/github.com/nonbuff/main.go:33 +0x1f6
所以它能够打印出错误信息。 我的worker代码如下;
func Worker(fn string, text string, results chan string, errs chan error, wg *sync.WaitGroup) {
file, err := os.Open(fn)
if err != nil {
errs <- err
return
}
defer func() {
file.Close()
wg.Done()
}()
reader := bufio.NewReader(file)
for {
var buffer bytes.Buffer
var l []byte
var isPrefix bool
for {
l, isPrefix, err = reader.ReadLine()
buffer.Write(l)
if !isPrefix {
break
}
if err != nil {
errs <- err
return
}
}
if err == io.EOF {
return
}
line := buffer.String()
results <- fmt. Sprintf("%s, %s", line, text)
}
if err != io.EOF {
errs <- err
return
}
return
}
至于无缓冲通道,你似乎理解这个概念,意思是它用于在goroutines之间传递消息但不能容纳任何消息。因此,对无缓冲通道的写入将阻塞,直到另一个 goroutine 从该通道读取,从一个通道读取将阻塞,直到另一个 goroutine 写入该通道。
在你的例子中,你似乎想在同一个 goroutine 中同时读取 2 个通道。由于通道的工作方式,您不能在非封闭通道上进行范围,而在另一个通道上的相同 goroutine 范围内进一步向下。除非第一个通道关闭,否则您将无法到达第二个范围。
但是,这并不意味着不可能!这就是 select
语句的用武之地。
select
语句允许您有选择地从多个频道读取,这意味着它将读取第一个有可读内容的频道。
考虑到这一点,您可以将 for
与 select
结合使用,并以这种方式重写您的例程:
go func() {
for {
select {
case err := <- errs: // you got an error
fmt.Println(err)
case res := <- results: // you got a result
fmt.Println(res)
}
}
}()
另外,你在这里不需要等待组,因为你知道你开始了多少个工人,你可以只计算你得到了多少错误和结果,并在达到工人数量时停止。
示例:
go func() {
var i int
for {
select {
case err := <- errs: // you got an error
fmt.Println(err)
i++
case res := <- results: // you got a result
fmt.Println(res)
i++
}
// all our workers are done
if i == len(toProcess) {
return
}
}
}()