Reactive.NET - 从 .Subscribe 返回通用对象
Reactive.NET - returning generic object out of .Subscribe
有一个组合的网络套接字流wss://stream.binance.com:9443/stream?streams=bnbusdt@ticker/dogeusdt@depth5,我需要以下输出:
public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;
日志
Connection opened
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272226,"bids":[["0.20140000","21189.00000000"],["0.20130000","275878.00000000"],["0.20120000","290900.00000000"],["0.20110000","313592.00000000"],["0.20100000","367368.00000000"]],"asks":[["0.20150000","109090.00000000"],["0.20160000","404515.00000000"],["0.20170000","649409.00000000"],["0.20180000","360650.00000000"],["0.20190000","185381.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097890123,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07225116","x":"601.40000000","c":"599.00000000","Q":"0.45200000","b":"599.00000000","B":"122.06600000","a":"599.10000000","A":"0.54000000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286613.77200000","q":"769487994.99120000","O":1638011490067,"C":1638097890067,"F":471394573,"L":472263211,"n":868639}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272244,"bids":[["0.20140000","21189.00000000"],["0.20130000","273472.00000000"],["0.20120000","262491.00000000"],["0.20110000","350795.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","129653.00000000"],["0.20160000","411961.00000000"],["0.20170000","634098.00000000"],["0.20180000","360650.00000000"],["0.20190000","194995.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097891059,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07224947","x":"601.40000000","c":"599.00000000","Q":"0.28800000","b":"599.00000000","B":"116.83300000","a":"599.10000000","A":"35.25500000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286614.33800000","q":"769488331.33030000","O":1638011491059,"C":1638097891059,"F":471394579,"L":472263222,"n":868644}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272263,"bids":[["0.20140000","84255.00000000"],["0.20130000","263544.00000000"],["0.20120000","290699.00000000"],["0.20110000","322587.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","128586.00000000"],["0.20160000","422245.00000000"],["0.20170000","629711.00000000"],["0.20180000","365383.00000000"],["0.20190000","194995.00000000"]]}}
简要说明
如上面的日志所示,流会不断返回消息。我需要把每条消息反序列化为 WebSocketResponse<T>,但问题在于:同一个流里混有两种不同类型的消息。我需要以某种方式把它们拆分开,最终期望得到以下两个属性:IObservable<WebSocketPriceTicker24Hr> 和 IObservable<WebSocketDepth>。
/// <summary>
/// Text of every message raised through <c>_webSocket.MessageReceived</c>, exposed as an observable sequence.
/// </summary>
/// <remarks>
/// The two lambdas handed to <c>FromEventPattern</c> add and remove the event handler.
/// A new pipeline is built on every property access.
/// </remarks>
public IObservable<string> Messages
{
    get
    {
        var receivedEvents = Observable.FromEventPattern<MessageReceivedEventArgs>(
            attach => _webSocket.MessageReceived += attach,
            detach => _webSocket.MessageReceived -= detach);

        // Only the message text is forwarded; the sender is dropped.
        return receivedEvents.Select(received => received.EventArgs.Message);
    }
}
...
public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;
// Subscribes to the raw message stream; the subscription handle is kept in _eventSubscription.
_eventSubscription = _webSocket.Messages
    .Select(message =>
    {
        // Log every raw frame before any typed handling takes place.
        Console.WriteLine($"Message: {message}");

        // TODO: What here? Candidate deserialization calls, kept for reference:
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(m) ?? throw new ArgumentException(m, nameof(m));
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(m) ?? throw new ArgumentException(m, nameof(m));
        return message;
    })
    .Subscribe(result =>
    {
        // TODO: What here? `result` is still the unmodified message string at this point.
    });
...
// Models
/// <summary>
/// Envelope of a combined-stream message: the originating stream name plus its typed payload.
/// </summary>
/// <typeparam name="T">Type the <c>data</c> object is deserialized into.</typeparam>
public class WebSocketResponse<T>
{
    // System.Text.Json matches property names case-sensitively by default, so the
    // lower-case keys used on the wire ("stream", "data") are mapped explicitly.

