具有 2 个线程的简单队列只处理一半的项目
Simple Queue with 2 threads only processes half the items
我实现了一个简单的队列来存储文件名列表,以及一种读取队列、获取下一个可用文件名并将文件从一个文件夹移动到另一个文件夹的方法。
此 class 用于跟踪文件夹中的文件。
internal class FileItem
{
public string FullFileName { get; set; }
public bool isLocked { get; set; }
}
这是我的简单队列实现
internal class MyQueue
{
List<FileItem> FileList;
public MyQueue(string FilePath)
{
FileList = new List<FileItem>();
string[] files = Directory.GetFiles(FilePath);
foreach (string file in files)
{
FileItem fileitem = new FileItem
{
FullFileName = file,
isLocked = false
};
FileList.Add(fileitem);
}
}
public FileItem GetNextAvailableItem()
{
FileItem item = FileList.Where(i => i.isLocked == false).FirstOrDefault();
if (item != null) item.isLocked = true;
return item;
}
public void RemoveProcessedItem(FileItem item)
{
FileList.Remove(item);
}
}
当我从单线程运行这个时,它工作正常。
但我正在使用这样的两个线程。
static void ProcessFilesInMultiThread()
{
Task task1 = Task.Factory.StartNew(() => ReadFromQueueAndMoveFile("Thread 1"));
Task task2 = Task.Factory.StartNew(() => ReadFromQueueAndMoveFile("Thread 2"));
Task.WaitAll(task1, task2);
}
这是ReadFromQueueAndMoveFile
方法。
static void ReadFromQueueAndMoveFile(string ThreadName)
{
while (_queue.GetNextAvailableItem() != null)
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
if(item != null)
{
string FileName = Path.GetFileName(item.FullFileName);
string SourceFilePath = Path.Combine(sourcePath, FileName);
string DestinationFilePath = Path.Combine(destinationPath, FileName);
File.Move(SourceFilePath, DestinationFilePath);
Thread.Sleep(2000);
Console.WriteLine("Successfully moved: " + FileName + " Via " + ThreadName);
//remove item from queue.
_queue.RemoveProcessedItem(item);
}
}
}
问题是当我从 2 个线程 运行 时,总是只有一半的文件被移动,我不确定为什么。如果文件夹有 6 个文件,那么只有 3 个文件被随机移动。
为什么会这样?
为了防止线程之间相互影响,必须设置锁。为此,lock
语句很有帮助。参见:https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock
为此定义了一个锁对象:
private readonly object fileListLock = new object();
然后在GetNextAvailableItem()
方法中使用:
public FileItem GetNextAvailableItem()
{
lock (fileListLock)
{
FileItem item = FileList.Where(i => i.isLocked == false).FirstOrDefault();
if (item != null) item.isLocked = true;
return item;
}
}
并且在 RemoveProcessedItem()
方法中:
public void RemoveProcessedItem(FileItem item)
{
lock(fileListLock)
{
FileList.Remove(item);
}
}
我认为你的主要问题在于:
while (_queue.GetNextAvailableItem() != null)
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
您正在调用 GetNextAvailableItem
两次,第一次调用返回的值被丢弃。
解决这个问题的一种方法是:
while (true)
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
if (item == null) break;
当然你也应该确保 MyQueue
class 是 thread-safe,正如 Gabriel 在 .
中建议的那样
我能够使用 ConcurrentQueue<T>
重写我的队列这解决了我的问题。
代码在这里:
internal class MyQueue
{
ConcurrentQueue<FileItem> FileList = new ConcurrentQueue<FileItem>();
public MyQueue(string FilePath)
{
string[] files = Directory.GetFiles(FilePath);
foreach (string file in files)
{
FileItem fileitem = new FileItem
{
FullFileName = file,
isLocked = false
};
FileList.Enqueue(fileitem);
}
}
public FileItem GetNextAvailableItem()
{
//Deque and return object.
FileItem DequeItem = new FileItem();
bool isDequeSuccess = FileList.TryDequeue(out DequeItem);
if (isDequeSuccess) return DequeItem;
else return null;
}
public bool PeekIfAnyFilesLeftInQueue()
{
FileItem PeekItem = new FileItem();
bool isFileExists = FileList.TryPeek(out PeekItem);
return isFileExists;
}
}
我还必须更改从多线程方法调用这些方法。
static void ReadFromQueueAndMoveFile(string ThreadName)
{
do
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
if (item != null)
{
string FileName = Path.GetFileName(item.FullFileName);
string SourceFilePath = Path.Combine(sourcePath, FileName);
string DestinationFilePath = Path.Combine(destinationPath, FileName);
// Let's assume this is a long running process.
File.Move(SourceFilePath, DestinationFilePath);
Thread.Sleep(2000);
Console.WriteLine("Successfully moved: " + FileName + " Via " + ThreadName);
}
}
while (_queue.PeekIfAnyFilesLeftInQueue());
}
我实现了一个简单的队列来存储文件名列表,以及一种读取队列、获取下一个可用文件名并将文件从一个文件夹移动到另一个文件夹的方法。
此 class 用于跟踪文件夹中的文件。
internal class FileItem
{
public string FullFileName { get; set; }
public bool isLocked { get; set; }
}
这是我的简单队列实现
internal class MyQueue
{
List<FileItem> FileList;
public MyQueue(string FilePath)
{
FileList = new List<FileItem>();
string[] files = Directory.GetFiles(FilePath);
foreach (string file in files)
{
FileItem fileitem = new FileItem
{
FullFileName = file,
isLocked = false
};
FileList.Add(fileitem);
}
}
public FileItem GetNextAvailableItem()
{
FileItem item = FileList.Where(i => i.isLocked == false).FirstOrDefault();
if (item != null) item.isLocked = true;
return item;
}
public void RemoveProcessedItem(FileItem item)
{
FileList.Remove(item);
}
}
当我从单线程运行这个时,它工作正常。
但我正在使用这样的两个线程。
static void ProcessFilesInMultiThread()
{
Task task1 = Task.Factory.StartNew(() => ReadFromQueueAndMoveFile("Thread 1"));
Task task2 = Task.Factory.StartNew(() => ReadFromQueueAndMoveFile("Thread 2"));
Task.WaitAll(task1, task2);
}
这是ReadFromQueueAndMoveFile
方法。
static void ReadFromQueueAndMoveFile(string ThreadName)
{
while (_queue.GetNextAvailableItem() != null)
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
if(item != null)
{
string FileName = Path.GetFileName(item.FullFileName);
string SourceFilePath = Path.Combine(sourcePath, FileName);
string DestinationFilePath = Path.Combine(destinationPath, FileName);
File.Move(SourceFilePath, DestinationFilePath);
Thread.Sleep(2000);
Console.WriteLine("Successfully moved: " + FileName + " Via " + ThreadName);
//remove item from queue.
_queue.RemoveProcessedItem(item);
}
}
}
问题是当我从 2 个线程 运行 时,总是只有一半的文件被移动,我不确定为什么。如果文件夹有 6 个文件,那么只有 3 个文件被随机移动。
为什么会这样?
为了防止线程之间相互影响,必须设置锁。为此,lock
语句很有帮助。参见:https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock
为此定义了一个锁对象:
private readonly object fileListLock = new object();
然后在GetNextAvailableItem()
方法中使用:
public FileItem GetNextAvailableItem()
{
lock (fileListLock)
{
FileItem item = FileList.Where(i => i.isLocked == false).FirstOrDefault();
if (item != null) item.isLocked = true;
return item;
}
}
并且在 RemoveProcessedItem()
方法中:
public void RemoveProcessedItem(FileItem item)
{
lock(fileListLock)
{
FileList.Remove(item);
}
}
我认为你的主要问题在于:
while (_queue.GetNextAvailableItem() != null)
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
您正在调用 GetNextAvailableItem
两次,第一次调用返回的值被丢弃。
解决这个问题的一种方法是:
while (true)
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
if (item == null) break;
当然你也应该确保 MyQueue
class 是 thread-safe,正如 Gabriel 在
我能够使用 ConcurrentQueue<T>
重写我的队列这解决了我的问题。
代码在这里:
internal class MyQueue
{
ConcurrentQueue<FileItem> FileList = new ConcurrentQueue<FileItem>();
public MyQueue(string FilePath)
{
string[] files = Directory.GetFiles(FilePath);
foreach (string file in files)
{
FileItem fileitem = new FileItem
{
FullFileName = file,
isLocked = false
};
FileList.Enqueue(fileitem);
}
}
public FileItem GetNextAvailableItem()
{
//Deque and return object.
FileItem DequeItem = new FileItem();
bool isDequeSuccess = FileList.TryDequeue(out DequeItem);
if (isDequeSuccess) return DequeItem;
else return null;
}
public bool PeekIfAnyFilesLeftInQueue()
{
FileItem PeekItem = new FileItem();
bool isFileExists = FileList.TryPeek(out PeekItem);
return isFileExists;
}
}
我还必须更改从多线程方法调用这些方法。
static void ReadFromQueueAndMoveFile(string ThreadName)
{
do
{
//get next available item from queue.
FileItem item = _queue.GetNextAvailableItem();
if (item != null)
{
string FileName = Path.GetFileName(item.FullFileName);
string SourceFilePath = Path.Combine(sourcePath, FileName);
string DestinationFilePath = Path.Combine(destinationPath, FileName);
// Let's assume this is a long running process.
File.Move(SourceFilePath, DestinationFilePath);
Thread.Sleep(2000);
Console.WriteLine("Successfully moved: " + FileName + " Via " + ThreadName);
}
}
while (_queue.PeekIfAnyFilesLeftInQueue());
}