使用 System.Reactive 的扩展对代码进行多线程处理
Multithreading a code using System.Reactive's extensions
下面的代码从 Binance 下载从开始日期到结束日期之间的历史 OHLCV 数据。由于 Binance 每次请求最多只允许下载 1000 根蜡烛(K 线),所以我把 DownloadAsync 写成了下面这样。也欢迎对代码提出任何建议。
真正的问题是如何让 DownloadAsync 以多线程方式运行来加快速度——想象一下以 5 分钟为间隔下载 2018 年到 2021 年的蜡烛图数据。我更倾向于使用 System.Reactive,但也欢迎其他方案,因为我很难把这段代码改写成多线程版本。
下面的代码可以测试。
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;
namespace DownloadCandleDataTest
{
/// <summary>
/// Downloads historical OHLCV candles from an <see cref="Exchange"/> in sequential batches.
/// </summary>
public class DataProvider
{
    private readonly Exchange _exchange;

    /// <summary>Creates a provider that fetches candles through <paramref name="exchange"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="exchange"/> is null.</exception>
    public DataProvider(Exchange exchange)
    {
        _exchange = exchange ?? throw new ArgumentNullException(nameof(exchange));
    }

    /// <summary>
    /// Downloads every candle for <paramref name="pair"/> from <paramref name="startDate"/> up to
    /// <paramref name="endDate"/>, requesting at most <paramref name="limit"/> candles per call and
    /// resuming each call from the last candle received.
    /// </summary>
    /// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
    /// <param name="interval">Candle interval.</param>
    /// <param name="startDate">First timestamp to request.</param>
    /// <param name="endDate">Loop stops once a batch reaches or passes this timestamp.</param>
    /// <param name="startupCandleCount">Not used by this implementation; kept for source compatibility.</param>
    /// <param name="limit">Maximum number of candles requested per call (the question notes Binance allows up to 1000).</param>
    /// <returns>The downloaded candles in ascending timestamp order, each timestamp at most once relative to the previous batch.</returns>
    /// <exception cref="InvalidOperationException">The exchange returned no result for a request.</exception>
    public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount, int limit = 100)
    {
        var candles = new List<OHLCV>();
        var cursor = startDate;

        while (cursor < endDate)
        {
            var batch = await _exchange.GetCandlesAsync(pair, interval, cursor, endDate, limit).ConfigureAwait(false);
            if (batch == null)
            {
                // A null batch means the request failed; stopping silently would return truncated data.
                throw new InvalidOperationException($"Failed to download candles for {pair} starting at {cursor:O}.");
            }

            // Each request after the first starts at the previous batch's last candle, so keep only
            // candles newer than what has already been collected. Filtering by timestamp (instead of
            // blindly removing index 0) stays correct even when a batch holds just that one candle.
            var fresh = candles.Count == 0
                ? batch
                : batch.Where(c => c.Timestamp > candles[candles.Count - 1].Timestamp).ToList();

            // Nothing new means the exchange has no more data in range; stop instead of re-requesting forever.
            if (fresh.Count == 0)
            {
                break;
            }

            Console.WriteLine($"First: {fresh[0].Timestamp} | Last: {fresh[fresh.Count - 1].Timestamp}");
            candles.AddRange(fresh);
            cursor = fresh[fresh.Count - 1].Timestamp;
        }

        return candles;
    }
}
/// <summary>Console entry point used to try out the candle download.</summary>
class Program
{
    /// <summary>
    /// Demonstrates <c>Observable.Start</c>: runs a simulated three-second job and blocks the
    /// calling thread until the observable completes.
    /// </summary>
    public static void StartBackgroundWork()
    {
        Console.WriteLine("Shows use of Start to start on a background thread:");

        Action simulatedWork = () =>
        {
            //This starts on a background thread.
            Console.WriteLine("From background thread. Does not block main thread.");
            Console.WriteLine("Calculating...");
            Thread.Sleep(3000);
            Console.WriteLine("Background work completed.");
        };

        var work = Observable
            .Start(simulatedWork)
            .Finally(() => Console.WriteLine("Main thread completed."));

        Console.WriteLine("\r\n\t In Main Thread...\r\n");

        // Block until the background observable has completed.
        work.Wait();
    }