    /// <summary>Name of the stream the message came from, e.g. <c>bnbusdt@ticker</c>.</summary>
    [JsonPropertyName("stream")] public string? Stream { get; set; }

    /// <summary>Payload carried under the <c>data</c> key.</summary>
    [JsonPropertyName("data")] public T? Data { get; set; }
}
/// <summary>
/// Payload of a <c>24hrTicker</c> message; each single-letter wire key is mapped to a descriptive property.
/// </summary>
/// <remarks>
/// The price and quantity fields arrive as quoted JSON strings (e.g. <c>"p":"-2.50000000"</c>).
/// Without <see cref="JsonNumberHandling.AllowReadingFromString"/> System.Text.Json refuses to
/// read them into <see cref="decimal"/> and throws a <c>JsonException</c>.
/// </remarks>
[JsonNumberHandling(JsonNumberHandling.AllowReadingFromString)]
public class WebSocketPriceTicker24Hr
{
    // Event identification.
    [JsonPropertyName("e")] public string? EventType { get; set; }
    [JsonPropertyName("E")] public long EventTime { get; set; }
    [JsonPropertyName("s")] public string? Symbol { get; set; }

    // Price statistics.
    [JsonPropertyName("p")] public decimal PriceChange { get; set; }
    [JsonPropertyName("P")] public decimal PriceChangePercent { get; set; }
    [JsonPropertyName("w")] public decimal WeightedAveragePrice { get; set; }
    [JsonPropertyName("x")] public decimal PreviousClosePrice { get; set; }
    [JsonPropertyName("c")] public decimal LastPrice { get; set; }
    [JsonPropertyName("Q")] public decimal LastQuantity { get; set; }

    // Best bid / ask.
    [JsonPropertyName("b")] public decimal BestBidPrice { get; set; }
    [JsonPropertyName("B")] public decimal BestBidQuantity { get; set; }
    [JsonPropertyName("a")] public decimal BestAskPrice { get; set; }
    [JsonPropertyName("A")] public decimal BestAskQuantity { get; set; }

    // Open / high / low and traded volumes.
    [JsonPropertyName("o")] public decimal OpenPrice { get; set; }
    [JsonPropertyName("h")] public decimal HighPrice { get; set; }
    [JsonPropertyName("l")] public decimal LowPrice { get; set; }
    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume { get; set; }
    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume { get; set; }

    // Statistics window and trade bookkeeping.
    [JsonPropertyName("O")] public long OpenTime { get; set; }
    [JsonPropertyName("C")] public long CloseTime { get; set; }
    [JsonPropertyName("F")] public long FirstTradeId { get; set; }
    [JsonPropertyName("L")] public long LastTradeId { get; set; }
    [JsonPropertyName("n")] public long Count { get; set; }
}
/// <summary>
/// Order-book depth payload.
/// </summary>
/// <remarks>
/// Two key sets are accepted. The short keys (<c>e</c>, <c>E</c>, <c>s</c>, <c>U</c>, <c>u</c>,
/// <c>b</c>, <c>a</c>) are the original mapping. The long keys (<c>lastUpdateId</c>, <c>bids</c>,
/// <c>asks</c>) are the ones present in the <c>dogeusdt@depth5</c> messages; without them every
/// property stayed at its default for those messages.
/// </remarks>
public class WebSocketDepth
{
    [JsonPropertyName("e")] public string? EventType { get; set; }
    [JsonPropertyName("E")] public long EventTime { get; set; }
    [JsonPropertyName("s")] public string? Symbol { get; set; }
    [JsonPropertyName("U")] public long FirstUpdateId { get; set; }
    [JsonPropertyName("u")] public long FinalUpdateId { get; set; }

    /// <summary>Value of the <c>lastUpdateId</c> key carried by the <c>depth5</c> payload.</summary>
    [JsonPropertyName("lastUpdateId")] public long LastUpdateId { get; set; }

