TPL DataFlow with Lazy Source / 数据流
TPL DataFlow with Lazy Source / stream of data
假设您有一个配置了并行度的 TransformBlock,并希望通过该块流式传输数据。输入数据应该只有在管道可以真正开始处理它时才创建。 (并且应该在它离开管道时被释放。)
我能做到吗?如果是的话怎么办?
基本上我想要一个用作迭代器的数据源。
像这样:
public IEnumerable<Guid> GetSourceData()
{
//In reality -> this should also be an async task -> but yield return does not work in combination with async/await ...
Func<ICollection<Guid>> GetNextBatch = () => Enumerable.Repeat(100).Select(x => Guid.NewGuid()).ToArray();
while (true)
{
var batch = GetNextBatch();
if (batch == null || !batch.Any()) break;
foreach (var guid in batch)
yield return guid;
}
}
这将导致内存中有 +- 100 条记录。好的:如果您附加到此数据源的块会将它们保留在内存中一段时间,则更多,但您有机会仅获取数据的一个子集(/流)。
一些背景资料:
我打算将其与 azure cosmos db 结合使用,其中源可以是集合中的所有对象或更改提要。不用说,我不希望所有这些对象都存储在内存中。所以这行不通:
using System.Threading.Tasks.Dataflow;
public async Task ExampleTask()
{
Func<Guid, object> TheActualAction = text => text.ToString();
var config = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = 15
};
var throtteler = new TransformBlock<Guid, object>(TheActualAction, config);
var output = new BufferBlock<object>();
throtteler.LinkTo(output);
throtteler.Post(Guid.NewGuid());
throtteler.Post(Guid.NewGuid());
throtteler.Post(Guid.NewGuid());
throtteler.Post(Guid.NewGuid());
//...
throtteler.Complete();
await throtteler.Completion;
}
上面的例子不好,因为我添加了所有项目,但不知道它们是否真的被转换块'used'。另外,我真的不关心输出缓冲区。我知道我需要将它发送到某个地方以便我可以等待完成,但之后我就没有用到缓冲区了。所以它应该忘记它得到的一切......
您似乎希望以定义的并行度 (MaxDegreeOfParallelism = 15
) 处理数据。对于这样一个简单的要求,TPL 数据流非常笨重。
有一个非常简单而强大的模式可以解决您的问题。这是一个并行异步 foreach 循环,如下所述:https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
然后你可以这样写:
var dataSource = ...; //some sequence
dataSource.ForEachAsync(15, async item => await ProcessItem(item));
很简单。
您可以使用 SemaphoreSlim
动态降低 DOP。信号量作为一个门,只让N个并发threads/tasks进来。N可以动态改变。
所以你会使用 ForEachAsync
作为基本的主力,然后在上面添加额外的限制和节流。
Post()
将 return false
如果目标已满且没有阻塞。虽然这个 可以 用于忙等待循环,但这是一种浪费。 SendAsync()
另一方面,如果目标已满,则会等待 :
public async Task ExampleTask()
{
var config = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 50,
MaxDegreeOfParallelism = 15
};
var block= new ActionBlock<Guid, object>(TheActualAction, config);
while(//some condition//)
{
var data=await GetDataFromCosmosDB();
await block.SendAsync(data);
//Wait a bit if we want to use polling
await Task.Delay(...);
}
block.Complete();
await block.Completion;
}
假设您有一个配置了并行度的 TransformBlock,并希望通过该块流式传输数据。输入数据应该只有在管道可以真正开始处理它时才创建。 (并且应该在它离开管道时被释放。)
我能做到吗?如果是的话怎么办?
基本上我想要一个用作迭代器的数据源。 像这样:
public IEnumerable<Guid> GetSourceData()
{
//In reality -> this should also be an async task -> but yield return does not work in combination with async/await ...
Func<ICollection<Guid>> GetNextBatch = () => Enumerable.Repeat(100).Select(x => Guid.NewGuid()).ToArray();
while (true)
{
var batch = GetNextBatch();
if (batch == null || !batch.Any()) break;
foreach (var guid in batch)
yield return guid;
}
}
这将导致内存中有 +- 100 条记录。好的:如果您附加到此数据源的块会将它们保留在内存中一段时间,则更多,但您有机会仅获取数据的一个子集(/流)。
一些背景资料:
我打算将其与 azure cosmos db 结合使用,其中源可以是集合中的所有对象或更改提要。不用说,我不希望所有这些对象都存储在内存中。所以这行不通:
using System.Threading.Tasks.Dataflow;
public async Task ExampleTask()
{
Func<Guid, object> TheActualAction = text => text.ToString();
var config = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = 15
};
var throtteler = new TransformBlock<Guid, object>(TheActualAction, config);
var output = new BufferBlock<object>();
throtteler.LinkTo(output);
throtteler.Post(Guid.NewGuid());
throtteler.Post(Guid.NewGuid());
throtteler.Post(Guid.NewGuid());
throtteler.Post(Guid.NewGuid());
//...
throtteler.Complete();
await throtteler.Completion;
}
上面的例子不好,因为我添加了所有项目,但不知道它们是否真的被转换块'used'。另外,我真的不关心输出缓冲区。我知道我需要将它发送到某个地方以便我可以等待完成,但之后我就没有用到缓冲区了。所以它应该忘记它得到的一切......
您似乎希望以定义的并行度 (MaxDegreeOfParallelism = 15
) 处理数据。对于这样一个简单的要求,TPL 数据流非常笨重。
有一个非常简单而强大的模式可以解决您的问题。这是一个并行异步 foreach 循环,如下所述:https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
然后你可以这样写:
var dataSource = ...; //some sequence
dataSource.ForEachAsync(15, async item => await ProcessItem(item));
很简单。
您可以使用 SemaphoreSlim
动态降低 DOP。信号量作为一个门,只让N个并发threads/tasks进来。N可以动态改变。
所以你会使用 ForEachAsync
作为基本的主力,然后在上面添加额外的限制和节流。
Post()
将 return false
如果目标已满且没有阻塞。虽然这个 可以 用于忙等待循环,但这是一种浪费。 SendAsync()
另一方面,如果目标已满,则会等待 :
public async Task ExampleTask()
{
var config = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 50,
MaxDegreeOfParallelism = 15
};
var block= new ActionBlock<Guid, object>(TheActualAction, config);
while(//some condition//)
{
var data=await GetDataFromCosmosDB();
await block.SendAsync(data);
//Wait a bit if we want to use polling
await Task.Delay(...);
}
block.Complete();
await block.Completion;
}