    /// <summary>Downloads one day of five-minute TRXUSDT candles, then waits for Enter.</summary>
    static async Task Main(string[] args)
    {
        using (var exchange = new Exchange())
        {
            var provider = new DataProvider(exchange);
            await provider
                .DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, new DateTime(2019, 1, 1), new DateTime(2019, 1, 2), 100)
                .ConfigureAwait(false);
            Console.ReadLine();
        }
    }
}
/// <summary>
/// A single candlestick: open, high, low and close prices plus volume, keyed by timestamp.
/// </summary>
public class OHLCV
{
    /// <summary>Candle timestamp (ToCandle fills it from the kline's open time).</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Opening price.</summary>
    public decimal Open { get; set; }

    /// <summary>Highest price.</summary>
    public decimal High { get; set; }

    /// <summary>Lowest price.</summary>
    public decimal Low { get; set; }

    /// <summary>Closing price.</summary>
    public decimal Close { get; set; }

    /// <summary>Traded volume (ToCandle fills it from the kline's base volume).</summary>
    public decimal Volume { get; set; }
}
/// <summary>Conversion helpers from Binance.Net kline types to the local <see cref="OHLCV"/> model.</summary>
public static class Extensions
{
    /// <summary>
    /// Maps a Binance kline to an <see cref="OHLCV"/>, using the kline's open time as the timestamp
    /// and its base volume as the volume.
    /// </summary>
    public static OHLCV ToCandle(this IBinanceKline candle) =>
        new OHLCV
        {
            Timestamp = candle.OpenTime,
            Open = candle.Open,
            High = candle.High,
            Low = candle.Low,
            Close = candle.Close,
            Volume = candle.BaseVolume,
        };
}
/// <summary>
/// Wrapper around a Binance.Net client that returns klines as <see cref="OHLCV"/> candles.
/// </summary>
public class Exchange : IDisposable
{
    private readonly IBinanceClient _client;

    /// <summary>Creates an exchange backed by a new <c>BinanceClient</c>.</summary>
    public Exchange()
    {
        _client = new BinanceClient();
    }

    /// <summary>
    /// Fetches spot klines for <paramref name="pair"/> and converts them to candles.
    /// </summary>
    /// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
    /// <param name="interval">Candle interval.</param>
    /// <param name="startTime">Optional first timestamp to request.</param>
    /// <param name="endTime">Optional last timestamp to request.</param>
    /// <param name="limit">Optional maximum number of candles to return.</param>
    /// <param name="ct">Token used to cancel the underlying HTTP request.</param>
    /// <returns>The converted candles; an empty list when the call succeeds without data.</returns>
    /// <exception cref="InvalidOperationException">The exchange reported an error for the request.</exception>
    public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null, CancellationToken ct = default)
    {
        var result = await _client.Spot.Market.GetKlinesAsync(pair, interval, startTime, endTime, limit, ct).ConfigureAwait(false);

        // Surface the failure instead of returning null, which callers would dereference.
        if (!result.Success)
        {
            throw new InvalidOperationException($"GetKlinesAsync failed for {pair}: {result.Error}");
        }

        return result.Data?.Select(e => e.ToCandle()).ToList() ?? new List<OHLCV>();
    }

    /// <summary>Disposes the underlying Binance client.</summary>
    public void Dispose()
    {
        _client?.Dispose();
    }
}
}
并行发出多个 Web 请求的关键在于先创建全部任务,再用 Task.WhenAll() 一起等待它们,而不是在循环中逐个 await。如果在循环中逐个 await,请求会按顺序依次执行(尽管发出 Web 请求时 UI 线程不会被阻塞)。
你把问题想得太复杂了。
由于蜡烛的时间间隔是均匀的,而且你知道每次调用 GetKlinesAsync 会返回多少根蜡烛,因此可以预先计算出所需的全部开始日期(前提是交易所的历史数据中没有缺口)。
string pair = "TRXUSDT";
KlineInterval interval = KlineInterval.FiveMinutes;
DateTime startDate = new DateTime(2019, 1, 1);
DateTime endDate = new DateTime(2019, 1, 2);
// Candle width in minutes; must be kept in sync with `interval` by hand.
double gap = 5.0;
// Candles requested per call, so each request spans gap * limit minutes.
int limit = 100;

