C# StreamReader.ReadLine 在流结束前返回 null
C# StreamReader.ReadLine returning null before end of stream
我正在使用 SSH.NET library to implement a file system watcher on a remote linux server using the inotifywait 命令。本质上它是一个包装器:
ssh myhost "inotifywait -m -e close_write --format '%:e %f' /dropzone"
该命令将打印出(到 STDOUT):
CLOSE_WRITE:CLOSE foo
CLOSE_WRITE:CLOSE bar
CLOSE_WRITE:CLOSE baz
这很简单,可以解析并转化为事件。无论如何,我的 C# 代码本质上是:
var privateKeyFile = new PrivateKeyFile(identity);
var client = new SshClient(hostname, username, privateKeyFile);
SshCommand command = null;
IAsyncResult result = null;
try
{
client.Connect();
command = client.CreateCommand("inotifywait -m -e close_write --format '%:e %f' " + dropZone);
result = command.BeginExecute();
Console.WriteLine("Watching for events");
var reader = new StreamReader(command.OutputStream);
string line = null;
while ((line = reader.ReadLine()) != null)
{
Console.WriteLine(line);
}
Console.WriteLine("Reached end of stream");
}
finally
{
if (client != null)
{
Console.WriteLine("Close the connection");
client.Dispose();
}
}
Console.WriteLine("Press enter to quit");
Console.ReadLine();
和运行它在写入单个文件后产生此输出:
Watching for events
CLOSE_WRITE:CLOSE baz
Reached end of stream
Close the connection
Press enter to quit
Watching for events
会立即出现并等待第一个文件被写入(阻塞等待正如我对 StreamReader
的预期)。然而,下一个 ReadLine
,而不是另一个阻塞等待,returns null(表示流结束),即使命令仍然很高兴 运行。我知道我可以这样改变循环:
while (!result.IsCompleted)
{
line = reader.ReadLine();
if (line != null)
{
Console.WriteLine(line);
}
}
这导致:
Watching for events
CLOSE_WRITE:CLOSE baz
CLOSE_WRITE:CLOSE bar
CLOSE_WRITE:CLOSE foo
...
根据需要,但它摆脱了阻塞等待新输入,这意味着循环不断旋转(显然不希望...)
你能解释一下这种行为吗?对另一种方法有什么建议吗?
----更新----
看起来该库正在迁移到 github 并进行更新。我已提交 this issue 以尝试解决此问题。
观察到的行为的原因是 PipeStream class。它像字节队列一样工作。当您从 PipeStream
读取字节时,实际上是将它们出列,因此流长度会减少。当您读取所有字节时,流长度变为 0。这意味着在您首先读取 "line" 之后(实际上可以是多行,只是数据的第一部分)- 流的长度为 0,因此有效结束。下次读取将只是 return 而不会阻塞,直到下一部分数据到达(如果有的话)。
不幸的是,这些流似乎并不适合您的情况 - 它们旨在执行命令、接收一个结果并完成。如果你想读取连续的数据流(比如你的情况或例如 "tail -f" 结果 - 你唯一的选择似乎是在读取之间回落到 Thread.Sleep
,至少在快速搜索之后我没有找到任何替代方案。
更新:还是经过一些反思你可以达到你想要的结果。 Undelying 通道具有 DataReceived 事件,您可以使用该事件在新数据可用时收到通知。下面的代码应该可以解决问题(注意这是一个草图,所以要小心):
static void Main(string[] args) {
var privateKeyFile = new PrivateKeyFile(@"somefile");
using (var client = new SshClient("somehost", "someuser", privateKeyFile)) {
client.Connect();
var command = client.CreateCommand("tail -f /tmp/test.txt");
var result = command.BeginExecute();
var channelField = command.GetType().GetField("_channel", BindingFlags.Instance | BindingFlags.NonPublic);
var channel = channelField.GetValue(command);
var receivedEvent = channel.GetType().GetEvent("DataReceived", BindingFlags.Instance | BindingFlags.Public);
Console.WriteLine("Watching for events");
using (var handler = new ReceivedHandler()) {
// add event handler here
receivedEvent.AddEventHandler(channel, Delegate.CreateDelegate(receivedEvent.EventHandlerType, handler, handler.GetType().GetMethod("OnReceive")));
while (true) {
// wait on both command completion and our custom wait handle. This is blocking call
WaitHandle.WaitAny(new[] {result.AsyncWaitHandle, handler.Signal});
// if done - break
if (result.IsCompleted)
break;
var line = handler.ReadLine();
Console.WriteLine(line);
}
}
Console.WriteLine("Reached end of stream");
Console.ReadKey();
}
}
public class ReceivedHandler : IDisposable {
private readonly AutoResetEvent _signal;
private readonly StringBuilder _buffer = new StringBuilder();
public ReceivedHandler() {
_signal = new AutoResetEvent(false);
}
public void OnReceive(object sender, EventArgs e) {
var dataProp = e.GetType().GetProperty("Data", BindingFlags.Instance | BindingFlags.Public);
var rawData = (byte[])dataProp.GetValue(e);
var data = Encoding.ASCII.GetString(rawData);
lock (_buffer) {
// append to buffer for reader to consume
_buffer.Append(data);
}
// notify reader
Signal.Set();
}
public AutoResetEvent Signal => _signal;
public string ReadLine() {
lock (_buffer) {
// cleanup buffer
var result = _buffer.ToString();
_buffer.Clear();
return result;
}
}
public void Dispose() {
_signal.Dispose();
}
}
当然,如果您联系该库的开发人员并解释问题总是更好,也许他们能够添加缺失的行为。
@Evk 的回答是正确的,PipeStream
是罪魁祸首。 PipeStream
的另一个问题是,如果您尝试读取超过可用字节数,它将 阻塞 。出于性能原因,阻塞应该是 PipeStream
的 消费者 的工作。我用SSH.NET执行SshCommand
,异步读取标准output/error。我解决这些问题的方法是写信给中介 MemoryStream
,然后使用标准机制,如 StreamReader
。这是从 PipeStream
:
中读取的更笼统的答案
public class SshCommandStreamReader : IDisposable
{
private readonly Stream stream;
private readonly MemoryStream intermediateStream;
private readonly StreamReader reader;
public SshCommandOutputReader(Stream stream)
{
this.stream = stream;
this.intermediateStream = new MemoryStream();
this.reader = new StreamReader(intermediateStream, Encoding.UTF8);
}
private int FlushToIntermediateStream()
{
var length = stream.Length;
if (length == 0)
{
return 0;
}
// IMPORTANT: Do NOT read with a count higher than the stream length (which is typical of reading
// from streams). The streams for SshCommand are implemented by PipeStream (an internal class to
// SSH.NET). Reading more than the current length causes it to *block* until data is available.
// If the stream is flushed when reading, it does not block. It is not reliable to flush and then
// read because there is a possible race condition where a write might occur between flushing and
// reading (writing resets the flag that it was flushed). The only reliable solution to prevent
// blocking when reading is to always read the current length rather than an arbitrary buffer size.
var intermediateOutputBuffer = new byte[length];
var bytesRead = stream.Read(intermediateOutputBuffer, 0, intermediateOutputBuffer.Length);
intermediateStream.Write(intermediateOutputBuffer, 0, bytesRead);
return bytesRead;
}
public string Read()
{
var bytesFlushed = FlushToIntermediateStream();
// Allow reading the newly flushed bytes.
intermediateStream.Position -= bytesFlushed;
// Minor optimization since this may be called in a tight loop.
if (intermediateStream.Position == intermediateStream.Length)
{
return null;
}
else
{
var result = reader.ReadToEnd();
return result;
}
}
public void Dispose()
{
reader.Dispose();
intermediateStream.Dispose();
}
}
然后使用它:
using (var command = client.CreateCommand("your command text"))
{
var cmdAsyncResult = command.BeginExecute();
using (var standardOutputReader = new SshCommandStreamReader(command.OutputStream))
{
while (!cmdAsyncResult.IsCompleted)
{
var result = standardOutputReader.Read();
if (!String.IsNullOrEmpty(result))
{
Console.Write(result);
}
// Or what ever mechanism you'd like to use to prevent CPU thrashing.
Thread.Sleep(1);
}
// This must be done *after* the loop and *before* EndExecute() so that any extra output
// is captured (e.g. the loop never ran because the command was so fast).
var resultFinal = standardOutputReader.Read();
if (!String.IsNullOrEmpty(resultFinal))
{
Console.Write(resultFinal);
}
}
command.EndExecute(cmdAsyncResult);
}
您应该能够修改此示例以从标准错误中读取(通过 ExtendedOutputStream
),并将其更改为逐行读取特定于您的应用程序。
我正在使用 SSH.NET library to implement a file system watcher on a remote linux server using the inotifywait 命令。本质上它是一个包装器:
ssh myhost "inotifywait -m -e close_write --format '%:e %f' /dropzone"
该命令将打印出(到 STDOUT):
CLOSE_WRITE:CLOSE foo
CLOSE_WRITE:CLOSE bar
CLOSE_WRITE:CLOSE baz
这很简单,可以解析并转化为事件。无论如何,我的 C# 代码本质上是:
var privateKeyFile = new PrivateKeyFile(identity);
var client = new SshClient(hostname, username, privateKeyFile);
SshCommand command = null;
IAsyncResult result = null;
try
{
client.Connect();
command = client.CreateCommand("inotifywait -m -e close_write --format '%:e %f' " + dropZone);
result = command.BeginExecute();
Console.WriteLine("Watching for events");
var reader = new StreamReader(command.OutputStream);
string line = null;
while ((line = reader.ReadLine()) != null)
{
Console.WriteLine(line);
}
Console.WriteLine("Reached end of stream");
}
finally
{
if (client != null)
{
Console.WriteLine("Close the connection");
client.Dispose();
}
}
Console.WriteLine("Press enter to quit");
Console.ReadLine();
和运行它在写入单个文件后产生此输出:
Watching for events
CLOSE_WRITE:CLOSE baz
Reached end of stream
Close the connection
Press enter to quit
Watching for events
会立即出现并等待第一个文件被写入(阻塞等待正如我对 StreamReader
的预期)。然而,下一个 ReadLine
,而不是另一个阻塞等待,returns null(表示流结束),即使命令仍然很高兴 运行。我知道我可以这样改变循环:
while (!result.IsCompleted)
{
line = reader.ReadLine();
if (line != null)
{
Console.WriteLine(line);
}
}
这导致:
Watching for events
CLOSE_WRITE:CLOSE baz
CLOSE_WRITE:CLOSE bar
CLOSE_WRITE:CLOSE foo
...
根据需要,但它摆脱了阻塞等待新输入,这意味着循环不断旋转(显然不希望...)
你能解释一下这种行为吗?对另一种方法有什么建议吗?
----更新----
看起来该库正在迁移到 github 并进行更新。我已提交 this issue 以尝试解决此问题。
观察到的行为的原因是 PipeStream class。它像字节队列一样工作。当您从 PipeStream
读取字节时,实际上是将它们出列,因此流长度会减少。当您读取所有字节时,流长度变为 0。这意味着在您首先读取 "line" 之后(实际上可以是多行,只是数据的第一部分)- 流的长度为 0,因此有效结束。下次读取将只是 return 而不会阻塞,直到下一部分数据到达(如果有的话)。
不幸的是,这些流似乎并不适合您的情况 - 它们旨在执行命令、接收一个结果并完成。如果你想读取连续的数据流(比如你的情况或例如 "tail -f" 结果 - 你唯一的选择似乎是在读取之间回落到 Thread.Sleep
,至少在快速搜索之后我没有找到任何替代方案。
更新:还是经过一些反思你可以达到你想要的结果。 Undelying 通道具有 DataReceived 事件,您可以使用该事件在新数据可用时收到通知。下面的代码应该可以解决问题(注意这是一个草图,所以要小心):
static void Main(string[] args) {
var privateKeyFile = new PrivateKeyFile(@"somefile");
using (var client = new SshClient("somehost", "someuser", privateKeyFile)) {
client.Connect();
var command = client.CreateCommand("tail -f /tmp/test.txt");
var result = command.BeginExecute();
var channelField = command.GetType().GetField("_channel", BindingFlags.Instance | BindingFlags.NonPublic);
var channel = channelField.GetValue(command);
var receivedEvent = channel.GetType().GetEvent("DataReceived", BindingFlags.Instance | BindingFlags.Public);
Console.WriteLine("Watching for events");
using (var handler = new ReceivedHandler()) {
// add event handler here
receivedEvent.AddEventHandler(channel, Delegate.CreateDelegate(receivedEvent.EventHandlerType, handler, handler.GetType().GetMethod("OnReceive")));
while (true) {
// wait on both command completion and our custom wait handle. This is blocking call
WaitHandle.WaitAny(new[] {result.AsyncWaitHandle, handler.Signal});
// if done - break
if (result.IsCompleted)
break;
var line = handler.ReadLine();
Console.WriteLine(line);
}
}
Console.WriteLine("Reached end of stream");
Console.ReadKey();
}
}
public class ReceivedHandler : IDisposable {
private readonly AutoResetEvent _signal;
private readonly StringBuilder _buffer = new StringBuilder();
public ReceivedHandler() {
_signal = new AutoResetEvent(false);
}
public void OnReceive(object sender, EventArgs e) {
var dataProp = e.GetType().GetProperty("Data", BindingFlags.Instance | BindingFlags.Public);
var rawData = (byte[])dataProp.GetValue(e);
var data = Encoding.ASCII.GetString(rawData);
lock (_buffer) {
// append to buffer for reader to consume
_buffer.Append(data);
}
// notify reader
Signal.Set();
}
public AutoResetEvent Signal => _signal;
public string ReadLine() {
lock (_buffer) {
// cleanup buffer
var result = _buffer.ToString();
_buffer.Clear();
return result;
}
}
public void Dispose() {
_signal.Dispose();
}
}
当然,如果您联系该库的开发人员并解释问题总是更好,也许他们能够添加缺失的行为。
@Evk 的回答是正确的,PipeStream
是罪魁祸首。 PipeStream
的另一个问题是,如果您尝试读取超过可用字节数,它将 阻塞 。出于性能原因,阻塞应该是 PipeStream
的 消费者 的工作。我用SSH.NET执行SshCommand
,异步读取标准output/error。我解决这些问题的方法是写信给中介 MemoryStream
,然后使用标准机制,如 StreamReader
。这是从 PipeStream
:
public class SshCommandStreamReader : IDisposable
{
private readonly Stream stream;
private readonly MemoryStream intermediateStream;
private readonly StreamReader reader;
public SshCommandOutputReader(Stream stream)
{
this.stream = stream;
this.intermediateStream = new MemoryStream();
this.reader = new StreamReader(intermediateStream, Encoding.UTF8);
}
private int FlushToIntermediateStream()
{
var length = stream.Length;
if (length == 0)
{
return 0;
}
// IMPORTANT: Do NOT read with a count higher than the stream length (which is typical of reading
// from streams). The streams for SshCommand are implemented by PipeStream (an internal class to
// SSH.NET). Reading more than the current length causes it to *block* until data is available.
// If the stream is flushed when reading, it does not block. It is not reliable to flush and then
// read because there is a possible race condition where a write might occur between flushing and
// reading (writing resets the flag that it was flushed). The only reliable solution to prevent
// blocking when reading is to always read the current length rather than an arbitrary buffer size.
var intermediateOutputBuffer = new byte[length];
var bytesRead = stream.Read(intermediateOutputBuffer, 0, intermediateOutputBuffer.Length);
intermediateStream.Write(intermediateOutputBuffer, 0, bytesRead);
return bytesRead;
}
public string Read()
{
var bytesFlushed = FlushToIntermediateStream();
// Allow reading the newly flushed bytes.
intermediateStream.Position -= bytesFlushed;
// Minor optimization since this may be called in a tight loop.
if (intermediateStream.Position == intermediateStream.Length)
{
return null;
}
else
{
var result = reader.ReadToEnd();
return result;
}
}
public void Dispose()
{
reader.Dispose();
intermediateStream.Dispose();
}
}
然后使用它:
using (var command = client.CreateCommand("your command text"))
{
var cmdAsyncResult = command.BeginExecute();
using (var standardOutputReader = new SshCommandStreamReader(command.OutputStream))
{
while (!cmdAsyncResult.IsCompleted)
{
var result = standardOutputReader.Read();
if (!String.IsNullOrEmpty(result))
{
Console.Write(result);
}
// Or what ever mechanism you'd like to use to prevent CPU thrashing.
Thread.Sleep(1);
}
// This must be done *after* the loop and *before* EndExecute() so that any extra output
// is captured (e.g. the loop never ran because the command was so fast).
var resultFinal = standardOutputReader.Read();
if (!String.IsNullOrEmpty(resultFinal))
{
Console.Write(resultFinal);
}
}
command.EndExecute(cmdAsyncResult);
}
您应该能够修改此示例以从标准错误中读取(通过 ExtendedOutputStream
),并将其更改为逐行读取特定于您的应用程序。