没有收到来自频道的消息

Doesn't receive a message from a channel

编辑:

在我添加我正在使用的一小部分文件 (7 GB) 并尝试 运行 程序后,我可以看到:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    /media/developer/golang/manual/examples/sp/v2/sp.v2.go:71 +0x4a9
exit status 2

情况:

我是 GO 的新手,所以如果我的问题真的很简单,我很抱歉。

我正在尝试流式传输 xml 文件、拆分文档,然后在不同的 GO 例程中解析它们。

我正在使用的 XML 文件示例:

<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="CGImap 0.0.2">
    <relation id="56688" user="kmvar" uid="56190" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <member type="node" ref="364933006" role=""/>
        <tag k="name" v="Küstenbus Linie 123"/>
        <tag k="network" v="VVW"/>
        <tag k="route" v="bus"/>
        <tag k="type" v="route"/>
    </relation>
    <relation id="98367" user="jdifh" uid="92834" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <member type="way" ref="4579143" role=""/>
        <member type="node" ref="249673494" role=""/>
        <tag k="name" v="Küstenbus Linie 123"/>
        <tag k="network" v="VVW"/>
        <tag k="operator" v="Regionalverkehr Küste"/>
        <tag k="ref" v="123"/>
    </relation>
    <relation id="72947" user="schsu" uid="92374" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <tag k="name" v="Küstenbus Linie 123"/>
        <tag k="type" v="route"/>
    </relation>
    <relation id="93742" user="doiff" uid="61731" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <member type="node" ref="364933006" role=""/>
        <tag k="route" v="bus"/>
        <tag k="type" v="route"/>
    </relation>
</osm>

我有这段代码:

package main

import (
  "encoding/xml"
  "bufio"
  "fmt"
  "os"
  "io"
)

type RS struct {
  I string `xml:"id,attr"`
  M []struct {
    I string `xml:"ref,attr"`
    T string `xml:"type,attr"`
    R string `xml:"role,attr"`
  } `xml:"member"`
  T []struct {
    K string `xml:"k,attr"`
    V string `xml:"v,attr"`
  } `xml:"tag"`
}

func main() {
  p1D, err := os.Open("/media/developer/Transcend/osm/xml/relations.xml")

  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }

  defer p1D.Close()

  reader := bufio.NewReader(p1D)

  var count int32
  var element string

  channel := make(chan RS) // channel

  for {
    p2Rc, err := reader.ReadSlice('\n')
    if err != nil {
      if err == io.EOF {
        break
      } else {
        fmt.Println(err)
        os.Exit(1)
      }
    }

    var p2Rs = string(p2Rc)

    if p2Rc[2] == 114 {
      count++

      if (count != 1) {
        go parseRelation(element, channel)
      }

      element = ""
      element += p2Rs
    } else {
      element += p2Rs
    }
  }

  for i := 0; i < 5973974; i++ {
    fmt.Println(<- channel)
  }
}

func parseRelation(p1E string, channel chan RS) {
  var rs RS
  xml.Unmarshal([]byte(p1E), &rs)

  channel <- rs
}

它应该打印每个结构,但我什么也没看到。程序挂起。

我测试了 streamer 和 splitter(刚刚在函数 parseRelation 中添加了 fmt.Println(rs),然后将消息发送到频道)。我可以看到结构。所以,问题出在发送和接收消息上。

问题:

我不知道如何解决这个问题。尝试更改频道中消息的类型(从 RSstring)并且每次只发送一个字符串。但这也没有帮助(我什么也看不到)

我不知道这是否对你有帮助,但你的条件 if p2Rc[2]==114 永远不会满足,那么你继续开始收听频道。从不接收输入。此外,还有更好的收听频道的方法,例如select,这里是一个例子https://tour.golang.org/concurrency/5

我认为这里的主要问题是这个(代码)是用来做什么的?上述条件属于该过程的何处?如果清楚的话,我可以更新更好的回复。

首先,让我们解决这个问题:您不能逐行解析 XML。您很幸运,您的文件恰好是每行一个标签,但这不能想当然。您必须 解析整个 XML 文档。

通过逐行处理,您试图将 <tag><member> 推入为 <relation> 设计的结构中。相反,使用 xml.NewDecoder 并让它为您处理文件。

package main

import (
    "encoding/xml"
    "fmt"
    "os"
    "log"
)

type Osm struct {
    XMLName     xml.Name    `xml:"osm"`
    Relations   []Relation  `xml:"relation"`
}
type Relation struct {
    XMLName     xml.Name    `xml:"relation"`
    ID          string      `xml:"id,attr"`
    User        string      `xml:"user,attr"`
    Uid         string      `xml:"uid,attr"`
    Members     []Member    `xml:"member"`
    Tags        []Tag       `xml:"tag"`
}
type Member struct {
    XMLName     xml.Name    `xml:"member"`
    Ref         string      `xml:"ref,attr"`
    Type        string      `xml:"type,attr"`
    Role        string      `xml:"role,attr"`
}
type Tag struct {
    XMLName     xml.Name    `xml:"tag"`
    Key         string      `xml:"k,attr"`
    Value       string      `xml:"v,attr"`
}

func main() {
    reader, err := os.Open("test.xml")
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    decoder := xml.NewDecoder(reader)

    osm := &Osm{}
    err = decoder.Decode(&osm)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(osm)
}

Osm 和其他结构类似于您期望的 XML 模式。 decoder.Decode(&osm) 应用该架构。