    /// <summary>Bid levels; each inner sequence holds the string values of one level.</summary>
    [JsonPropertyName("b")]
    public IEnumerable<IEnumerable<string>> Bids { get; set; } = Array.Empty<IEnumerable<string>>();

    /// <summary>Ask levels; each inner sequence holds the string values of one level.</summary>
    [JsonPropertyName("a")]
    public IEnumerable<IEnumerable<string>> Asks { get; set; } = Array.Empty<IEnumerable<string>>();

    /// <summary>
    /// Alias that binds the long <c>bids</c> key onto <see cref="Bids"/>.
    /// System.Text.Json allows only one JSON name per property, hence the forwarding property.
    /// </summary>
    [JsonPropertyName("bids")]
    public IEnumerable<IEnumerable<string>> PartialBookBids
    {
        get => Bids;
        set => Bids = value;
    }

    /// <summary>Alias that binds the long <c>asks</c> key onto <see cref="Asks"/>.</summary>
    [JsonPropertyName("asks")]
    public IEnumerable<IEnumerable<string>> PartialBookAsks
    {
        get => Asks;
        set => Asks = value;
    }
}
您可以构建一个 IObservable<object>
类型的可观察对象,其中包含 WebSocketPriceTicker24Hr
和 WebSocketDepth
对象。之后,您使用 OfType<T>()
构建特定类型的可观察对象。
// Ticker prices arrive as quoted JSON strings, so string-encoded numbers must be accepted.
// The options instance is created once and reused for every message.
var payloadOptions = new JsonSerializerOptions
{
    NumberHandling = System.Text.Json.Serialization.JsonNumberHandling.AllowReadingFromString,
};

// Turns each raw message into its typed payload object (ticker or depth).
// Throws InvalidOperationException when the envelope is malformed, the stream is unknown,
// or the payload deserializes to null.
IObservable<object> afterDeserialize = source.Select<string, object>(it =>
{
    // Parse the envelope once and inspect the stream name BEFORE picking a target type.
    // Forcing every message into both payload types fails: a ticker's "b" is a string,
    // while the depth model expects a nested collection under the same key.
    using var document = JsonDocument.Parse(it);
    var root = document.RootElement;

    if (root.ValueKind != JsonValueKind.Object
        || !root.TryGetProperty("stream", out var streamElement)
        || streamElement.ValueKind != JsonValueKind.String
        || !root.TryGetProperty("data", out var dataElement))
    {
        throw new InvalidOperationException("Could not deserialize the JSON to any object");
    }

    var stream = streamElement.GetString();
    var payloadJson = dataElement.GetRawText();

    object? payload = null;
    if (stream == "bnbusdt@ticker")
    {
        payload = JsonSerializer.Deserialize<WebSocketPriceTicker24Hr>(payloadJson, payloadOptions);
    }
    else if (stream == "dogeusdt@depth5")
    {
        payload = JsonSerializer.Deserialize<WebSocketDepth>(payloadJson, payloadOptions);
    }

    // Unknown stream or a literal JSON null payload: never push null downstream.
    return payload ?? throw new InvalidOperationException("Could not deserialize the JSON to any object");
});
这是反序列化并提取 JSON 中 "data" 部分的一种可行方式,不过这些硬编码的 stream 名称判断并不优雅。另外要注意,当 JSON 格式与目标类型不匹配时,JsonSerializer.Deserialize() 并不会返回 null。因此您需要把反序列化过程改写得更通用、更健壮。但作为概念验证,这段代码会返回 WebSocketPriceTicker24Hr 和 WebSocketDepth 对象。
现在我们可以在这个 observable 上使用 OfType<T>()
。
// Split the mixed sequence into one typed observable per payload kind.
// OfType<T>() forwards only the elements that are instances of T.
IObservable<WebSocketPriceTicker24Hr> onlyTicker =
    Observable.OfType<WebSocketPriceTicker24Hr>(afterDeserialize);
IObservable<WebSocketDepth> onlyDepths =
    Observable.OfType<WebSocketDepth>(afterDeserialize);