// One start date per request window: startDate, startDate + gap*limit, ... while <= endDate.
Func<DateTime, bool> notPastEnd = d => d <= endDate;
Func<DateTime, DateTime> nextWindow = d => d.AddMinutes(gap * limit);
IObservable<DateTime> startDates =
    Observable.Generate(startDate, notPastEnd, nextWindow, d => d);
现在就可以很容易地构造查询:
// One request per start date; Rx runs them concurrently, so results arrive in completion order.
IObservable<OHLCV> query =
    from s in startDates
    // Cap each request just before the next window starts, so batches can never overlap
    // (and produce duplicates) even when the exchange history has gaps.
    let windowEnd = s.AddMinutes(gap * limit).AddMilliseconds(-1)
    let requestEnd = windowEnd < endDate ? windowEnd : endDate
    from rs in
        Observable
            .Using(
                () => new BinanceClient(),
                bc => Observable.FromAsync(ct =>
                    bc.Spot.Market.GetKlinesAsync(pair, interval, s, requestEnd, limit, ct)))
    // A failed batch fails the whole query; silently skipping it would leave a hole in the data.
    from c in rs.Success
        ? rs.Data.Select(x => x.ToCandle())
        : throw new InvalidOperationException($"Failed to download candles starting at {s:O}: {rs.Error}")
    select c;
由于这是并行查询,结果可能会乱序到达,因此需要对查询调用 .ToArray(),在所有数据都到达之后再统一处理(例如排序),而不是每收到一根蜡烛就处理一次。
// Collect every candle, sort once the whole download has finished, and report failures
// instead of letting Rx's default OnError handler rethrow and crash the process.
IDisposable subscription =
    query
        .ToArray()
        .Select(xs => xs.OrderBy(x => x.Timestamp).ToArray())
        .Subscribe(
            cs =>
            {
                /* candles downloaded using multiple threads */
                /* and sorted in `Timestamp` order */
            },
            ex => Console.WriteLine($"Candle download failed: {ex.Message}"));