如果你只想提取一部分 XML, see the answers to How to extract part of an XML file as a string?.

答案的其余部分将仅涵盖通道和协程的使用。 XML 部分将被删除。


如果你添加一些调试语句,你会发现 parseRelation 从未被调用,这意味着 channel 是空的,而 fmt.Println(<- channel) 等待一个永远不会关闭的空通道.因此,一旦您完成处理,请关闭频道。

  for {
    p2Rc, err := reader.ReadSlice('\n')

    ...
  }
  close(channel)

现在我们得到 { [] []} 5973974 次。

for i := 0; i < 5973974; i++ {
  fmt.Println(<- channel)
}

尝试从频道读取 5973974 次。这违背了渠道的意义。相反,read from the channel using range.

for thing := range channel {
    fmt.Println(thing)
}

现在至少它完成了!

但是有一个新问题。如果它真的找到了一个东西,比如如果你把 if p2Rc[2] == 114 { 改成 if p2Rc[2] == 32 {,你会得到一个 panic: send on closed channel。这是因为 parseRelation 与 reader 并行 运行 并且可能会在主要读取代码完成并关闭通道后尝试写入。在关闭频道之前,您必须确保使用该频道的每个人都已完成。

要解决此问题,需要进行相当大的重新设计。


这是一个简单程序的示例,它从文件中读取行,将它们放入通道,然后让工作人员从该通道读取。

func main() {
    reader, err := os.Open("test.xml")
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    // Use the simpler bufio.Scanner
    scanner := bufio.NewScanner(reader)

    // A buffered channel for input
    lines := make(chan string, 10)

    // Work on the lines
    go func() {
        for line := range lines {
            fmt.Println(line)
        }
    }()

    // Read lines into the channel
    for scanner.Scan() {
        lines <- scanner.Text()
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }

    // When main exits, channels gracefully close.
}

这很好用,因为 main 是特殊的,它会在退出时清理频道。但是如果 reader 和 writer 都是 goroutines 呢?

// A buffered channel for input
lines := make(chan string, 10)

// Work on the lines
go func() {
    for line := range lines {
        fmt.Println(line)
    }
}()

// Read lines into the channel
go func() {
    for scanner.Scan() {
        lines <- scanner.Text()
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}()

空的。 main 在 goroutine 可以完成它们的工作之前退出并关闭通道。我们需要一种方法让 main 知道要等到处理完成。有几种方法可以做到这一点。一种是 another channel to synchronize processing.

// A buffered channel for input
lines := make(chan string, 10)

// A channel for main to wait for
done := make(chan bool, 1)

// Work on the lines
go func() {
    for line := range lines {
        fmt.Println(line)
    }

    // Indicate the worker is done
    done <- true
}()

// Read lines into the channel
go func() {
    // Explicitly close `lines` when we're done so the workers can finish
    defer close(lines)

    for scanner.Scan() {
        lines <- scanner.Text()
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}()

// main will wait until there's something to read from `done`
<-done

现在 main 将触发 reader 和 worker goroutines 并缓冲等待 done 上的东西。 reader 将填充 lines 直到完成读取,然后关闭它。同时,工作人员将从 lines 读取并在完成读取后写入 done

另一种选择是使用 sync.WaitGroup

// A buffered channel for input
lines := make(chan string, 10)

var wg sync.WaitGroup

// Note that there is one more thing to wait for
wg.Add(1)
go func() {
    // Tell the WaitGroup we're done
    defer wg.Done()

    for line := range lines {
        fmt.Println(line)
    }
}()

// Read lines into the channel
go func() {
    defer close(lines)

    for scanner.Scan() {
        lines <- scanner.Text()
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}()

// Wait until everything in the WaitGroup is done
wg.Wait()

和以前一样,main 启动 reader 和 worker goroutine,但现在它在启动 worker 之前将 1 添加到 WaitGroup。然后它一直等到 wg.Wait() return 秒。 reader 与以前一样工作,完成后关闭 lines 通道。工作人员现在在完成递减 WaitGroup 的计数并允许 wg.Wait() 到 return 时调用 wg.Done()

每种技术都有优点和缺点。 done 更灵活,链条更好,如果你能绕过它会更安全。 WaitGroups 更简单,更容易理解,但要求每个 goroutine 共享一个变量。


如果我们想添加到这个处理链中,我们可以这样做。假设我们有一个读取行的 goroutine,一个在 XML 元素中处理它们,另一个对元素做一些事情。

// A buffered channel for input
lines := make(chan []byte, 10)
elements := make(chan *RS)

var wg sync.WaitGroup

// Worker goroutine, turn lines into RS structs
wg.Add(1)
go func() {
    defer wg.Done()
    defer close(elements)

    for line := range lines {
        if line[2] == 32 {
            fmt.Println("Begin")
            fmt.Println(string(line))
            fmt.Println("End")

            rs := &RS{}
            xml.Unmarshal(line, &rs)
            elements <- rs
        }
    }
}()

// File reader
go func() {
    defer close(lines)

    for scanner.Scan() {
        lines <- scanner.Bytes()
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}()

// Element reader
wg.Add(1)
go func() {
    defer wg.Done()

    for element := range elements {
        fmt.Println(element)
    }
}()

wg.Wait()

这会产生空结构,因为您正试图将 XML 的各个行推入表示完整 <relationship> 标记的结构中。但它演示了如何向链中添加更多工人。