之后,您就可以分别订阅 onlyDepths 和 onlyTicker 这两个可观察对象。
有一个组合的网络套接字流wss://stream.binance.com:9443/stream?streams=bnbusdt@ticker/dogeusdt@depth5,我需要以下输出:
public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;
日志
Connection opened
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272226,"bids":[["0.20140000","21189.00000000"],["0.20130000","275878.00000000"],["0.20120000","290900.00000000"],["0.20110000","313592.00000000"],["0.20100000","367368.00000000"]],"asks":[["0.20150000","109090.00000000"],["0.20160000","404515.00000000"],["0.20170000","649409.00000000"],["0.20180000","360650.00000000"],["0.20190000","185381.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097890123,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07225116","x":"601.40000000","c":"599.00000000","Q":"0.45200000","b":"599.00000000","B":"122.06600000","a":"599.10000000","A":"0.54000000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286613.77200000","q":"769487994.99120000","O":1638011490067,"C":1638097890067,"F":471394573,"L":472263211,"n":868639}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272244,"bids":[["0.20140000","21189.00000000"],["0.20130000","273472.00000000"],["0.20120000","262491.00000000"],["0.20110000","350795.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","129653.00000000"],["0.20160000","411961.00000000"],["0.20170000","634098.00000000"],["0.20180000","360650.00000000"],["0.20190000","194995.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097891059,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07224947","x":"601.40000000","c":"599.00000000","Q":"0.28800000","b":"599.00000000","B":"116.83300000","a":"599.10000000","A":"35.25500000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286614.33800000","q":"769488331.33030000","O":1638011491059,"C":1638097891059,"F":471394579,"L":472263222,"n":868644}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272263,"bids":[["0.20140000","84255.00000000"],["0.20130000","263544.00000000"],["0.20120000","290699.00000000"],["0.20110000","322587.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","128586.00000000"],["0.20160000","422245.00000000"],["0.20170000","629711.00000000"],["0.20180000","365383.00000000"],["0.20190000","194995.00000000"]]}}
简要说明
如上面的日志所示,流会不断返回消息。我需要把每条消息反序列化为 WebSocketResponse<T>,但问题在于:同一个流里混有两种不同类型的消息。我需要以某种方式把它们拆分开,最终期望得到以下两个属性:IObservable<WebSocketPriceTicker24Hr> 和 IObservable<WebSocketDepth>。
/// <summary>
/// Text of every message raised through <c>_webSocket.MessageReceived</c>, exposed as an observable sequence.
/// </summary>
/// <remarks>
/// The two lambdas handed to <c>FromEventPattern</c> add and remove the event handler.
/// A new pipeline is built on every property access.
/// </remarks>
public IObservable<string> Messages
{
    get
    {
        var receivedEvents = Observable.FromEventPattern<MessageReceivedEventArgs>(
            attach => _webSocket.MessageReceived += attach,
            detach => _webSocket.MessageReceived -= detach);

        // Only the message text is forwarded; the sender is dropped.
        return receivedEvents.Select(received => received.EventArgs.Message);
    }
}
...
public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;
// Subscribes to the raw message stream; the subscription handle is kept in _eventSubscription.
_eventSubscription = _webSocket.Messages
    .Select(message =>
    {
        // Log every raw frame before any typed handling takes place.
        Console.WriteLine($"Message: {message}");

        // TODO: What here? Candidate deserialization calls, kept for reference:
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(m) ?? throw new ArgumentException(m, nameof(m));
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(m) ?? throw new ArgumentException(m, nameof(m));
        return message;
    })
    .Subscribe(result =>
    {
        // TODO: What here? `result` is still the unmodified message string at this point.
    });
...
// Models
/// <summary>
/// Envelope of a combined-stream message: the originating stream name plus its typed payload.
/// </summary>
/// <typeparam name="T">Type the <c>data</c> object is deserialized into.</typeparam>
public class WebSocketResponse<T>
{
    // System.Text.Json matches property names case-sensitively by default, so the
    // lower-case keys used on the wire ("stream", "data") are mapped explicitly.

