bufio.Scanner:如何知道我们是在处理新行还是截断的字符串?
bufio.Scanner: how to know if we are processing a new line or a truncated string?
我基本上需要处理从流中读取的有限缓冲区中的每一行字符串。使用 bufio.Scanner 我可以逐行使用扫描仪,但不得不使用似乎过于复杂的解决方案来检测 "truncation"。有更好的方法吗?非常感谢。我对任何 lib 或任何东西都不紧张。
func (p *Parser) Read(data []byte, tmpline *string, n int, bufSize int) {
var line string
strdata := string(data)
scanner := bufio.NewScanner(strings.NewReader(strdata))
line = ""
for scanner.Scan() {
if line != "" {
if p.lineProcessor != nil {
p.lineProcessor(line)
}
}
line = scanner.Text()
if *tmpline != "" {
line = *tmpline + line //prepend line here
*tmpline = ""
}
}
if n == bufSize && data[bufSize-1] != '\n' { //detecting a fragment here, overcomplicated?
*tmpline = line
} else {
//fmt.Println("last line >[" + line + "]")
if p.lineProcessor != nil {
p.lineProcessor(line)
}
}
}
func (p *Parser) Handle(r io.Reader, bufSize int) ([]byte, error) {
var out []byte
buf := make([]byte, bufSize)
var n int
var err error
tmpline := ""
for {
n, err = r.Read(buf[:])
if n > 0 {
data := buf[:n]
p.Read(data, &tmpline, n, bufSize)
}
if err != nil {
if err == io.EOF {
err = nil
}
break
}
}
return out, err
}
main
...
cmd := exec.Command("ping", "8.8.8.8")
//var stdout, stderr []byte
var errStdout, errStderr error
//this is the type of stream I want to process **stdoutIn** and **stderrIn**
stdoutIn, _ := cmd.StdoutPipe()
stderrIn, _ := cmd.StderrPipe()
// 10 is the bufSize
parser.Init(stdoutIn, stderrIn, wg, 10, lineProcessor)
...
编辑:我试图实施建议的解决方案。现在我已经破坏了正在工作的东西(尽管看起来过于复杂)。输出似乎很时髦。这是完整的代码和完整的输出:
package main
import (
"bufio"
"fmt"
"io"
"log"
"os/exec"
"sync"
)
type LineProcessor func(string)
type Parser struct {
r io.Reader
lineProcessor LineProcessor
scanner bufio.Scanner
}
func (p *Parser) Init(stdoutIn io.Reader, stderrIn io.Reader,
wg sync.WaitGroup,
lineProcessor LineProcessor) {
wg.Add(2)
p.lineProcessor = lineProcessor
p.scanner = *bufio.NewScanner(stdoutIn)
go p.Handler(stdoutIn, wg)
go p.Handler(stderrIn, wg)
}
func (p *Parser) Handler(r io.Reader, wg sync.WaitGroup) { // ([]byte, error)
var line string
for p.scanner.Scan() {
line = p.scanner.Text()
if p.lineProcessor != nil {
p.lineProcessor(line)
}
}
wg.Done()
}
func lineProcessor(line string) {
fmt.Println(line)
}
func main() {
var err error
cmd := exec.Command("ping", "8.8.8.8")
var errStdout, errStderr error
stdoutIn, _ := cmd.StdoutPipe()
stderrIn, _ := cmd.StderrPipe()
var parser Parser
var wg sync.WaitGroup
parser.Init(stdoutIn, stderrIn, wg, lineProcessor)
err = cmd.Start()
if err != nil {
log.Fatalf("cmd.Start() failed with '%s'\n", err)
}
fmt.Printf("Waiting\n")
wg.Wait()
err = cmd.Wait()
if err != nil {
log.Fatalf("cmd.Run() failed with %s\n", err)
}
if errStdout != nil || errStderr != nil {
log.Fatal("failed to capture stdout or stderr\n")
}
}
$./buggysolution
Waiting
PING 8.8.8.8 (8.8.8.8): 56 data bytes
64 bytes from 8.8.8.8: icmp_seq=0 ttl=52 time=4.786 ms
64 bytes from 8.8.8.8: icmp_seq=2 ttl=52 time=3.661 ms
64 bytes from 8.8.8.8: icmp_seq=4 ttl=52 time=4.117 ms
64 bytes from 8.8.8.8: icmp_seq=6 ttl=52 time=4.172 ms
64 bytes from 8.8.8.8: icmp_seq=8 ttl=52 time=3.584 ms
64 bytes from 8.8.8.8: icmp_seq=10 ttl=52 time=4.301 ms
mp_seq=11 ttl=52 time=4.534 ms
64 bytes from 8.8.8.8: icmp_seq=12 ttl=52 time=4.349 ms
64 bytes from 8.8.8.8: icmp_seq=13 ttl=52 time=4.923 ms
64 bytes from 8.8.8.8: icmp_seq=14 ttl=52 time=4.349 ms
64 bytes from 8.8.8.8: icmp_seq=15 ttl=52 time=4.106 ms
64 bytes from 8.8.8.8: icmp_seq=16 ttl=52 time=4.270 ms
64 bytes from 8.8.8.8: icmp_seq=17 ttl=52 time=4.231 ms
64 bytes from 8.8.8.8: icmp_seq=18 ttl=52 time=4.915 ms
64 bytes from 8.8.8.8: icmp_seq=19 ttl=52 time=4.487 ms
64 bytes from 8.8.8.8: icmp_seq=20 ttl=52 time=4.182 ms
64 bytes from 8.8.8.8: icmp_seq=21 ttl=52 time=4.369 ms
64 bytes from 8.8.8.8: icmp_seq=22 ttl=52 time=4.287 ms
64 bytes from 8.8.8.8: icmp_seq=23 ttl=52 time=3.922 ms
64 bytes from 8.8.8.8: icmp_seq=24 ttl=52 time=4.905 ms
64 bytes from 8.8.8.8: icmp_seq=25 ttl=52 time=4.226 ms
64 bytes from 8.8.8.8: icmp_seq=27 ttl=52 time=4.052 ms
64 bytes from 8.8.8.8: icmp_seq=29 ttl=52 time=3.453 ms
64 bytes from 8.8.8.8: icmp_seq=31 ttl=52 time=5.103 ms
64 bytes from 8.8.8.8: icmp_seq=33 ttl=52 time=4.066 ms
64 bytes from 8.8.8.8: icmp_seq=35 ttl=52 time=4.128 ms
64 bytes from 8.8.8.8: icmp_seq=37 ttl=52 time=4.982 ms
64 bytes from 8.8.8.8: icmp_seq=1 ttl=52 time=4.206 ms
.64 bytes from 8.8.8.8: icmp_seq=39 ttl=52 time=4.215 ms
.8: icmp_seq=3 ttl=52 time=4.218 ms
tl=52 time=3.650 ms
8: icmp_seq=2 ttl=52 time=3.661 ms
ttl=52 time=4.791 ms
: icmp_seq=5 ttl=52 time=3.581 ms
ttl=52 time=4.211 ms
icmp_seq=4 ttl=52 time=4.117 ms
4 ttl=52 time=4.245 ms
icmp_seq=7 ttl=52 time=4.955 ms
45 ttl=52 time=4.518 ms
cmp_seq=6 ttl=52 time=4.172 ms
=46 ttl=52 time=4.764 ms
seq 9
^C
感谢任何修复。
这不是 bufio.Scanner
的使用方式。
bufio.Scanner
接受一个 reader 并且可以 return 行直接从中取出。
如果令牌大于 bufio.Scanner.maxTokenSize
,bufio.Scanner
将失败,默认情况下为 MaxScanTokenSize
,但不会 return 截断结果。
您还可以在 https://golang.org/src/bufio/scan_test.go#L214 (not that the MaxTokenSize method being used is defined with a special scheme to exists only during tests see https://golang.org/src/bufio/export_test.go#L16)
查看测试套件
您还可以在扫描之前定义自己的底层缓冲区以更改该值https://golang.org/pkg/bufio/#Scanner.Buffer
要推出您自己的版本,它可能看起来像这样 https://play.golang.org/p/kDcO6eZPVhY 但是沿着这条路走下去需要您编写额外的测试和基准。
经过多次评论,这里是 OP 正在寻找的内容:
package main
import (
"bufio"
"io"
"log"
"os/exec"
)
func main() {
cmd := exec.Command("ping", "8.8.8.8")
var out io.Reader
{
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
log.Fatal(err)
}
out = io.MultiReader(stdout, stderr)
}
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
// defer cmd.Process.Kill()
s := bufio.NewScanner(out)
for s.Scan() {
log.Println(s.Text())
}
// if out closes, cmd closed.
log.Println("all done")
}
我基本上需要处理从流中读取的有限缓冲区中的每一行字符串。使用 bufio.Scanner 我可以逐行使用扫描仪,但不得不使用似乎过于复杂的解决方案来检测 "truncation"。有更好的方法吗?非常感谢。我对任何 lib 或任何东西都不紧张。
func (p *Parser) Read(data []byte, tmpline *string, n int, bufSize int) {
var line string
strdata := string(data)
scanner := bufio.NewScanner(strings.NewReader(strdata))
line = ""
for scanner.Scan() {
if line != "" {
if p.lineProcessor != nil {
p.lineProcessor(line)
}
}
line = scanner.Text()
if *tmpline != "" {
line = *tmpline + line //prepend line here
*tmpline = ""
}
}
if n == bufSize && data[bufSize-1] != '\n' { //detecting a fragment here, overcomplicated?
*tmpline = line
} else {
//fmt.Println("last line >[" + line + "]")
if p.lineProcessor != nil {
p.lineProcessor(line)
}
}
}
func (p *Parser) Handle(r io.Reader, bufSize int) ([]byte, error) {
var out []byte
buf := make([]byte, bufSize)
var n int
var err error
tmpline := ""
for {
n, err = r.Read(buf[:])
if n > 0 {
data := buf[:n]
p.Read(data, &tmpline, n, bufSize)
}
if err != nil {
if err == io.EOF {
err = nil
}
break
}
}
return out, err
}
main
...
cmd := exec.Command("ping", "8.8.8.8")
//var stdout, stderr []byte
var errStdout, errStderr error
//this is the type of stream I want to process **stdoutIn** and **stderrIn**
stdoutIn, _ := cmd.StdoutPipe()
stderrIn, _ := cmd.StderrPipe()
// 10 is the bufSize
parser.Init(stdoutIn, stderrIn, wg, 10, lineProcessor)
...
编辑:我试图实施建议的解决方案。现在我已经破坏了正在工作的东西(尽管看起来过于复杂)。输出似乎很时髦。这是完整的代码和完整的输出:
package main
import (
"bufio"
"fmt"
"io"
"log"
"os/exec"
"sync"
)
type LineProcessor func(string)
type Parser struct {
r io.Reader
lineProcessor LineProcessor
scanner bufio.Scanner
}
func (p *Parser) Init(stdoutIn io.Reader, stderrIn io.Reader,
wg sync.WaitGroup,
lineProcessor LineProcessor) {
wg.Add(2)
p.lineProcessor = lineProcessor
p.scanner = *bufio.NewScanner(stdoutIn)
go p.Handler(stdoutIn, wg)
go p.Handler(stderrIn, wg)
}
func (p *Parser) Handler(r io.Reader, wg sync.WaitGroup) { // ([]byte, error)
var line string
for p.scanner.Scan() {
line = p.scanner.Text()
if p.lineProcessor != nil {
p.lineProcessor(line)
}
}
wg.Done()
}
func lineProcessor(line string) {
fmt.Println(line)
}
func main() {
var err error
cmd := exec.Command("ping", "8.8.8.8")
var errStdout, errStderr error
stdoutIn, _ := cmd.StdoutPipe()
stderrIn, _ := cmd.StderrPipe()
var parser Parser
var wg sync.WaitGroup
parser.Init(stdoutIn, stderrIn, wg, lineProcessor)
err = cmd.Start()
if err != nil {
log.Fatalf("cmd.Start() failed with '%s'\n", err)
}
fmt.Printf("Waiting\n")
wg.Wait()
err = cmd.Wait()
if err != nil {
log.Fatalf("cmd.Run() failed with %s\n", err)
}
if errStdout != nil || errStderr != nil {
log.Fatal("failed to capture stdout or stderr\n")
}
}
$./buggysolution
Waiting
PING 8.8.8.8 (8.8.8.8): 56 data bytes
64 bytes from 8.8.8.8: icmp_seq=0 ttl=52 time=4.786 ms
64 bytes from 8.8.8.8: icmp_seq=2 ttl=52 time=3.661 ms
64 bytes from 8.8.8.8: icmp_seq=4 ttl=52 time=4.117 ms
64 bytes from 8.8.8.8: icmp_seq=6 ttl=52 time=4.172 ms
64 bytes from 8.8.8.8: icmp_seq=8 ttl=52 time=3.584 ms
64 bytes from 8.8.8.8: icmp_seq=10 ttl=52 time=4.301 ms
mp_seq=11 ttl=52 time=4.534 ms
64 bytes from 8.8.8.8: icmp_seq=12 ttl=52 time=4.349 ms
64 bytes from 8.8.8.8: icmp_seq=13 ttl=52 time=4.923 ms
64 bytes from 8.8.8.8: icmp_seq=14 ttl=52 time=4.349 ms
64 bytes from 8.8.8.8: icmp_seq=15 ttl=52 time=4.106 ms
64 bytes from 8.8.8.8: icmp_seq=16 ttl=52 time=4.270 ms
64 bytes from 8.8.8.8: icmp_seq=17 ttl=52 time=4.231 ms
64 bytes from 8.8.8.8: icmp_seq=18 ttl=52 time=4.915 ms
64 bytes from 8.8.8.8: icmp_seq=19 ttl=52 time=4.487 ms
64 bytes from 8.8.8.8: icmp_seq=20 ttl=52 time=4.182 ms
64 bytes from 8.8.8.8: icmp_seq=21 ttl=52 time=4.369 ms
64 bytes from 8.8.8.8: icmp_seq=22 ttl=52 time=4.287 ms
64 bytes from 8.8.8.8: icmp_seq=23 ttl=52 time=3.922 ms
64 bytes from 8.8.8.8: icmp_seq=24 ttl=52 time=4.905 ms
64 bytes from 8.8.8.8: icmp_seq=25 ttl=52 time=4.226 ms
64 bytes from 8.8.8.8: icmp_seq=27 ttl=52 time=4.052 ms
64 bytes from 8.8.8.8: icmp_seq=29 ttl=52 time=3.453 ms
64 bytes from 8.8.8.8: icmp_seq=31 ttl=52 time=5.103 ms
64 bytes from 8.8.8.8: icmp_seq=33 ttl=52 time=4.066 ms
64 bytes from 8.8.8.8: icmp_seq=35 ttl=52 time=4.128 ms
64 bytes from 8.8.8.8: icmp_seq=37 ttl=52 time=4.982 ms
64 bytes from 8.8.8.8: icmp_seq=1 ttl=52 time=4.206 ms
.64 bytes from 8.8.8.8: icmp_seq=39 ttl=52 time=4.215 ms
.8: icmp_seq=3 ttl=52 time=4.218 ms
tl=52 time=3.650 ms
8: icmp_seq=2 ttl=52 time=3.661 ms
ttl=52 time=4.791 ms
: icmp_seq=5 ttl=52 time=3.581 ms
ttl=52 time=4.211 ms
icmp_seq=4 ttl=52 time=4.117 ms
4 ttl=52 time=4.245 ms
icmp_seq=7 ttl=52 time=4.955 ms
45 ttl=52 time=4.518 ms
cmp_seq=6 ttl=52 time=4.172 ms
=46 ttl=52 time=4.764 ms
seq 9
^C
感谢任何修复。
这不是 bufio.Scanner
的使用方式。
bufio.Scanner
接受一个 reader 并且可以 return 行直接从中取出。
bufio.Scanner.maxTokenSize
,bufio.Scanner
将失败,默认情况下为 MaxScanTokenSize
,但不会 return 截断结果。
您还可以在 https://golang.org/src/bufio/scan_test.go#L214 (not that the MaxTokenSize method being used is defined with a special scheme to exists only during tests see https://golang.org/src/bufio/export_test.go#L16)
查看测试套件您还可以在扫描之前定义自己的底层缓冲区以更改该值https://golang.org/pkg/bufio/#Scanner.Buffer
要推出您自己的版本,它可能看起来像这样 https://play.golang.org/p/kDcO6eZPVhY 但是沿着这条路走下去需要您编写额外的测试和基准。
经过多次评论,这里是 OP 正在寻找的内容:
package main
import (
"bufio"
"io"
"log"
"os/exec"
)
func main() {
cmd := exec.Command("ping", "8.8.8.8")
var out io.Reader
{
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
log.Fatal(err)
}
out = io.MultiReader(stdout, stderr)
}
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
// defer cmd.Process.Kill()
s := bufio.NewScanner(out)
for s.Scan() {
log.Println(s.Text())
}
// if out closes, cmd closed.
log.Println("all done")
}