TPL 数据流使用旧数据而不是最新数据
TPL Dataflow uses old data instead of the newest
我正在使用 TPL 数据流,以便一次为每个符号执行一项任务。前两个 Operation taking...
消息是正确的,但接下来的消息使用的是旧数据。换句话说,它使用下面截图中绿色标记的旧数据,而不是最新数据(蓝色标记的数据)。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;
namespace SubscribeToCandlesEventFixTest
{
public class BinanceSocketHandler
{
private readonly IBinanceClient _client;
private readonly IBinanceSocketClient _socketClient;
public BinanceSocketHandler()
{
_client = new BinanceClient(new BinanceClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoTimestamp = true,
AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
_socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoReconnect = true,
ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
}
private Dictionary<string, ActionBlock<IBinanceStreamKlineData>> _ab = new();
public async Task StartAsync(CancellationToken cancellationToken)
{
var symbols = new[] { "TRXUSDT", "BTCUSDT" };
var interval = KlineInterval.OneMinute;
foreach (var symbol in symbols)
{
_ab[symbol] = new ActionBlock<IBinanceStreamKlineData>(
async data =>
{
Console.WriteLine($"Operation taking 10 seconds to execute... | Symbol: {data.Symbol} | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
await _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbol, interval,
async data =>
{
if (data.Data.Final)
{
Console.WriteLine(
$"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
}
else
{
Console.WriteLine(
$"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
// TODO: Use the most up-to-date value
await _ab[symbol].SendAsync(data, cancellationToken).ConfigureAwait(false);
}
}).ConfigureAwait(false);
}
}
public async Task StopAsync()
{
foreach (var symbol in _ab.Keys)
{
_ab[symbol].Complete();
await _ab[symbol].Completion.ConfigureAwait(false);
}
}
}
class Program
{
static async Task Main(string[] args)
{
var test = new BinanceSocketHandler();
await test.StartAsync(new CancellationToken()).ConfigureAwait(false);
Console.ReadLine();
}
}
}
TPL Dataflow 将按顺序处理所有项目;这就是它要做的。您可以尝试通过使用 BroadcastBlock
来执行最新的一种方法,但是由于该块链接到另一个块,您可能最终会得到一个正在处理的方法,一个等待处理的方法,以及第三个是实际被覆盖的那个。
如果您想要比这更严格(即,一个正在处理中,一个正在等待也被覆盖),那么我推荐 Channels。具体来说,使用 BoundedChannelFullMode.DropOldest
.
的有界通道
我正在使用 TPL 数据流,以便一次为每个符号执行一项任务。前两个 Operation taking...
消息是正确的,但接下来的消息使用的是旧数据。换句话说,它使用下面截图中绿色标记的旧数据,而不是最新数据(蓝色标记的数据)。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;
namespace SubscribeToCandlesEventFixTest
{
public class BinanceSocketHandler
{
private readonly IBinanceClient _client;
private readonly IBinanceSocketClient _socketClient;
public BinanceSocketHandler()
{
_client = new BinanceClient(new BinanceClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoTimestamp = true,
AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
_socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
{
ApiCredentials = new ApiCredentials("not required", "not required"),
AutoReconnect = true,
ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
LogVerbosity = LogVerbosity.Debug
#endif
});
}
private Dictionary<string, ActionBlock<IBinanceStreamKlineData>> _ab = new();
public async Task StartAsync(CancellationToken cancellationToken)
{
var symbols = new[] { "TRXUSDT", "BTCUSDT" };
var interval = KlineInterval.OneMinute;
foreach (var symbol in symbols)
{
_ab[symbol] = new ActionBlock<IBinanceStreamKlineData>(
async data =>
{
Console.WriteLine($"Operation taking 10 seconds to execute... | Symbol: {data.Symbol} | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
await _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbol, interval,
async data =>
{
if (data.Data.Final)
{
Console.WriteLine(
$"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
}
else
{
Console.WriteLine(
$"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
// TODO: Use the most up-to-date value
await _ab[symbol].SendAsync(data, cancellationToken).ConfigureAwait(false);
}
}).ConfigureAwait(false);
}
}
public async Task StopAsync()
{
foreach (var symbol in _ab.Keys)
{
_ab[symbol].Complete();
await _ab[symbol].Completion.ConfigureAwait(false);
}
}
}
class Program
{
static async Task Main(string[] args)
{
var test = new BinanceSocketHandler();
await test.StartAsync(new CancellationToken()).ConfigureAwait(false);
Console.ReadLine();
}
}
}
TPL Dataflow 将按顺序处理所有项目;这就是它要做的。您可以尝试通过使用 BroadcastBlock
来执行最新的一种方法,但是由于该块链接到另一个块,您可能最终会得到一个正在处理的方法,一个等待处理的方法,以及第三个是实际被覆盖的那个。
如果您想要比这更严格(即,一个正在处理中,一个正在等待也被覆盖),那么我推荐 Channels。具体来说,使用 BoundedChannelFullMode.DropOldest
.