    /// <summary>Name of the stream the message came from, e.g. <c>bnbusdt@ticker</c>.</summary>
    [JsonPropertyName("stream")] public string? Stream { get; set; }

    /// <summary>Payload carried under the <c>data</c> key.</summary>
    [JsonPropertyName("data")] public T? Data { get; set; }
}
/// <summary>
/// Payload of a <c>24hrTicker</c> message; each single-letter wire key is mapped to a descriptive property.
/// </summary>
/// <remarks>
/// The price and quantity fields arrive as quoted JSON strings (e.g. <c>"p":"-2.50000000"</c>).
/// Without <see cref="JsonNumberHandling.AllowReadingFromString"/> System.Text.Json refuses to
/// read them into <see cref="decimal"/> and throws a <c>JsonException</c>.
/// </remarks>
[JsonNumberHandling(JsonNumberHandling.AllowReadingFromString)]
public class WebSocketPriceTicker24Hr
{
    // Event identification.
    [JsonPropertyName("e")] public string? EventType { get; set; }
    [JsonPropertyName("E")] public long EventTime { get; set; }
    [JsonPropertyName("s")] public string? Symbol { get; set; }

    // Price statistics.
    [JsonPropertyName("p")] public decimal PriceChange { get; set; }
    [JsonPropertyName("P")] public decimal PriceChangePercent { get; set; }
    [JsonPropertyName("w")] public decimal WeightedAveragePrice { get; set; }
    [JsonPropertyName("x")] public decimal PreviousClosePrice { get; set; }
    [JsonPropertyName("c")] public decimal LastPrice { get; set; }
    [JsonPropertyName("Q")] public decimal LastQuantity { get; set; }

    // Best bid / ask.
    [JsonPropertyName("b")] public decimal BestBidPrice { get; set; }
    [JsonPropertyName("B")] public decimal BestBidQuantity { get; set; }
    [JsonPropertyName("a")] public decimal BestAskPrice { get; set; }
    [JsonPropertyName("A")] public decimal BestAskQuantity { get; set; }

    // Open / high / low and traded volumes.
    [JsonPropertyName("o")] public decimal OpenPrice { get; set; }
    [JsonPropertyName("h")] public decimal HighPrice { get; set; }
    [JsonPropertyName("l")] public decimal LowPrice { get; set; }
    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume { get; set; }
    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume { get; set; }

    // Statistics window and trade bookkeeping.
    [JsonPropertyName("O")] public long OpenTime { get; set; }
    [JsonPropertyName("C")] public long CloseTime { get; set; }
    [JsonPropertyName("F")] public long FirstTradeId { get; set; }
    [JsonPropertyName("L")] public long LastTradeId { get; set; }
    [JsonPropertyName("n")] public long Count { get; set; }
}
/// <summary>
/// Order-book depth payload.
/// </summary>
/// <remarks>
/// Two key sets are accepted. The short keys (<c>e</c>, <c>E</c>, <c>s</c>, <c>U</c>, <c>u</c>,
/// <c>b</c>, <c>a</c>) are the original mapping. The long keys (<c>lastUpdateId</c>, <c>bids</c>,
/// <c>asks</c>) are the ones present in the <c>dogeusdt@depth5</c> messages; without them every
/// property stayed at its default for those messages.
/// </remarks>
public class WebSocketDepth
{
    [JsonPropertyName("e")] public string? EventType { get; set; }
    [JsonPropertyName("E")] public long EventTime { get; set; }
    [JsonPropertyName("s")] public string? Symbol { get; set; }
    [JsonPropertyName("U")] public long FirstUpdateId { get; set; }
    [JsonPropertyName("u")] public long FinalUpdateId { get; set; }

    /// <summary>Value of the <c>lastUpdateId</c> key carried by the <c>depth5</c> payload.</summary>
    [JsonPropertyName("lastUpdateId")] public long LastUpdateId { get; set; }

    /// <summary>Bid levels; each inner sequence holds the string values of one level.</summary>
    [JsonPropertyName("b")]
    public IEnumerable<IEnumerable<string>> Bids { get; set; } = Array.Empty<IEnumerable<string>>();

