Observable.Interval同时执行一个方法两次
Observable.Interval executes a method twice at the same time
ExecuteSellAsync 方法同时被调用两次,您可以在下面的日志中看到。当我在 Observable.Interval(TimeSpan.FromSeconds(15))
上放置 15 秒时它工作正常。我怎样才能防止这种情况发生?也许是锁定之类的东西,或者我不知道。
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: New | Price: 0.06783960 | Last filled price: 0.00000000 | Stop price: 0.00000000 | Quantity: 0.00000000 | Quote quantity: 0.00000000 | Commission: 0
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: Filled | Price: 0.06783960 | Last filled price: 0.06784260 | Stop price: 0.00000000 | Quantity: 5420.00000000 | Quote quantity: 367.70689200 | Commission: 0.00201210 BNB
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Sell order was filled | Close date: 2021/02/12 17:04:09 | Close rate (price): 0.06784260
2021-02-12 19:04:13 [9] INFO Wallets Wallets synced.
2021-02-12 19:04:14 [10] DEBUG LiveTradeManager Timer triggered. Price: 0.06783910 | Timestamp: 2/12/2021 5:03:00 PM | Close: 0.06790680
2021-02-12 19:04:17 [9] DEBUG BinanceSpotClient Limit sell order has failed | Error code: -2010 | Error message: Account has insufficient balance for requested action. | Pair: DOGEUSDT | Quantity: 0.00000000 | Price: 0.06782540
_throttlerObservable = Observable.Interval(TimeSpan.FromSeconds(5))
.SelectMany(_ => Observable.FromAsync(async () =>
{
var lastCandle = _candles.Last();
_logger.Debug($"Timer triggered. Price: {_ticker.LastPrice} | Open time: {lastCandle.Timestamp} | Close: {lastCandle.Close}");
if (_orderSide == OrderSide.Sell)
{
var trade = _trades.FirstOrDefault(e => e.Pair.Equals(_tradeOptions.Pair) && e.IsOpen);
if (trade.NotNull())
{
var shouldSell = _tradingStrategy.ShouldSell(trade, _ticker.LastPrice, _tradeAdvice);
if (shouldSell.SellFlag)
{
await ExecuteSellAsync(trade, lastCandle.Timestamp, shouldSell.SellType).ConfigureAwait(false);
}
}
}
}))
.Subscribe();
编辑
我明白是什么问题了。 _tradingStrategy.ShouldSell执行需要几秒,下一次执行同时开始执行下一次检查。我可以在那个逻辑中使用 lock
吗?
这就是解决问题的方法,但我需要锁定整个支票:
bool test = false;
public (bool SellFlag, SellType SellType) ShouldSell(Trade trade, decimal rate, TradeAdvice tradeAdvice, decimal? low = null, decimal? high = null)
{
if (!test)
{
test = true;
// my logic is here. It takes a few seconds.
test = false;
}
return (false, SellType.None);
}
编辑2
可测试的代码。 Observable.Interval
每秒执行一次,ShouldSellAsync 的逻辑需要 5 秒才能执行。一旦 _completed 变为 true
,则不再打印该消息。它执行消息 5 次,而我只希望它执行一次。
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace RxNETDispose
{
class Program
{
private static bool _completed = false;
public static async Task ShouldSellAsync()
{
if (!_completed)
{
await Task.Delay(5000);
Console.WriteLine($"{DateTime.UtcNow} - ShouldSell called");
_completed = true;
}
}
static void Main(string[] args)
{
Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => Observable.FromAsync(async () =>
{
await ShouldSellAsync().ConfigureAwait(false);
}))
.Subscribe();
Console.ReadLine();
}
}
}
SelectMany
确实引入了并发。我们想控制这种并发性,所以这里的答案是滚动你自己的运算符,以确保对 ExecuteSellAsync
.
的调用之间有固定的间隔。
谢天谢地,有一种漂亮但不明显的方法可以使用 Rx 调度程序来做到这一点。
我们正在寻找的方法是这样的:
public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
要使用此调用,需要将 Func<IScheduler, CancellationToken, Task<IDisposable>> action
定义为递归的,以便在对 ExecuteSellAsync
的调用完成后调用自身重新安排。
因此,要每 2.0
秒执行一次,例如,我们这样做:
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
await ExecuteSellAsync();
return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};
我们可以通过这样调用来启动它:
IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
当然,就像所有好的 Rx 操作一样,我们可以调用 subscription.Dispose()
来停止它 运行ning.
这是一个完整的例子:
async Task Main()
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
await ExecuteSellAsync();
return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};
IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
await Task.Delay(TimeSpan.FromSeconds(9.0));
subscription.Dispose();
}
private DateTime then = DateTime.Now;
private int __counter = 0;
async Task ExecuteSellAsync()
{
var counter = Interlocked.Increment(ref __counter);
Console.WriteLine($"ExecuteSellAsync() Start {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
await Task.Delay(TimeSpan.FromSeconds(2.0));
Console.WriteLine($"ExecuteSellAsync() End {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
}
当我运行这个我得到这个输出:
ExecuteSellAsync() Start 1 - 0.0019952
ExecuteSellAsync() End 1 - 2.0095866
ExecuteSellAsync() Start 2 - 4.0185182
ExecuteSellAsync() End 2 - 6.0199157
ExecuteSellAsync() Start 3 - 8.0303588
ExecuteSellAsync() End 3 - 10.0417079
请注意,ExecuteSellAsync()
不会协同取消,因此 运行 会完成。不难改成async Task ExecuteSellAsync(CancellationToken ct)
,让它协同取消。
现在,可以对其进行扩展,使其成为一个不错的可观察对象。
试试这个:
IObservable<Unit> query =
Observable.Create<Unit>(o =>
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
if (ct.IsCancellationRequested)
{
o.OnCompleted();
}
else
{
await ExecuteSellAsync();
o.OnNext(Unit.Default);
}
return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};
return Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
});
IDisposable subscription = query.Take(3).Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));
await Task.Delay(TimeSpan.FromSeconds(11.0));
subscription.Dispose();
这有以下输出:
ExecuteSellAsync() Start 1 - 0.0009972
ExecuteSellAsync() End 1 - 2.0115375
U
ExecuteSellAsync() Start 2 - 4.0128375
ExecuteSellAsync() End 2 - 6.0282818
U
ExecuteSellAsync() Start 3 - 8.0370135
ExecuteSellAsync() End 3 - 10.0521106
U
C
注意它完成了。如果您在 subscription.Dispose();
自然完成之前调用它,那么它会正常运行并且不会发出 OnComplete
通知。
让我们用一组不错的扩展方法来包装它:
public static class ObservableEx
{
public static IObservable<Unit> IntervalAsync(TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
TimerAsync(period, period, actionAsync, scheduler);
public static IObservable<T> IntervalAsync<T>(TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
TimerAsync(period, period, functionAsync, scheduler);
public static IObservable<Unit> TimerAsync(TimeSpan dueTime, TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
Observable.Create<Unit>(o =>
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
if (ct.IsCancellationRequested)
{
o.OnCompleted();
}
else
{
await actionAsync();
o.OnNext(Unit.Default);
}
return s.ScheduleAsync(period, handler);
};
return scheduler.ScheduleAsync(dueTime, handler);
});
public static IObservable<T> TimerAsync<T>(TimeSpan dueTime, TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
Observable.Create<T>(o =>
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
if (ct.IsCancellationRequested)
{
o.OnCompleted();
}
else
{
o.OnNext(await functionAsync());
}
return s.ScheduleAsync(period, handler);
};
return scheduler.ScheduleAsync(dueTime, handler);
});
}
现在,显然有很多重载不是我写的——使用默认调度程序的重载和允许协作取消的重载——但我希望你明白了。
现在使用这些扩展方法我可以做到这一点:
IDisposable subscription =
ObservableEx
.IntervalAsync(TimeSpan.FromSeconds(2.0), () => ExecuteSellAsync(), Scheduler.Default)
.Take(3)
.Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));
我得到了和以前一样的输出。
我还没有完全测试扩展方法。他们可能需要更多的爱和关注。
ExecuteSellAsync 方法同时被调用两次,您可以在下面的日志中看到。当我在 Observable.Interval(TimeSpan.FromSeconds(15))
上放置 15 秒时它工作正常。我怎样才能防止这种情况发生?也许是锁定之类的东西,或者我不知道。
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: New | Price: 0.06783960 | Last filled price: 0.00000000 | Stop price: 0.00000000 | Quantity: 0.00000000 | Quote quantity: 0.00000000 | Commission: 0
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: Filled | Price: 0.06783960 | Last filled price: 0.06784260 | Stop price: 0.00000000 | Quantity: 5420.00000000 | Quote quantity: 367.70689200 | Commission: 0.00201210 BNB
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Sell order was filled | Close date: 2021/02/12 17:04:09 | Close rate (price): 0.06784260
2021-02-12 19:04:13 [9] INFO Wallets Wallets synced.
2021-02-12 19:04:14 [10] DEBUG LiveTradeManager Timer triggered. Price: 0.06783910 | Timestamp: 2/12/2021 5:03:00 PM | Close: 0.06790680
2021-02-12 19:04:17 [9] DEBUG BinanceSpotClient Limit sell order has failed | Error code: -2010 | Error message: Account has insufficient balance for requested action. | Pair: DOGEUSDT | Quantity: 0.00000000 | Price: 0.06782540
_throttlerObservable = Observable.Interval(TimeSpan.FromSeconds(5))
.SelectMany(_ => Observable.FromAsync(async () =>
{
var lastCandle = _candles.Last();
_logger.Debug($"Timer triggered. Price: {_ticker.LastPrice} | Open time: {lastCandle.Timestamp} | Close: {lastCandle.Close}");
if (_orderSide == OrderSide.Sell)
{
var trade = _trades.FirstOrDefault(e => e.Pair.Equals(_tradeOptions.Pair) && e.IsOpen);
if (trade.NotNull())
{
var shouldSell = _tradingStrategy.ShouldSell(trade, _ticker.LastPrice, _tradeAdvice);
if (shouldSell.SellFlag)
{
await ExecuteSellAsync(trade, lastCandle.Timestamp, shouldSell.SellType).ConfigureAwait(false);
}
}
}
}))
.Subscribe();
编辑
我明白是什么问题了。 _tradingStrategy.ShouldSell执行需要几秒,下一次执行同时开始执行下一次检查。我可以在那个逻辑中使用 lock
吗?
这就是解决问题的方法,但我需要锁定整个支票:
bool test = false;
public (bool SellFlag, SellType SellType) ShouldSell(Trade trade, decimal rate, TradeAdvice tradeAdvice, decimal? low = null, decimal? high = null)
{
if (!test)
{
test = true;
// my logic is here. It takes a few seconds.
test = false;
}
return (false, SellType.None);
}
编辑2
可测试的代码。 Observable.Interval
每秒执行一次,ShouldSellAsync 的逻辑需要 5 秒才能执行。一旦 _completed 变为 true
,则不再打印该消息。它执行消息 5 次,而我只希望它执行一次。
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace RxNETDispose
{
class Program
{
private static bool _completed = false;
public static async Task ShouldSellAsync()
{
if (!_completed)
{
await Task.Delay(5000);
Console.WriteLine($"{DateTime.UtcNow} - ShouldSell called");
_completed = true;
}
}
static void Main(string[] args)
{
Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => Observable.FromAsync(async () =>
{
await ShouldSellAsync().ConfigureAwait(false);
}))
.Subscribe();
Console.ReadLine();
}
}
}
SelectMany
确实引入了并发。我们想控制这种并发性,所以这里的答案是滚动你自己的运算符,以确保对 ExecuteSellAsync
.
谢天谢地,有一种漂亮但不明显的方法可以使用 Rx 调度程序来做到这一点。
我们正在寻找的方法是这样的:
public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
要使用此调用,需要将 Func<IScheduler, CancellationToken, Task<IDisposable>> action
定义为递归的,以便在对 ExecuteSellAsync
的调用完成后调用自身重新安排。
因此,要每 2.0
秒执行一次,例如,我们这样做:
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
await ExecuteSellAsync();
return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};
我们可以通过这样调用来启动它:
IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
当然,就像所有好的 Rx 操作一样,我们可以调用 subscription.Dispose()
来停止它 运行ning.
这是一个完整的例子:
async Task Main()
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
await ExecuteSellAsync();
return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};
IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
await Task.Delay(TimeSpan.FromSeconds(9.0));
subscription.Dispose();
}
private DateTime then = DateTime.Now;
private int __counter = 0;
async Task ExecuteSellAsync()
{
var counter = Interlocked.Increment(ref __counter);
Console.WriteLine($"ExecuteSellAsync() Start {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
await Task.Delay(TimeSpan.FromSeconds(2.0));
Console.WriteLine($"ExecuteSellAsync() End {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
}
当我运行这个我得到这个输出:
ExecuteSellAsync() Start 1 - 0.0019952
ExecuteSellAsync() End 1 - 2.0095866
ExecuteSellAsync() Start 2 - 4.0185182
ExecuteSellAsync() End 2 - 6.0199157
ExecuteSellAsync() Start 3 - 8.0303588
ExecuteSellAsync() End 3 - 10.0417079
请注意,ExecuteSellAsync()
不会协同取消,因此 运行 会完成。不难改成async Task ExecuteSellAsync(CancellationToken ct)
,让它协同取消。
现在,可以对其进行扩展,使其成为一个不错的可观察对象。
试试这个:
IObservable<Unit> query =
Observable.Create<Unit>(o =>
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
if (ct.IsCancellationRequested)
{
o.OnCompleted();
}
else
{
await ExecuteSellAsync();
o.OnNext(Unit.Default);
}
return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};
return Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
});
IDisposable subscription = query.Take(3).Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));
await Task.Delay(TimeSpan.FromSeconds(11.0));
subscription.Dispose();
这有以下输出:
ExecuteSellAsync() Start 1 - 0.0009972
ExecuteSellAsync() End 1 - 2.0115375
U
ExecuteSellAsync() Start 2 - 4.0128375
ExecuteSellAsync() End 2 - 6.0282818
U
ExecuteSellAsync() Start 3 - 8.0370135
ExecuteSellAsync() End 3 - 10.0521106
U
C
注意它完成了。如果您在 subscription.Dispose();
自然完成之前调用它,那么它会正常运行并且不会发出 OnComplete
通知。
让我们用一组不错的扩展方法来包装它:
public static class ObservableEx
{
public static IObservable<Unit> IntervalAsync(TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
TimerAsync(period, period, actionAsync, scheduler);
public static IObservable<T> IntervalAsync<T>(TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
TimerAsync(period, period, functionAsync, scheduler);
public static IObservable<Unit> TimerAsync(TimeSpan dueTime, TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
Observable.Create<Unit>(o =>
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
if (ct.IsCancellationRequested)
{
o.OnCompleted();
}
else
{
await actionAsync();
o.OnNext(Unit.Default);
}
return s.ScheduleAsync(period, handler);
};
return scheduler.ScheduleAsync(dueTime, handler);
});
public static IObservable<T> TimerAsync<T>(TimeSpan dueTime, TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
Observable.Create<T>(o =>
{
Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
if (ct.IsCancellationRequested)
{
o.OnCompleted();
}
else
{
o.OnNext(await functionAsync());
}
return s.ScheduleAsync(period, handler);
};
return scheduler.ScheduleAsync(dueTime, handler);
});
}
现在,显然有很多重载不是我写的——使用默认调度程序的重载和允许协作取消的重载——但我希望你明白了。
现在使用这些扩展方法我可以做到这一点:
IDisposable subscription =
ObservableEx
.IntervalAsync(TimeSpan.FromSeconds(2.0), () => ExecuteSellAsync(), Scheduler.Default)
.Take(3)
.Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));
我得到了和以前一样的输出。
我还没有完全测试扩展方法。他们可能需要更多的爱和关注。