这会使用多个线程获取全部蜡烛,并按时间顺序排列。只要 gap 与 interval 相符且交易所数据没有缺口,结果中就不会有重复(否则各批次的时间范围可能会重叠)。
如果希望把它写成 DownloadAsync 方法,可以这样做:
/// <summary>
/// Downloads all candles for <paramref name="pair"/> between <paramref name="startDate"/> and
/// <paramref name="endDate"/>. It issues one request per window of <paramref name="limit"/> candles,
/// with at most <paramref name="maxConcurrency"/> requests in flight at once.
/// </summary>
/// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
/// <param name="interval">Candle interval.</param>
/// <param name="gap">Candle width in minutes; must match <paramref name="interval"/> (e.g. 5.0 for five-minute candles).</param>
/// <param name="startDate">Start of the first window.</param>
/// <param name="endDate">Last timestamp to request.</param>
/// <param name="limit">Candles requested per call.</param>
/// <param name="maxConcurrency">Maximum number of simultaneous requests, to avoid hitting exchange rate limits.</param>
/// <returns>The candles sorted by timestamp.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="gap"/>, <paramref name="limit"/> or <paramref name="maxConcurrency"/> is not positive.</exception>
/// <exception cref="InvalidOperationException">The exchange reported an error for one of the batches.</exception>
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, double gap, DateTime startDate, DateTime endDate, int limit, int maxConcurrency = 4)
{
    // A non-positive window would make Generate loop forever on the same start date.
    if (gap <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(gap), gap, "The candle gap must be positive.");
    }
    if (limit <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), limit, "The request limit must be positive.");
    }
    if (maxConcurrency <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency, "At least one concurrent request is required.");
    }

    double windowMinutes = gap * limit;

    var requests =
        Observable
            .Generate(startDate, x => x <= endDate, x => x.AddMinutes(windowMinutes), x => x)
            .Select(s =>
            {
                // Cap each request just before the next window starts, so batches never overlap
                // even when the exchange history has gaps.
                var windowEnd = s.AddMinutes(windowMinutes).AddMilliseconds(-1);
                var requestEnd = windowEnd < endDate ? windowEnd : endDate;

                return Observable
                    .Using(
                        () => new BinanceClient(),
                        bc => Observable.FromAsync(ct =>
                            bc.Spot.Market.GetKlinesAsync(pair, interval, s, requestEnd, limit, ct)))
                    // A failed batch must fail the download; silently skipping it would leave a hole.
                    .Select(rs => rs.Success
                        ? rs.Data.Select(x => x.ToCandle())
                        : throw new InvalidOperationException($"Failed to download candles for {pair} starting at {s:O}: {rs.Error}"));
            });

    // Merge with a concurrency cap runs the (cold) requests in parallel without flooding the API.
    IList<OHLCV> candles = await requests
        .Merge(maxConcurrency)
        .SelectMany(batch => batch)
        .ToList();

    // Batches complete in arbitrary order, so sort once everything has arrived.
    return candles.OrderBy(x => x.Timestamp).ToList();
}
请注意,方法签名略有变化:新增了 gap 参数,并用 limit 取代了原来的 startupCandleCount 参数。
下面的代码从 Binance 下载从开始日期到结束日期之间的历史 OHLCV 数据。由于 Binance 每次请求最多只允许下载 1000 根蜡烛(K 线),所以我把 DownloadAsync 写成了下面这样。也欢迎对代码提出任何建议。
真正的问题是如何让 DownloadAsync 以多线程方式运行来加快速度——想象一下以 5 分钟为间隔下载 2018 年到 2021 年的蜡烛图数据。我更倾向于使用 System.Reactive,但也欢迎其他方案,因为我很难把这段代码改写成多线程版本。
下面的代码可以测试。
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;
namespace DownloadCandleDataTest
{
/// <summary>
/// Downloads historical OHLCV candles from an <see cref="Exchange"/> in sequential batches.
/// </summary>
public class DataProvider
{
    private readonly Exchange _exchange;

    /// <summary>Creates a provider that fetches candles through <paramref name="exchange"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="exchange"/> is null.</exception>
    public DataProvider(Exchange exchange)
    {
        _exchange = exchange ?? throw new ArgumentNullException(nameof(exchange));
    }