    /// <summary>Ask levels; each inner sequence holds the string values of one level.</summary>
    [JsonPropertyName("a")]
    public IEnumerable<IEnumerable<string>> Asks { get; set; } = Array.Empty<IEnumerable<string>>();

    /// <summary>
    /// Alias that binds the long <c>bids</c> key onto <see cref="Bids"/>.
    /// System.Text.Json allows only one JSON name per property, hence the forwarding property.
    /// </summary>
    [JsonPropertyName("bids")]
    public IEnumerable<IEnumerable<string>> PartialBookBids
    {
        get => Bids;
        set => Bids = value;
    }

    /// <summary>Alias that binds the long <c>asks</c> key onto <see cref="Asks"/>.</summary>
    [JsonPropertyName("asks")]
    public IEnumerable<IEnumerable<string>> PartialBookAsks
    {
        get => Asks;
        set => Asks = value;
    }
}
您可以构建一个 IObservable<object>
类型的可观察对象,其中包含 WebSocketPriceTicker24Hr
和 WebSocketDepth
对象。之后,您使用 OfType<T>()
构建特定类型的可观察对象。
// Ticker prices arrive as quoted JSON strings, so string-encoded numbers must be accepted.
// The options instance is created once and reused for every message.
var payloadOptions = new JsonSerializerOptions
{
    NumberHandling = System.Text.Json.Serialization.JsonNumberHandling.AllowReadingFromString,
};

// Turns each raw message into its typed payload object (ticker or depth).
// Throws InvalidOperationException when the envelope is malformed, the stream is unknown,
// or the payload deserializes to null.
IObservable<object> afterDeserialize = source.Select<string, object>(it =>
{
    // Parse the envelope once and inspect the stream name BEFORE picking a target type.
    // Forcing every message into both payload types fails: a ticker's "b" is a string,
    // while the depth model expects a nested collection under the same key.
    using var document = JsonDocument.Parse(it);
    var root = document.RootElement;

    if (root.ValueKind != JsonValueKind.Object
        || !root.TryGetProperty("stream", out var streamElement)
        || streamElement.ValueKind != JsonValueKind.String
        || !root.TryGetProperty("data", out var dataElement))
    {
        throw new InvalidOperationException("Could not deserialize the JSON to any object");
    }

    var stream = streamElement.GetString();
    var payloadJson = dataElement.GetRawText();

    object? payload = null;
    if (stream == "bnbusdt@ticker")
    {
        payload = JsonSerializer.Deserialize<WebSocketPriceTicker24Hr>(payloadJson, payloadOptions);
    }
    else if (stream == "dogeusdt@depth5")
    {
        payload = JsonSerializer.Deserialize<WebSocketDepth>(payloadJson, payloadOptions);
    }

    // Unknown stream or a literal JSON null payload: never push null downstream.
    return payload ?? throw new InvalidOperationException("Could not deserialize the JSON to any object");
});
这是反序列化并提取 JSON 中 "data" 部分的一种可行方式,不过这些硬编码的 stream 名称判断并不优雅。另外要注意,当 JSON 格式与目标类型不匹配时,JsonSerializer.Deserialize() 并不会返回 null。因此您需要把反序列化过程改写得更通用、更健壮。但作为概念验证,这段代码会返回 WebSocketPriceTicker24Hr 和 WebSocketDepth 对象。
现在我们可以在这个 observable 上使用 OfType<T>()
。
// Split the mixed sequence into one typed observable per payload kind.
// OfType<T>() forwards only the elements that are instances of T.
IObservable<WebSocketPriceTicker24Hr> onlyTicker =
    Observable.OfType<WebSocketPriceTicker24Hr>(afterDeserialize);
IObservable<WebSocketDepth> onlyDepths =
    Observable.OfType<WebSocketDepth>(afterDeserialize);
之后,您就可以分别订阅 onlyDepths 和 onlyTicker 这两个可观察对象。