如何在 F# 中尽快释放事件
how to release an event as fast as possible, in F#
我遇到一种情况,我正在从套接字连接中突发接收事件。可能会突然发生很多事件,然后有一段时间什么都没有(可能在 100 毫秒内,所以不会那么长)。
当数据到达时,它来自 RabbitMQ 回调,我需要尽快从该回调中 return。
我以前在回调中处理数据,这导致了缓冲问题,这变成了连接问题。
所以,我想尽可能快地将到达队列的数据和来自事件的 return 放入,然后让另一个线程获取数据,但也要通过事件。
由于这是一个需要在多个地方使用的机制,所以我为它做了一个Type,但是这段代码还没有在主系统中进行测试。集成需要一些工作,我想首先解决这些问题:
- .NET 中是否有任何现有机制可以实现这一点。
- 这段代码是否有意义(在某种程度上,这可能更多地询问代码审查而不是纯粹的问题,但上面的问题保证张贴在这里而不是代码审查网站)
代码及其测试在这里:
open System
open System.Collections.Concurrent
open System.Threading
type EventThreadDecoupling<'a>() =
// data queue
let queue = ConcurrentQueue<'a>()
// event called when an element is in the queue
let popEvent = Event<'a>()
// wait handle, triggered when data gets put in the queue
let eventHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
// setup the thread that processes the queue
do
async {
while true do
// there is a 1s timeout just in case there is data
// that was added while I reset the eventHandle
eventHandle.WaitOne(TimeSpan.FromSeconds(1.)) |> ignore
let mutable dataRead = false
while not queue.IsEmpty do
match queue.TryDequeue() with
| true, v -> popEvent.Trigger(v)
dataRead <- true
| _, _ -> ()
if dataRead then
eventHandle.Reset() |> ignore
} |> Async.Start
// event called when data has arrived
member this.OnEvent =
popEvent.Publish
// push data to the queue
member this.Push(data: 'a) =
queue.Enqueue(data)
eventHandle.Set() |> ignore
[<EntryPoint>]
let main _ =
let r = Random()
let e = EventThreadDecoupling<DateTime>()
e.OnEvent.Add(fun d ->
printfn "%A: received %A" DateTime.Now d
)
while true do
Thread.Sleep(r.Next(200))
e. Push(DateTime.Now)
0
F# 有 MailboxProcessor
。听起来这很适合这个问题。
https://fsharpforfunandprofit.com/posts/concurrency-actor-model/
基本上您需要创建一个使用 F# MailboxProcessor 的代理。在您的 RabbitMQ 回调中,除了将收到的消息(数据)转发(发布)到代理之外,您什么都不做。 MailboxProcessor 将为您进行排队,因此您无需在这里重新发明轮子。
代码为:
type Agent<'Msg> (processMsg) =
let inbox = MailboxProcessor<'Msg>.Start <| fun inbox ->
let rec loop () = async {
let! msg = inbox.Receive ()
processMsg msg
return! loop ()
}
loop ()
member this.Post msg =
inbox.Post msg
[<EntryPoint>]
let main argv =
let evt = Event<_> ()
let agent = Agent<DateTime> (fun msg ->
printfn "Processing msg: %A" msg
Thread.Sleep 3000
printfn "Processed msg: %A" msg
)
evt.Publish.Subscribe (fun msg ->
printfn "Received: %A" msg
// forward msg to the agent:
agent.Post msg)
|> ignore
let rec loop () = async {
do! Async.Sleep 500
evt.Trigger DateTime.Now
return! loop ()
}
let cts = new CancellationTokenSource ()
Async.Start (loop (), cts.Token)
Console.ReadKey true |> ignore
cts.Cancel ()
0
如果您 运行 此代码,您将看到文本 "Received: xxx"
每 500 毫秒定期打印一次,无论您处理代理中收到的消息多长时间。
我遇到一种情况,我正在从套接字连接中突发接收事件。可能会突然发生很多事件,然后有一段时间什么都没有(可能在 100 毫秒内,所以不会那么长)。
当数据到达时,它来自 RabbitMQ 回调,我需要尽快从该回调中 return。 我以前在回调中处理数据,这导致了缓冲问题,这变成了连接问题。
所以,我想尽可能快地将到达队列的数据和来自事件的 return 放入,然后让另一个线程获取数据,但也要通过事件。
由于这是一个需要在多个地方使用的机制,所以我为它做了一个Type,但是这段代码还没有在主系统中进行测试。集成需要一些工作,我想首先解决这些问题:
- .NET 中是否有任何现有机制可以实现这一点。
- 这段代码是否有意义(在某种程度上,这可能更多地询问代码审查而不是纯粹的问题,但上面的问题保证张贴在这里而不是代码审查网站)
代码及其测试在这里:
open System
open System.Collections.Concurrent
open System.Threading
type EventThreadDecoupling<'a>() =
// data queue
let queue = ConcurrentQueue<'a>()
// event called when an element is in the queue
let popEvent = Event<'a>()
// wait handle, triggered when data gets put in the queue
let eventHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
// setup the thread that processes the queue
do
async {
while true do
// there is a 1s timeout just in case there is data
// that was added while I reset the eventHandle
eventHandle.WaitOne(TimeSpan.FromSeconds(1.)) |> ignore
let mutable dataRead = false
while not queue.IsEmpty do
match queue.TryDequeue() with
| true, v -> popEvent.Trigger(v)
dataRead <- true
| _, _ -> ()
if dataRead then
eventHandle.Reset() |> ignore
} |> Async.Start
// event called when data has arrived
member this.OnEvent =
popEvent.Publish
// push data to the queue
member this.Push(data: 'a) =
queue.Enqueue(data)
eventHandle.Set() |> ignore
[<EntryPoint>]
let main _ =
let r = Random()
let e = EventThreadDecoupling<DateTime>()
e.OnEvent.Add(fun d ->
printfn "%A: received %A" DateTime.Now d
)
while true do
Thread.Sleep(r.Next(200))
e. Push(DateTime.Now)
0
F# 有 MailboxProcessor
。听起来这很适合这个问题。
https://fsharpforfunandprofit.com/posts/concurrency-actor-model/
基本上您需要创建一个使用 F# MailboxProcessor 的代理。在您的 RabbitMQ 回调中,除了将收到的消息(数据)转发(发布)到代理之外,您什么都不做。 MailboxProcessor 将为您进行排队,因此您无需在这里重新发明轮子。
代码为:
type Agent<'Msg> (processMsg) =
let inbox = MailboxProcessor<'Msg>.Start <| fun inbox ->
let rec loop () = async {
let! msg = inbox.Receive ()
processMsg msg
return! loop ()
}
loop ()
member this.Post msg =
inbox.Post msg
[<EntryPoint>]
let main argv =
let evt = Event<_> ()
let agent = Agent<DateTime> (fun msg ->
printfn "Processing msg: %A" msg
Thread.Sleep 3000
printfn "Processed msg: %A" msg
)
evt.Publish.Subscribe (fun msg ->
printfn "Received: %A" msg
// forward msg to the agent:
agent.Post msg)
|> ignore
let rec loop () = async {
do! Async.Sleep 500
evt.Trigger DateTime.Now
return! loop ()
}
let cts = new CancellationTokenSource ()
Async.Start (loop (), cts.Token)
Console.ReadKey true |> ignore
cts.Cancel ()
0
如果您 运行 此代码,您将看到文本 "Received: xxx"
每 500 毫秒定期打印一次,无论您处理代理中收到的消息多长时间。