Limiting concurrent requests using Rx and SelectMany
I have a list of URLs of pages that I want to download concurrently using HttpClient. The list of URLs can be large (100 or more!).
I currently have this code:
var urls = new List<string>
{
    @"http://www.amazon.com",
    @"http://www.bing.com",
    @"http://www.facebook.com",
    @"http://www.twitter.com",
    @"http://www.google.com"
};
var client = new HttpClient();
var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));
contents.Subscribe(Console.WriteLine);
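The problem: because of the use of SelectMany, a whole bunch of tasks are created almost simultaneously. It seems that if the list of URLs is big enough, a lot of the tasks time out (I get "A Task was cancelled" exceptions).
So I thought there should be a way, maybe using some kind of scheduler, to limit the number of concurrent tasks so that no more than 5 or 6 run at any given time.
That way I would still get concurrent downloads without launching too many tasks that may stall, as they do right now.
How can I do this so that I don't end up swamped by lots of timed-out tasks?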
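The effect described above can be reproduced without touching the network. The following is only a minimal sketch, assuming the System.Reactive package is referenced; UnboundedDemo, FakeDownloadAsync and the 100 ms delay are hypothetical stand-ins for client.GetStringAsync and are not part of the original code.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class UnboundedDemo
{
    // Pushes `itemCount` simulated downloads through SelectMany and returns
    // the highest number of them that were in flight at the same time.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;

        // Hypothetical stand-in for client.GetStringAsync(uri).
        async Task<string> FakeDownloadAsync(int id)
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(100); // simulated network latency
            lock (gate) { inFlight--; }
            return $"page {id}";
        }

        // Same shape as the question's code: the selector returns a Task,
        // so each task is started as soon as its item arrives.
        await Enumerable.Range(1, itemCount)
            .ToObservable()
            .SelectMany(id => FakeDownloadAsync(id))
            .ToList();

        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakConcurrencyAsync(10).GetAwaiter().GetResult();
        Console.WriteLine($"Peak concurrent downloads: {peak}");
    }
}
```

With 10 items, the reported peak is typically well above 5 or 6, because nothing in this pipeline holds a task back once its item has been emitted.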
Here is an example of how you can do it with the DataFlow API:
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static Task DoIt()
    {
        var urls = new List<string>
        {
            @"http://www.amazon.com",
            @"http://www.bing.com",
            @"http://www.facebook.com",
            @"http://www.twitter.com",
            @"http://www.google.com"
        };
        var client = new HttpClient();

        // Create a block that takes a URL as input
        // and produces the download result as output
        TransformBlock<string, string> downloadBlock =
            new TransformBlock<string, string>(
                uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)),
                new ExecutionDataflowBlockOptions
                {
                    // At most 2 download operations execute at the same time
                    MaxDegreeOfParallelism = 2
                });

        // Create a block that prints out the result
        ActionBlock<string> doneBlock =
            new ActionBlock<string>(x => Console.WriteLine(x));

        // Link the output of the first block to the input of the second one
        downloadBlock.LinkTo(
            doneBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Feed the URLs into the first block
        foreach (var url in urls)
        {
            downloadBlock.Post(url);
        }
        downloadBlock.Complete(); // Mark completion of input

        // Allows the consumer to wait for the whole operation to complete
        return doneBlock.Completion;
    }

    static void Main(string[] args)
    {
        DoIt().Wait();
        Console.WriteLine("Done");
        Console.ReadLine();
    }
}
Can you see if this helps?
var urls = new List<string>
{
    @"http://www.amazon.com",
    @"http://www.bing.com",
    @"http://www.google.com",
    @"http://www.twitter.com",
    @"http://www.google.com"
};
var contents =
    urls
        .ToObservable()
        .SelectMany(uri =>
            Observable
                .Using(
                    () => new System.Net.Http.HttpClient(),
                    client =>
                        client
                            .GetStringAsync(new Uri(uri, UriKind.Absolute))
                            .ToObservable()));
Remember that SelectMany() is actually Select().Merge(). While SelectMany does not have a maxConcurrent parameter, Merge() does. So you can use that.
Based on your example, you can do this:
var urls = new List<string>
{
    @"http://www.amazon.com",
    @"http://www.bing.com",
    @"http://www.facebook.com",
    @"http://www.twitter.com",
    @"http://www.google.com"
};
var client = new HttpClient();
var contents = urls
    .ToObservable()
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // 2 maximum concurrent requests!
contents.Subscribe(Console.WriteLine);
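A quick way to check that Merge(2) really caps the number of requests in flight is to swap the HTTP call for a timed fake and count. This is only a sketch, assuming the System.Reactive package is referenced; ThrottleDemo, FakeDownloadAsync and the 100 ms delay are hypothetical stand-ins and not part of the original answer. Note that Observable.FromAsync matters here: it does not start the task until Merge subscribes to it, whereas a selector that calls the async method directly starts every request as soon as its item is emitted.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs `itemCount` simulated downloads through Select + Merge(maxConcurrent)
    // and returns the highest number of them that were in flight at once.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount, int maxConcurrent)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;

        // Hypothetical stand-in for client.GetStringAsync(uri).
        async Task<string> FakeDownloadAsync(int id)
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(100); // simulated network latency
            lock (gate) { inFlight--; }
            return $"page {id}";
        }

        await Enumerable.Range(1, itemCount)
            .ToObservable()
            // FromAsync wraps the call so it only starts on subscription.
            .Select(id => Observable.FromAsync(() => FakeDownloadAsync(id)))
            // Merge subscribes to at most `maxConcurrent` inner sequences at a time.
            .Merge(maxConcurrent)
            .ToList();

        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakConcurrencyAsync(itemCount: 10, maxConcurrent: 2)
            .GetAwaiter().GetResult();
        Console.WriteLine($"Peak concurrent downloads: {peak}");
    }
}
```

With 10 items and a limit of 2, the reported peak should not exceed 2. Raising maxConcurrent to 5 or 6 gives the behaviour asked for in the question.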