    /// <summary>
    /// Downloads every candle for <paramref name="pair"/> from <paramref name="startDate"/> up to
    /// <paramref name="endDate"/>, requesting at most <paramref name="limit"/> candles per call and
    /// resuming each call from the last candle received.
    /// </summary>
    /// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
    /// <param name="interval">Candle interval.</param>
    /// <param name="startDate">First timestamp to request.</param>
    /// <param name="endDate">Loop stops once a batch reaches or passes this timestamp.</param>
    /// <param name="startupCandleCount">Not used by this implementation; kept for source compatibility.</param>
    /// <param name="limit">Maximum number of candles requested per call (the question notes Binance allows up to 1000).</param>
    /// <returns>The downloaded candles in ascending timestamp order, each timestamp at most once relative to the previous batch.</returns>
    /// <exception cref="InvalidOperationException">The exchange returned no result for a request.</exception>
    public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount, int limit = 100)
    {
        var candles = new List<OHLCV>();
        var cursor = startDate;

        while (cursor < endDate)
        {
            var batch = await _exchange.GetCandlesAsync(pair, interval, cursor, endDate, limit).ConfigureAwait(false);
            if (batch == null)
            {
                // A null batch means the request failed; stopping silently would return truncated data.
                throw new InvalidOperationException($"Failed to download candles for {pair} starting at {cursor:O}.");
            }

            // Each request after the first starts at the previous batch's last candle, so keep only
            // candles newer than what has already been collected. Filtering by timestamp (instead of
            // blindly removing index 0) stays correct even when a batch holds just that one candle.
            var fresh = candles.Count == 0
                ? batch
                : batch.Where(c => c.Timestamp > candles[candles.Count - 1].Timestamp).ToList();

            // Nothing new means the exchange has no more data in range; stop instead of re-requesting forever.
            if (fresh.Count == 0)
            {
                break;
            }

            Console.WriteLine($"First: {fresh[0].Timestamp} | Last: {fresh[fresh.Count - 1].Timestamp}");
            candles.AddRange(fresh);
            cursor = fresh[fresh.Count - 1].Timestamp;
        }

        return candles;
    }
}
/// <summary>Console entry point used to try out the candle download.</summary>
class Program
{
    /// <summary>
    /// Demonstrates <c>Observable.Start</c>: runs a simulated three-second job and blocks the
    /// calling thread until the observable completes.
    /// </summary>
    public static void StartBackgroundWork()
    {
        Console.WriteLine("Shows use of Start to start on a background thread:");

        Action simulatedWork = () =>
        {
            //This starts on a background thread.
            Console.WriteLine("From background thread. Does not block main thread.");
            Console.WriteLine("Calculating...");
            Thread.Sleep(3000);
            Console.WriteLine("Background work completed.");
        };

        var work = Observable
            .Start(simulatedWork)
            .Finally(() => Console.WriteLine("Main thread completed."));

        Console.WriteLine("\r\n\t In Main Thread...\r\n");

        // Block until the background observable has completed.
        work.Wait();
    }

    /// <summary>Downloads one day of five-minute TRXUSDT candles, then waits for Enter.</summary>
    static async Task Main(string[] args)
    {
        using (var exchange = new Exchange())
        {
            var provider = new DataProvider(exchange);
            await provider
                .DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, new DateTime(2019, 1, 1), new DateTime(2019, 1, 2), 100)
                .ConfigureAwait(false);
            Console.ReadLine();
        }
    }
}
/// <summary>
/// A single candlestick: open, high, low and close prices plus volume, keyed by timestamp.
/// </summary>
public class OHLCV
{
    /// <summary>Candle timestamp (ToCandle fills it from the kline's open time).</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Opening price.</summary>
    public decimal Open { get; set; }

    /// <summary>Highest price.</summary>
    public decimal High { get; set; }

    /// <summary>Lowest price.</summary>
    public decimal Low { get; set; }

    /// <summary>Closing price.</summary>
    public decimal Close { get; set; }

    /// <summary>Traded volume (ToCandle fills it from the kline's base volume).</summary>
    public decimal Volume { get; set; }
}
/// <summary>Conversion helpers from Binance.Net kline types to the local <see cref="OHLCV"/> model.</summary>
public static class Extensions
{
    /// <summary>
    /// Maps a Binance kline to an <see cref="OHLCV"/>, using the kline's open time as the timestamp
    /// and its base volume as the volume.
    /// </summary>
    public static OHLCV ToCandle(this IBinanceKline candle) =>
        new OHLCV
        {
            Timestamp = candle.OpenTime,
            Open = candle.Open,
            High = candle.High,
            Low = candle.Low,
            Close = candle.Close,
            Volume = candle.BaseVolume,
        };
}
/// <summary>
/// Wrapper around a Binance.Net client that returns klines as <see cref="OHLCV"/> candles.
/// </summary>
public class Exchange : IDisposable
{
    private readonly IBinanceClient _client;

