Periodically crawl API in Golang

I need to asynchronously and periodically (every 10 seconds) scrape/invoke a list of addresses at a specific URL (e.g. http://dummy.com/{address}).
Depending on the result received from the URL, an event needs to be published.
The crawler needs to be started in one goroutine, and each API call needs to run in a separate goroutine.
Another goroutine will be started to listen for the events.
The crawler is initialized with a list of addresses, but it needs to expose public methods for adding new addresses to be scraped, or removing existing ones, at any point in time.

Please see below; my solution suffers from a race condition.
It happens because the observables field of the crawler struct is not thread-safe for concurrent access.
I'm aware of the "don't communicate by sharing memory; share memory by communicating" rule, but I couldn't figure out how I would use a channel instead of a slice for the observables field, nor how I would then add/remove the addresses being watched if a channel were used.

How can the solution below be modified to fix the race condition?

package crawler

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strconv"
    "testing"
    "time"

    log "github.com/sirupsen/logrus"
)

type Service interface {
    Start()
    Stop()
    AddObservable(observable Observable)
    RemoveObservable(observable Observable)
    GetEventChannel() chan event
}

type event struct {
    EventType int
    Result    Result
}

type Result struct {
    resp []byte
}

type Observable struct {
    AccountType int
    Address     string
}

type crawler struct {
    explorerApiUrl string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
}

func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerApiUrl: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        }
    }
}

func (u *crawler) Stop() {
    u.quitChan <- 1
}

func (u *crawler) AddObservable(observable Observable) {
    u.observables = append(u.observables, observable)
}

func (u *crawler) RemoveObservable(observable Observable) {
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
}

func (u *crawler) GetEventChannel() chan event {
    return u.eventChan
}

func (u *crawler) observeAll(observables []Observable) {
    for _, a := range observables {
        go u.observe(a)
    }
}

func (u *crawler) observe(observe Observable) {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerApiUrl, observe.Address),
    )
    if err != nil {
        log.Error(err)
        return // resp is nil on error, so Close below would panic
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Error(err)
        return
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
}

//// TEST ////

func TestCrawler(t *testing.T) {
    observables := make([]Observable, 0)
    for i := 0; i < 100; i++ {
        observable := Observable{
            AccountType: 1,
            Address:     strconv.Itoa(i),
        }
        observables = append(observables, observable)
    }

    crawlSvc := NewService(observables, nil)

    go crawlSvc.Start()

    go removeObservableAfterTimeout(crawlSvc)

    go addObservableAfterTimeout(crawlSvc)

    go stopCrawlerAfterTimeout(crawlSvc)

    for event := range crawlSvc.GetEventChannel() {
        t.Log(event)
    }
}

func stopCrawlerAfterTimeout(crawler Service) {
    time.Sleep(7 * time.Second)
    crawler.Stop()
}

func removeObservableAfterTimeout(crawler Service) {
    time.Sleep(2 * time.Second)
    crawler.RemoveObservable(Observable{
        AccountType: 0,
        Address:     "2",
    })
}

func addObservableAfterTimeout(crawler Service) {
    time.Sleep(5 * time.Second)
    crawler.AddObservable(Observable{
        AccountType: 0,
        Address:     "101",
    })
}

The simplest thing to do here, without modifying the solution too much, is to introduce an RWMutex in the crawler struct. It lets you lock the critical sections of the code that work with the slice. See the following changes:

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
    mtx            sync.RWMutex
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.mtx.Lock()
    u.observables = append(u.observables, observable)
    u.mtx.Unlock()
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.mtx.Lock()
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
    u.mtx.Unlock()
}
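
One caveat with the mutex version: the write paths are guarded now, but Start still reads u.observables on every tick without holding the lock, so part of the race remains. The read side needs the lock as well; a minimal sketch (the snapshotObservables helper and its name are mine, not part of the original code):

func (u *crawler) snapshotObservables() []Observable {
    u.mtx.RLock()
    defer u.mtx.RUnlock()
    // copy under the read lock so the caller can iterate the snapshot
    // without holding the lock during the HTTP calls
    snapshot := make([]Observable, len(u.observables))
    copy(snapshot, u.observables)
    return snapshot
}

In Start, the tick case would then call u.observeAll(u.snapshotObservables()) instead of u.observeAll(u.observables).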

However, while this resolves the contention, I can't guarantee you won't eventually run into memory-leak issues, for example when trying to remove an observable from the slice while it is still executing.

My suggestion would be either to defer the slice operations (add or remove) until all executions have completed, or to introduce a check that cancels an observable's execution when it is removed.
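
For the cancellation route, the usual pattern is to keep one context.CancelFunc per address and invoke it on removal, so that the in-flight HTTP call is aborted instead of leaking. A rough sketch, assuming all map access happens on the Start goroutine (the cancels field and every name here are mine, not part of the code above):

// one cancel func per address, stored alongside the other crawler fields
cancels map[string]context.CancelFunc

func (u *crawler) observeAll(observables []Observable) {
    for _, a := range observables {
        ctx, cancel := context.WithCancel(context.Background())
        u.cancels[a.Address] = cancel
        go u.observe(ctx, a)
    }
}

func (u *crawler) observe(ctx context.Context, o Observable) {
    // binding the request to ctx makes the HTTP call abort as soon as
    // cancel() is invoked for this address
    req, err := http.NewRequestWithContext(ctx, http.MethodGet,
        fmt.Sprintf("%v/%v", u.explorerAPIURL, o.Address), nil)
    if err != nil {
        u.errChan <- err
        return
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        u.errChan <- err // includes context.Canceled after a removal
        return
    }
    defer resp.Body.Close()
    // ... read the body and emit the event as before ...
}

And in the removal path:

if cancel, ok := u.cancels[o.Address]; ok {
    cancel()
    delete(u.cancels, o.Address)
}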

For the former, one solution is to introduce additional channels for the slice operations and handle those operations in the Start function.

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    addChan        chan Observable
    removeChan     chan Observable
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
}

// NewService --
func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerAPIURL: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        addChan:        make(chan Observable),
        removeChan:     make(chan Observable),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        case o := <-u.addChan:
            u.observables = append(u.observables, o)
        case o := <-u.removeChan:
            newObservableList := make([]Observable, 0)
            for _, observable := range u.observables {
                if o.Address != observable.Address {
                    newObservableList = append(newObservableList, observable)
                }
            }
            u.observables = newObservableList
        }
    }
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.addChan <- observable
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.removeChan <- observable
}
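
Note that addChan and removeChan are unbuffered, so AddObservable and RemoveObservable now block until the Start loop is ready to receive. That is what serializes the slice operations, but it also means these methods must only be called while Start is running; calling them after Stop will hang the caller.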

...

//EDIT - I've added the modified versions of these functions as well.
func (u *crawler) observeAll(observables []Observable) {
    // requires "context" and "golang.org/x/sync/errgroup" in the imports
    g, _ := errgroup.WithContext(context.Background())
    for _, a := range observables {
        a := a // capture the loop variable for the closure (pre-Go 1.22)
        g.Go(func() error {
            return u.observe(a)
        })
    }

    // block until every observation in this batch has finished, so the
    // add/remove operations queued on the channels are deferred until then
    if err := g.Wait(); err != nil {
        log.Error(err)
    }
}

func (u *crawler) observe(observe Observable) error {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
    return nil
}
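
With g.Wait() in place, the Start loop does not return to the select until every observation in the current batch has finished, so any queued add/remove messages are effectively deferred until the batch completes, which is exactly the behaviour suggested above. The flip side is that a single slow endpoint delays the whole cycle (ticks that fire in the meantime are dropped by the ticker), and the consumer of eventChan must keep reading, since each observe blocks on the unbuffered channel until its event is received.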