TPL DataFlow with Lazy Source / 数据流

TPL DataFlow with Lazy Source / stream of data

假设您有一个配置了并行度的 TransformBlock,并希望通过该块流式传输数据。输入数据应该只有在管道可以真正开始处理它时才创建。 (并且应该在它离开管道时被释放。)

我能做到吗?如果是的话怎么办?

基本上我想要一个用作迭代器的数据源。 像这样:

public IEnumerable<Guid> GetSourceData()
{
    //In reality -> this should also be an async task -> but yield return does not work in combination with async/await ...
    Func<ICollection<Guid>> GetNextBatch = () => Enumerable.Repeat(100).Select(x => Guid.NewGuid()).ToArray();

    while (true)
    {
        var batch = GetNextBatch();
        if (batch == null || !batch.Any()) break;
        foreach (var guid in batch)
            yield return guid;
    }
}

这将导致内存中有 +- 100 条记录。好的:如果您附加到此数据源的块会将它们保留在内存中一段时间​​,则更多,但您有机会仅获取数据的一个子集(/流)。


一些背景资料:

我打算将其与 azure cosmos db 结合使用,其中源可以是集合中的所有对象或更改提要。不用说,我不希望所有这些对象都存储在内存中。所以这行不通:

using System.Threading.Tasks.Dataflow;

public async Task ExampleTask()
{
    Func<Guid, object> TheActualAction = text => text.ToString();

    var config = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 5,
        MaxDegreeOfParallelism = 15
    };
    var throtteler = new TransformBlock<Guid, object>(TheActualAction, config);
    var output = new BufferBlock<object>();
    throtteler.LinkTo(output);

    throtteler.Post(Guid.NewGuid());
    throtteler.Post(Guid.NewGuid());
    throtteler.Post(Guid.NewGuid());
    throtteler.Post(Guid.NewGuid());
    //...
    throtteler.Complete();

    await throtteler.Completion;
}

上面的例子不好,因为我添加了所有项目,但不知道它们是否真的被转换块'used'。另外,我真的不关心输出缓冲区。我知道我需要将它发送到某个地方以便我可以等待完成,但之后我就没有用到缓冲区了。所以它应该忘记它得到的一切......

您似乎希望以定义的并行度 (MaxDegreeOfParallelism = 15) 处理数据。对于这样一个简单的要求,TPL 数据流非常笨重。

有一个非常简单而强大的模式可以解决您的问题。这是一个并行异步 foreach 循环,如下所述:https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

然后你可以这样写:

var dataSource = ...; //some sequence
dataSource.ForEachAsync(15, async item => await ProcessItem(item));

很简单。

您可以使用 SemaphoreSlim 动态降低 DOP。信号量作为一个门,只让N个并发threads/tasks进来。N可以动态改变。

所以你会使用 ForEachAsync 作为基本的主力,然后在上面添加额外的限制和节流。

Post() 将 return false 如果目标已满且没有阻塞。虽然这个 可以 用于忙等待循环,但这是一种浪费。 SendAsync() 另一方面,如果目标已满,则会等待 :

public async Task ExampleTask()
{
    var config = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 50,
        MaxDegreeOfParallelism = 15
    };
    var block= new ActionBlock<Guid, object>(TheActualAction, config);

    while(//some condition//)
    { 
        var data=await GetDataFromCosmosDB();
        await block.SendAsync(data);
        //Wait a bit if we want to use polling
        await Task.Delay(...);
    }

    block.Complete();
    await block.Completion;
}