如何在 F# 中尽快释放事件

how to release an event as fast as possible, in F#

我遇到一种情况,我正在从套接字连接中突发接收事件。可能会突然发生很多事件,然后有一段时间什么都没有(可能在 100 毫秒内,所以不会那么长)。

当数据到达时,它来自 RabbitMQ 回调,我需要尽快从该回调中 return。 我以前在回调中处理数据,这导致了缓冲问题,这变成了连接问题。

所以,我想尽可能快地将到达队列的数据和来自事件的 return 放入,然后让另一个线程获取数据,但也要通过事件。

由于这是一个需要在多个地方使用的机制,所以我为它做了一个Type,但是这段代码还没有在主系统中进行测试。集成需要一些工作,我想首先解决这些问题:

代码及其测试在这里:

open System
open System.Collections.Concurrent
open System.Threading


type EventThreadDecoupling<'a>() =

    // data queue
    let queue = ConcurrentQueue<'a>()

    // event called when an element is in the queue
    let popEvent = Event<'a>()

    // wait handle, triggered when data gets put in the queue
    let eventHandle = new EventWaitHandle(false, EventResetMode.ManualReset)

    // setup the thread that processes the queue
    do
        async {
            while true do
                // there is a 1s timeout just in case there is data
                // that was added while I reset the eventHandle
                eventHandle.WaitOne(TimeSpan.FromSeconds(1.)) |> ignore
                let mutable dataRead = false
                while not queue.IsEmpty do
                    match queue.TryDequeue() with
                    | true, v -> popEvent.Trigger(v)
                                 dataRead <- true
                    | _, _    -> ()

                if dataRead then
                    eventHandle.Reset() |> ignore
        } |> Async.Start

    // event called when data has arrived
    member this.OnEvent =
        popEvent.Publish

    // push data to the queue
    member this.Push(data: 'a) =
        queue.Enqueue(data)
        eventHandle.Set() |> ignore


[<EntryPoint>]
let main _ =

    let r = Random()

    let e = EventThreadDecoupling<DateTime>()

    e.OnEvent.Add(fun d ->
        printfn "%A: received %A" DateTime.Now d
    )

    while true do
        Thread.Sleep(r.Next(200))
        e. Push(DateTime.Now)

    0

F# 有 MailboxProcessor。听起来这很适合这个问题。

https://fsharpforfunandprofit.com/posts/concurrency-actor-model/

基本上您需要创建一个使用 F# MailboxProcessor 的代理。在您的 RabbitMQ 回调中,除了将收到的消息(数据)转发(发布)到代理之外,您什么都不做。 MailboxProcessor 将为您进行排队,因此您无需在这里重新发明轮子。

代码为:

type Agent<'Msg> (processMsg) =
    let inbox = MailboxProcessor<'Msg>.Start <| fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive ()
            processMsg msg
            return! loop ()
        }
        loop ()
    member this.Post msg =
        inbox.Post msg

[<EntryPoint>]
let main argv =    
    let evt = Event<_> ()
    let agent = Agent<DateTime> (fun msg ->
        printfn "Processing msg: %A" msg
        Thread.Sleep 3000
        printfn "Processed msg: %A" msg
    )

    evt.Publish.Subscribe (fun msg ->
        printfn "Received: %A" msg
        // forward msg to the agent:
        agent.Post msg)
    |> ignore

    let rec loop () = async {
        do! Async.Sleep 500
        evt.Trigger DateTime.Now
        return! loop ()
    }
    let cts = new CancellationTokenSource ()
    Async.Start (loop (), cts.Token)
    Console.ReadKey true |> ignore
    cts.Cancel ()

    0

如果您 运行 此代码,您将看到文本 "Received: xxx" 每 500 毫秒定期打印一次,无论您处理代理中收到的消息多长时间。