如何在处理输出时暂停和取消暂停节点对象流
how to pause and unpause node object stream while processing its output
我目前正在通过发出 'line'
事件的转换流 运行 逐行处理文件流。我希望能够在发现当前行符合某些条件后暂停输入文件流,开始处理新流,并在完成后逐行恢复处理原始流。我已将其浓缩为以下最小示例:
test.coffee:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
测试-transform.coffee:
Transform = require('stream').Transform
module.exports =
class TestTransform extends Transform
constructor: ->
Transform.call @, readableObjectMode: true
@buffer = ""
pushLines: ->
newlineIndex = @buffer.indexOf "\n"
while newlineIndex isnt -1
@push @buffer.substr(0, newlineIndex + 1)
@emit 'line', @buffer.substr(0, newlineIndex + 1)
@buffer = @buffer.substr(newlineIndex + 1)
newlineIndex = @buffer.indexOf "\n"
_transform: (chunk, enc, cb) ->
@buffer = @buffer + chunk.toString()
@pushLines()
cb?()
_flush: (cb) ->
@pushLines()
@buffer += "\n" # ending newline
@push @buffer
@emit 'line', @buffer # push last line
@buffer = ""
cb?()
(不用太在意Transform流,只是举个例子。)反正coffee test.coffee
的输出是这样的:
-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.unpause()
--> else
--> process.stdout.write line
-->
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.unpause()
else
process.stdout.write line
很明显,管道没有暂停,它只是继续直到完成(即使 PAUSE
正像预期的那样得到 运行),并且由于 "UNPAUSE"
是也永远不会被写出,'end'
回调永远不会触发。将流从转换流切换到 pause/unpause 到 readStream 似乎也不起作用。我从这种行为中假设节点流以某种方式不尊重事件回调中的 pause/unpause。
可能还有另一种方法可以在不调用 pause/unpause 的情况下完成此操作;如果有某种方式喜欢等待流结束并暂停当前执行线程,那将有效地完成我正在尝试做的事情。
如果我没有正确理解问题,这里有一个使用 Dust.js 的简单 Node 应用程序可以解决问题。
Dust 是一个模板引擎,但它最好的特性之一是它对 Node Streams 的原生理解。本例使用 Dust 2.7.0.
我正在使用 node-byline
替代您的转换流,但它做同样的事情——逐行读取流。
var fs = require('fs'),
byline = require('byline'),
dust = require('dustjs-linkedin');
var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));
var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));
dust.stream(template, {
byline: stream,
match: function(chunk, context) {
var currentLine = context.current();
if(currentLine.match(/line\.match/g)) {
return fs.createReadStream('./test.txt', 'utf8');
}
return chunk;
}
}).pipe(process.stdout);
这是我程序的输出:
$ node index.js
--> fs = require 'fs'
--> TestTransform = require './test-transform'
--> inStream = new TestTransform
--> fs.createReadStream("./test.coffee").pipe(inStream)
--> inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.resume()
--> else
--> process.stdout.write line
如您所见,它正确地交错了输出。如果我可以进一步详细说明 Dust 部分的工作原理,请告诉我。
编辑:这是对 Dust 模板的具体解释。
{#byline} {! look for the context variable named `byline` !}
{! okay, it's a stream. For each `data` event, output this stuff once !}
-->
{.|s} {! output the current `data`. Use |s to turn off HTML escaping !}
{~n} {! a newline !}
{match} {! look up the variable called `match` !}
{! okay, it's a function. Run it and insert the result !}
{! if the result is a stream, stream it in. !}
{/byline} {! done looping !}
我实际上也找到了一个单独的答案;不那么漂亮,但也能用。
本质上,pause()
仅暂停管道流的输出(在 "flowing" 模式下);因为我正在收听 'line'
事件,所以它没有流动,所以 pause
当然什么也没做。所以第一个解决方案是使用 removeListener
而不是 pause
,这确实有效地停止了流式传输。该文件现在看起来像:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.removeListener 'line', c
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
console.error "UNPAUSE"
inStream.on 'line', c
f.pipe(process.stdout)
else
process.stdout.write line
inStream.on 'line', c
这会产生 几乎 有效的输出:
-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->c = (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.removeListener 'line', c
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
console.error "UNPAUSE"
inStream.on 'line', c
f.pipe(process.stdout)
else
process.stdout.write line
inStream.on 'line', c
UNPAUSE
但是,当我删除侦听器时,原来的可读流似乎刚刚停止;这具有某种扭曲的意义(我猜节点垃圾在所有侦听器已被删除时收集其可读流)。所以我找到的最终工作解决方案依赖于管道。由于我上面显示的 Transform 流也将其输出逐行推送到任何 'data'
侦听器,因此 pause()
可以在这里有效地用于其原始 objective,而不只是杀死流。最终输出:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
line = chunk.toString()
process.stdout.write "-->#{line}"
if line.match /line\.match/g
inStream.pause()
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
inStream.resume()
f.pipe(process.stdout)
输出:
-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->inStream.on 'data', (chunk) ->
--> line = chunk.toString()
--> process.stdout.write "-->#{line}"
--> if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
line = chunk.toString()
process.stdout.write "-->#{line}"
if line.match /line\.match/g
inStream.pause()
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
inStream.resume()
f.pipe(process.stdout)
--> inStream.pause()
--> f = fs.createReadStream("./test.coffee")
--> f.on 'end', ->
--> inStream.resume()
--> f.pipe(process.stdout)
-->
这是预期的结果。
我目前正在通过发出 'line'
事件的转换流 运行 逐行处理文件流。我希望能够在发现当前行符合某些条件后暂停输入文件流,开始处理新流,并在完成后逐行恢复处理原始流。我已将其浓缩为以下最小示例:
test.coffee:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
测试-transform.coffee:
Transform = require('stream').Transform
module.exports =
class TestTransform extends Transform
constructor: ->
Transform.call @, readableObjectMode: true
@buffer = ""
pushLines: ->
newlineIndex = @buffer.indexOf "\n"
while newlineIndex isnt -1
@push @buffer.substr(0, newlineIndex + 1)
@emit 'line', @buffer.substr(0, newlineIndex + 1)
@buffer = @buffer.substr(newlineIndex + 1)
newlineIndex = @buffer.indexOf "\n"
_transform: (chunk, enc, cb) ->
@buffer = @buffer + chunk.toString()
@pushLines()
cb?()
_flush: (cb) ->
@pushLines()
@buffer += "\n" # ending newline
@push @buffer
@emit 'line', @buffer # push last line
@buffer = ""
cb?()
(不用太在意Transform流,只是举个例子。)反正coffee test.coffee
的输出是这样的:
-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.unpause()
--> else
--> process.stdout.write line
-->
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.unpause()
else
process.stdout.write line
很明显,管道没有暂停,它只是继续直到完成(即使 PAUSE
正像预期的那样得到 运行),并且由于 "UNPAUSE"
是也永远不会被写出,'end'
回调永远不会触发。将流从转换流切换到 pause/unpause 到 readStream 似乎也不起作用。我从这种行为中假设节点流以某种方式不尊重事件回调中的 pause/unpause。
可能还有另一种方法可以在不调用 pause/unpause 的情况下完成此操作;如果有某种方式喜欢等待流结束并暂停当前执行线程,那将有效地完成我正在尝试做的事情。
如果我没有正确理解问题,这里有一个使用 Dust.js 的简单 Node 应用程序可以解决问题。
Dust 是一个模板引擎,但它最好的特性之一是它对 Node Streams 的原生理解。本例使用 Dust 2.7.0.
我正在使用 node-byline
替代您的转换流,但它做同样的事情——逐行读取流。
var fs = require('fs'),
byline = require('byline'),
dust = require('dustjs-linkedin');
var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));
var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));
dust.stream(template, {
byline: stream,
match: function(chunk, context) {
var currentLine = context.current();
if(currentLine.match(/line\.match/g)) {
return fs.createReadStream('./test.txt', 'utf8');
}
return chunk;
}
}).pipe(process.stdout);
这是我程序的输出:
$ node index.js
--> fs = require 'fs'
--> TestTransform = require './test-transform'
--> inStream = new TestTransform
--> fs.createReadStream("./test.coffee").pipe(inStream)
--> inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.resume()
--> else
--> process.stdout.write line
如您所见,它正确地交错了输出。如果我可以进一步详细说明 Dust 部分的工作原理,请告诉我。
编辑:这是对 Dust 模板的具体解释。
{#byline} {! look for the context variable named `byline` !}
{! okay, it's a stream. For each `data` event, output this stuff once !}
-->
{.|s} {! output the current `data`. Use |s to turn off HTML escaping !}
{~n} {! a newline !}
{match} {! look up the variable called `match` !}
{! okay, it's a function. Run it and insert the result !}
{! if the result is a stream, stream it in. !}
{/byline} {! done looping !}
我实际上也找到了一个单独的答案;不那么漂亮,但也能用。
本质上,pause()
仅暂停管道流的输出(在 "flowing" 模式下);因为我正在收听 'line'
事件,所以它没有流动,所以 pause
当然什么也没做。所以第一个解决方案是使用 removeListener
而不是 pause
,这确实有效地停止了流式传输。该文件现在看起来像:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.removeListener 'line', c
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
console.error "UNPAUSE"
inStream.on 'line', c
f.pipe(process.stdout)
else
process.stdout.write line
inStream.on 'line', c
这会产生 几乎 有效的输出:
-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->c = (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.removeListener 'line', c
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
console.error "UNPAUSE"
inStream.on 'line', c
f.pipe(process.stdout)
else
process.stdout.write line
inStream.on 'line', c
UNPAUSE
但是,当我删除侦听器时,原来的可读流似乎刚刚停止;这具有某种扭曲的意义(我猜节点垃圾在所有侦听器已被删除时收集其可读流)。所以我找到的最终工作解决方案依赖于管道。由于我上面显示的 Transform 流也将其输出逐行推送到任何 'data'
侦听器,因此 pause()
可以在这里有效地用于其原始 objective,而不只是杀死流。最终输出:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
line = chunk.toString()
process.stdout.write "-->#{line}"
if line.match /line\.match/g
inStream.pause()
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
inStream.resume()
f.pipe(process.stdout)
输出:
-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->inStream.on 'data', (chunk) ->
--> line = chunk.toString()
--> process.stdout.write "-->#{line}"
--> if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
line = chunk.toString()
process.stdout.write "-->#{line}"
if line.match /line\.match/g
inStream.pause()
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
inStream.resume()
f.pipe(process.stdout)
--> inStream.pause()
--> f = fs.createReadStream("./test.coffee")
--> f.on 'end', ->
--> inStream.resume()
--> f.pipe(process.stdout)
-->
这是预期的结果。