何时何地检查通道是否不会获得更多数据?
When and where to check if channel won't get any more data?
我正在尝试解决Exercise: Web Crawler
In this exercise you'll use Go's concurrency features to parallelize a
web crawler.
Modify the Crawl function to fetch URLs in parallel without fetching
the same URL twice.
我什么时候应该检查所有的 url 是否已经被抓取? (或者我怎么知道是否没有更多数据排队?)
package main
import (
"fmt"
)
type Result struct {
Url string
Depth int
}
type Stor struct {
Queue chan Result
Visited map[string]int
}
func NewStor() *Stor {
return &Stor{
Queue: make(chan Result,1000),
Visited: map[string]int{},
}
}
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(res Result, fetcher Fetcher, stor *Stor) {
defer func() {
/*
if len(stor.Queue) == 0 {
close(stor.Queue)
}
*/ // this is wrong, it makes the channel closes too early
}()
if res.Depth <= 0 {
return
}
// TODO: Don't fetch the same URL twice.
url := res.Url
stor.Visited[url]++
if stor.Visited[url] > 1 {
fmt.Println("skip:",stor.Visited[url],url)
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
stor.Queue <- Result{u,res.Depth-1}
}
return
}
func main() {
stor := NewStor()
Crawl(Result{"http://golang.org/", 4}, fetcher, stor)
for res := range stor.Queue {
// TODO: Fetch URLs in parallel.
go Crawl(res,fetcher,stor)
}
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}
输出是死锁,因为 stor.Queue
通道从未关闭。
检查频道的 len
始终是一场竞赛,您不能将其用于任何类型的同步。
生产者始终是关闭通道的一方,因为尝试在关闭的通道上发送是致命错误。不要在这里使用延迟,发送完成后关闭通道即可。
等待所有 goroutine 完成的最简单方法是 sync.WaitGroup 在同步包中
package main
import "sync"
var wg sync.WaitGroup
//then you do
func Crawl(res Result, fetcher Fetcher) { //what for you pass stor *Stor as arg? It just visible for all goroutings
defer wg.Done()
...
//why not to spawn new routing just inside Crowl?
for res := range urls {
wg.Add(1)
go Crawl(res,fetcher)
}
...
}
...
//And in main.main()
func main() {
wg.Add(1)
Crawl(Result{"http://golang.org/", 4}, fetcher)
...
wg.Wait() //Will block until all routings Done
}
完整的解决方案将是:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var visited map[string]int = map[string]int{}
type Result struct {
Url string
Depth int
}
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(res Result, fetcher Fetcher) {
defer wg.Done()
if res.Depth <= 0 {
return
}
// TODO: Don't fetch the same URL twice.
url := res.Url
visited[url]++
if visited[url] > 1 {
fmt.Println("skip:",visited[url],url)
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
wg.Add(1)
go Crawl( Result{u,res.Depth-1},fetcher)
//stor.Queue <- Result{u,res.Depth-1}
}
return
}
func main() {
wg.Add(1)
Crawl(Result{"http://golang.org/", 4}, fetcher)
wg.Wait()
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}
我正在尝试解决Exercise: Web Crawler
In this exercise you'll use Go's concurrency features to parallelize a web crawler.
Modify the Crawl function to fetch URLs in parallel without fetching the same URL twice.
我什么时候应该检查所有的 url 是否已经被抓取? (或者我怎么知道是否没有更多数据排队?)
package main
import (
"fmt"
)
type Result struct {
Url string
Depth int
}
type Stor struct {
Queue chan Result
Visited map[string]int
}
func NewStor() *Stor {
return &Stor{
Queue: make(chan Result,1000),
Visited: map[string]int{},
}
}
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(res Result, fetcher Fetcher, stor *Stor) {
defer func() {
/*
if len(stor.Queue) == 0 {
close(stor.Queue)
}
*/ // this is wrong, it makes the channel closes too early
}()
if res.Depth <= 0 {
return
}
// TODO: Don't fetch the same URL twice.
url := res.Url
stor.Visited[url]++
if stor.Visited[url] > 1 {
fmt.Println("skip:",stor.Visited[url],url)
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
stor.Queue <- Result{u,res.Depth-1}
}
return
}
func main() {
stor := NewStor()
Crawl(Result{"http://golang.org/", 4}, fetcher, stor)
for res := range stor.Queue {
// TODO: Fetch URLs in parallel.
go Crawl(res,fetcher,stor)
}
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}
输出是死锁,因为 stor.Queue
通道从未关闭。
检查频道的 len
始终是一场竞赛,您不能将其用于任何类型的同步。
生产者始终是关闭通道的一方,因为尝试在关闭的通道上发送是致命错误。不要在这里使用延迟,发送完成后关闭通道即可。
等待所有 goroutine 完成的最简单方法是 sync.WaitGroup 在同步包中
package main
import "sync"
var wg sync.WaitGroup
//then you do
func Crawl(res Result, fetcher Fetcher) { //what for you pass stor *Stor as arg? It just visible for all goroutings
defer wg.Done()
...
//why not to spawn new routing just inside Crowl?
for res := range urls {
wg.Add(1)
go Crawl(res,fetcher)
}
...
}
...
//And in main.main()
func main() {
wg.Add(1)
Crawl(Result{"http://golang.org/", 4}, fetcher)
...
wg.Wait() //Will block until all routings Done
}
完整的解决方案将是:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var visited map[string]int = map[string]int{}
type Result struct {
Url string
Depth int
}
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(res Result, fetcher Fetcher) {
defer wg.Done()
if res.Depth <= 0 {
return
}
// TODO: Don't fetch the same URL twice.
url := res.Url
visited[url]++
if visited[url] > 1 {
fmt.Println("skip:",visited[url],url)
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
wg.Add(1)
go Crawl( Result{u,res.Depth-1},fetcher)
//stor.Queue <- Result{u,res.Depth-1}
}
return
}
func main() {
wg.Add(1)
Crawl(Result{"http://golang.org/", 4}, fetcher)
wg.Wait()
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}