    /// <summary>Creates an exchange backed by a new <c>BinanceClient</c>.</summary>
    public Exchange()
    {
        _client = new BinanceClient();
    }

    /// <summary>
    /// Fetches spot klines for <paramref name="pair"/> and converts them to candles.
    /// </summary>
    /// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
    /// <param name="interval">Candle interval.</param>
    /// <param name="startTime">Optional first timestamp to request.</param>
    /// <param name="endTime">Optional last timestamp to request.</param>
    /// <param name="limit">Optional maximum number of candles to return.</param>
    /// <param name="ct">Token used to cancel the underlying HTTP request.</param>
    /// <returns>The converted candles; an empty list when the call succeeds without data.</returns>
    /// <exception cref="InvalidOperationException">The exchange reported an error for the request.</exception>
    public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null, CancellationToken ct = default)
    {
        var result = await _client.Spot.Market.GetKlinesAsync(pair, interval, startTime, endTime, limit, ct).ConfigureAwait(false);

        // Surface the failure instead of returning null, which callers would dereference.
        if (!result.Success)
        {
            throw new InvalidOperationException($"GetKlinesAsync failed for {pair}: {result.Error}");
        }

        return result.Data?.Select(e => e.ToCandle()).ToList() ?? new List<OHLCV>();
    }

    /// <summary>Disposes the underlying Binance client.</summary>
    public void Dispose()
    {
        _client?.Dispose();
    }
}
}
并行发出多个 Web 请求的关键在于先创建全部任务,再用 Task.WhenAll() 一起等待它们,而不是在循环中逐个 await。如果在循环中逐个 await,请求会按顺序依次执行(尽管发出 Web 请求时 UI 线程不会被阻塞)。
你把问题想得太复杂了。
由于蜡烛的时间间隔是均匀的,而且你知道每次调用 GetKlinesAsync 会返回多少根蜡烛,因此可以预先计算出所需的全部开始日期(前提是交易所的历史数据中没有缺口)。
string pair = "TRXUSDT";
KlineInterval interval = KlineInterval.FiveMinutes;
DateTime startDate = new DateTime(2019, 1, 1);
DateTime endDate = new DateTime(2019, 1, 2);
// Candle width in minutes; must be kept in sync with `interval` by hand.
double gap = 5.0;
// Candles requested per call, so each request spans gap * limit minutes.
int limit = 100;

// One start date per request window: startDate, startDate + gap*limit, ... while <= endDate.
Func<DateTime, bool> notPastEnd = d => d <= endDate;
Func<DateTime, DateTime> nextWindow = d => d.AddMinutes(gap * limit);
IObservable<DateTime> startDates =
    Observable.Generate(startDate, notPastEnd, nextWindow, d => d);
现在就可以很容易地构造查询:
// One request per start date; Rx runs them concurrently, so results arrive in completion order.
IObservable<OHLCV> query =
    from s in startDates
    // Cap each request just before the next window starts, so batches can never overlap
    // (and produce duplicates) even when the exchange history has gaps.
    let windowEnd = s.AddMinutes(gap * limit).AddMilliseconds(-1)
    let requestEnd = windowEnd < endDate ? windowEnd : endDate
    from rs in
        Observable
            .Using(
                () => new BinanceClient(),
                bc => Observable.FromAsync(ct =>
                    bc.Spot.Market.GetKlinesAsync(pair, interval, s, requestEnd, limit, ct)))
    // A failed batch fails the whole query; silently skipping it would leave a hole in the data.
    from c in rs.Success
        ? rs.Data.Select(x => x.ToCandle())
        : throw new InvalidOperationException($"Failed to download candles starting at {s:O}: {rs.Error}")
    select c;
由于这是并行查询,结果可能会乱序到达,因此需要对查询调用 .ToArray(),在所有数据都到达之后再统一处理(例如排序),而不是每收到一根蜡烛就处理一次。
// Collect every candle, sort once the whole download has finished, and report failures
// instead of letting Rx's default OnError handler rethrow and crash the process.
IDisposable subscription =
    query
        .ToArray()
        .Select(xs => xs.OrderBy(x => x.Timestamp).ToArray())
        .Subscribe(
            cs =>
            {
                /* candles downloaded using multiple threads */
                /* and sorted in `Timestamp` order */
            },
            ex => Console.WriteLine($"Candle download failed: {ex.Message}"));
这会使用多个线程获取全部蜡烛,并按时间顺序排列。只要 gap 与 interval 相符且交易所数据没有缺口,结果中就不会有重复(否则各批次的时间范围可能会重叠)。
如果希望把它写成 DownloadAsync 方法,可以这样做:
/// <summary>
/// Downloads all candles for <paramref name="pair"/> between <paramref name="startDate"/> and
/// <paramref name="endDate"/>. It issues one request per window of <paramref name="limit"/> candles,
/// with at most <paramref name="maxConcurrency"/> requests in flight at once.
/// </summary>
/// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
/// <param name="interval">Candle interval.</param>
/// <param name="gap">Candle width in minutes; must match <paramref name="interval"/> (e.g. 5.0 for five-minute candles).</param>
/// <param name="startDate">Start of the first window.</param>
/// <param name="endDate">Last timestamp to request.</param>
/// <param name="limit">Candles requested per call.</param>
/// <param name="maxConcurrency">Maximum number of simultaneous requests, to avoid hitting exchange rate limits.</param>
/// <returns>The candles sorted by timestamp.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="gap"/>, <paramref name="limit"/> or <paramref name="maxConcurrency"/> is not positive.</exception>
/// <exception cref="InvalidOperationException">The exchange reported an error for one of the batches.</exception>
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, double gap, DateTime startDate, DateTime endDate, int limit, int maxConcurrency = 4)
{
    // A non-positive window would make Generate loop forever on the same start date.
    if (gap <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(gap), gap, "The candle gap must be positive.");
    }
    if (limit <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), limit, "The request limit must be positive.");
    }
    if (maxConcurrency <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency, "At least one concurrent request is required.");
    }

    double windowMinutes = gap * limit;

    var requests =
        Observable
            .Generate(startDate, x => x <= endDate, x => x.AddMinutes(windowMinutes), x => x)
            .Select(s =>
            {
                // Cap each request just before the next window starts, so batches never overlap
                // even when the exchange history has gaps.
                var windowEnd = s.AddMinutes(windowMinutes).AddMilliseconds(-1);
                var requestEnd = windowEnd < endDate ? windowEnd : endDate;

                return Observable
                    .Using(
                        () => new BinanceClient(),
                        bc => Observable.FromAsync(ct =>
                            bc.Spot.Market.GetKlinesAsync(pair, interval, s, requestEnd, limit, ct)))
                    // A failed batch must fail the download; silently skipping it would leave a hole.
                    .Select(rs => rs.Success
                        ? rs.Data.Select(x => x.ToCandle())
                        : throw new InvalidOperationException($"Failed to download candles for {pair} starting at {s:O}: {rs.Error}"));
            });

    // Merge with a concurrency cap runs the (cold) requests in parallel without flooding the API.
    IList<OHLCV> candles = await requests
        .Merge(maxConcurrency)
        .SelectMany(batch => batch)
        .ToList();

    // Batches complete in arbitrary order, so sort once everything has arrived.
    return candles.OrderBy(x => x.Timestamp).ToList();
}
请注意,方法签名略有变化:新增了 gap 参数,并用 limit 取代了原来的 startupCandleCount 参数。