如何从主 UI 线程中处理异步队列?

How to process an asynchronous queue from within the main UI thread?

我正在设计两个组件,它们异步接收自定义 class (TMELogMessage) 的对象并将它们存储在线程安全的内部容器中。第一个组件是非可视化的 (TMEFileLogger),应该将这些对象的一些信息写入日志文件(不足为奇)。第二个组件 (TMELogGrid) 是一个视觉 FMX.Grid 后代,它应该可视化 UI 中这些对象的一些信息。但是我认为他们对这些对象所做的是无关紧要的。

我面临的问题是,这些组件实际上并不知道这些对象什么时候会在它们的内部容器中入队,所以它们必须自己检查容器,看看是否有任何新的对象需要处理,处理他们并将他们从队列中删除。理想情况下,我希望在应用程序不太忙时以类似于操作更新的方式完成此操作,以免陷入 UI.

组件挂钩 Application.OnIdle 之类的事件处理程序显然是错误的...我也许可以订阅 TIdleMessage,但我不确定这是个好主意,因为我已经读到一些应用程序永远不会闲置。使用内部计时器似乎有点过时。我也许可以使用低优先级线程来轮询队列,然后仅当我找到要处理的对象时才与 UI 同步。不过我没有其他想法。

在 Delphi + 多平台 FireMonkey 中执行此操作的正确方法是什么?

队列实现通常实现应用程序代码可以等待的事件(OS synchronization object,而不是 VCL 'event')。事件是 set/fired/triggered/however 每当一个项目被添加到一个空队列时(或者,如果多个项目被添加到一个 "batch" 中,在它们全部被添加之后。精确的模式可能各不相同)。如果您的队列是您自己的实现,那么我会考虑在您的实现中添加这样的机制。

为了避免阻塞 UI,应用程序代码创建了一个轻量级线程,其唯一目的是等待该队列事件,将队列中的项目从队列中移出到 UI 线程安全容器中然后通知 UI 线程有要处理的项目。然后监视线程继续等待事件发出信号,表明队列中还有 更多 个项目。

在 VCL 应用程序中,监视线程通知 UI 的机制可能是一个简单的 Synchronized 过程或(我建议)基于消息的通知发布到某种形式负责UI 项目处理。

注意: 队列监控线程通常还负责处理 application/UI 不再关心处理项目(例如正在关闭)等情况还侦听 "cancel" 或 "terminate" 事件,该事件向线程发出信号使项目出列但丢弃它们(或以此时适合应用程序需要的任何方式处理它们)然后终止(即, 监控线程退出)。

我不喜欢回答我自己的问题,但我希望这个问题得到回答,因为它可能对其他人有帮助。虽然 Deltics 的回答很有用,但这不是我决定采用的方式。我遵循了雷米评论中的建议,并将所有内容封装在组件和表单可以使用的消息处理 class 中。所以 TMEFileLogger 和 TMELogGrid 现在都使用这个新的 TMEMessageHandler class.

的一个实例

这里有一些界面代码来解释我做了什么。请记住,这是对 rtl System.Messaging 单元的替代和增强。 rtl 消息系统的问题是它只提供发送同步消息。我想要一个更丰富的界面。这是我的消息管理器的样子:

  TMEMessageManager = Class
    ...
  Public
    ...
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
    Procedure PostDelayedMessage(Const Sender: TObject; AMessage: TMessage; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
    Procedure PostMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
    Procedure SendMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;

    Function Subscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver): Integer; Overload;
    Function Subscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';
    Function Subscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';

    Procedure Unsubscribe(Const AMessageClass: TClass; ID: Integer; Const Immediate: Boolean = False); Overload;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver; Const Immediate: Boolean = False); Overload;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod; Const Immediate: Boolean = False); Overload; Deprecated;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure; Const Immediate: Boolean = False); Overload; Deprecated;
    ...
  End;

TMEMessageEnvelope 是消息的包装器,定义为:

Type
  TMEMessageEnvelope = Class(TMEPersistent)
  Public
    ...
    Property Infos: TMEMessageInfos Read FInfos;
    Property Sender: TObject Read FSender;
    Property Msg: TMessage Read FMsg;
  End;

通过信封接收器订阅的接收器将同时接收同步和异步消息。这是首选的订阅方法。通过对象方法或过程订阅的接收者将只接收同步消息。这是为了与 RTL 消息传递系统兼容而保留的,但已弃用。

问题是 RTL 消息不能按原样 posted。订阅的消费者只需提供一个过程或一个对象方法来立即使用消息。为了 post 消息以便以后可以异步使用,它需要被包装和排队。这样发送者就与接收者隔离了。所以实际上...消息是 posted(立即或延迟)首先将它们包装在信封中。

以下是此消息系统中涉及的基本接口:

Type

  IMEClonableMessage = Interface(IInterface)
    ['{45B223E2-DCA8-4E42-9847-6A3FCC910891}']
    Function Clone: TMessage;
  End;

  IMEMessageSender = Interface(IInterface)
    ['{99AFDC4A-CE30-41A3-9AA5-D49F2F1106BD}']
    Procedure PostDelayedMessage(const M: TMessage; Const DelayMSec: Cardinal);
    Procedure PostMessage(Const M: TMessage);
    Procedure SendMessage(Const M: TMessage);
  End;

  IMEEnvelopeSender = Interface(IInterface)
    ['{C3AEC52C-A558-40AB-B07B-3000ECDB9C0C}']
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
  End;

  IMEEnvelopeReceiver = Interface(IInterface)
    ['{7D464713-2F25-4666-AAF8-757AF07688C3}']
    Procedure ClearEnvelopes;
    Procedure ProcessEnvelope;
    Procedure ProcessEnvelopes;
    Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure Subscribe(Const AMessageClass: TClass);
    Procedure Unsubscribe(Const AMessageClass: TClass);
  End;

IMEClonableMessage接口用于克隆消息...必须克隆异步消息...因为如果同一条消息有很多订阅者,每个订阅者都会在不同的时间接收和消费消息,所以最好每个人都有自己的消息副本。

其他界面,我认为,不言自明。

最后是 TMEMessageHandler class:

  TMEMessageHandler = Class(TMEPersistent, IMEMessageSender, IMEEnvelopeSender, IMEEnvelopeReceiver)
    /// <summary>Basic thread-safe class that can send and receive synchronous and asynchronous messages and envelopes.</summary>
  Private
    FLock:                 TObject;
    FMessageManager:       TMEMessageManager;
    FSubscriptions:        TDictionary<TClass, Integer>;
    FEnvelopes:            TObjectList<TMEMessageEnvelope>;
    FOnReceiveEnvelope:    TReceiveEnvelopeEvent;
    FAutoProcessEnvelopes: Boolean;
    Procedure _Lock;
    Procedure _Unlock;
    Procedure ClearSubscriptions;
    Function GetMessageManager: TMEMessageManager;
    Procedure SetAutoProcessEnvelopes(Const Value: Boolean);
    Procedure SetMessageManager(Const Value: TMEMessageManager);
  Protected
    Function QueryInterface(Const IID: TGuid; Out Obj): HResult; Stdcall;
    Function _AddRef: Integer; Stdcall;
    Function _Release: Integer; Stdcall;
    Procedure DoReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
    Procedure PostDelayedMessage(Const M: TMessage; Const DelayMSec: Cardinal);
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure PostMessage(Const M: TMessage);
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure SendMessage(Const M: TMessage);
    Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
  Public
    Constructor Create; Override;
    Destructor Destroy; Override;
    Procedure ClearEnvelopes;
    Procedure ProcessEnvelope;
    Procedure ProcessEnvelopes;
    Procedure Subscribe(Const AMessageClass: TClass);
    Procedure Unsubscribe(Const AMessageClass: TClass);
    Property AutoProcessEnvelopes: Boolean Read FAutoProcessEnvelopes Write SetAutoProcessEnvelopes Default True;
    Property MessageManager: TMEMessageManager Read GetMessageManager Write SetMessageManager;
    Property OnReceiveEnvelope: TReceiveEnvelopeEvent Read FOnReceiveEnvelope Write FOnReceiveEnvelope;
  End;

这一切是如何运作的

TMEMessageHandler 立即将任何订阅和取消订阅调用委托给 MessageManager。它将始终订阅提供自己作为 IMEEnvelopeReceiver。它在其内部字典中跟踪订阅,以便在取消订阅时更有效率。

它还会立即将任何调用委托给 Send、Post 和 PostDelayed 方法。 TMEMessageManager:

  • 向订阅的过程发送消息(作为 RTL)
  • 向订阅的对象方法发送消息(作为 RTL)
  • 通过致电订阅的收件人向他们发送信封 接收信封方法
  • Posts 个信封(和信封包裹的消息)订阅 接收者通过使用克隆的副本调用他们的 QeueEnvelope 方法 信封
  • Posts 延迟订阅的信封(和信封包裹的消息) 接收器首先在内部轻量级线程中排队 (TMEDelayedEnvelopeDeliverer) 本身有消息管理器 延迟过去后交付它们

作为接收器,TMEMessageHandler 通过简单地委托给 OnReceiveEnvelope 事件处理程序来实现 ReceiveEnvelope。

Posted 信封由 QueueEnvelope 方法接收,该方法将信封添加到其线程安全队列中,然后,但仅当 AutoProcessEnvelopes 为 True 时,使用主线程的 Queue 调用其自己的 ProcessEnvelope 方法(根据雷米的建议):

Function TMEMessageHandler.QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
Begin
  _Lock;
  Try
    FEnvelopes.Add(Envelope);
    Result := FEnvelopes.Count;
  Finally
    _Unlock;
  End;
  If AutoProcessEnvelopes Then
    TThread.Queue(Nil,
      Procedure
      Begin
        ProcessEnvelope;
      End);
End;

ProcessEnvelope 方法从线程安全队列中提取信封,调用 ReceiveEnvelope 方法(与消息管理器为同步消息调用的方法相同),然后释放信封(请记住,这是为此目的的克隆副本接收器):

Procedure TMEMessageHandler.ProcessEnvelope;
Var
  E: TMEMessageEnvelope;
Begin
  If FEnvelopes.Count > 0 Then Begin
    _Lock;
    Try
      E := FEnvelopes.Extract(FEnvelopes[0]);
    Finally
      _Unlock;
    End;
    E.UpdateInfo(mieReceived);
    ReceiveEnvelope(E);
    E.Free;
  End;
End;

ProcessEnvelopes 方法只是根据需要多次调用前者来清空异步消息队列:

Procedure TMEMessageHandler.ProcessEnvelopes;
Begin
  While FEnvelopes.Count > 0 Do
    ProcessEnvelope;
End;

TMEMessageHandler是如何使用的

将 TMELogMessage 定义为 IMEClonableMessage 来处理要记录的信息后,TMEFileLogger 和其他组件的最小实现如下所示:

Type
  TMEFileLogger = Class(TMEComponent)
  Private
    ...
    FMessageHandler:    TMEMessagehandler;
  Protected
    ...
    Procedure ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
    Property MessageHandler: TMEMessagehandler Read FMessageHandler;
  Public
    Constructor Create(AOwner: TComponent); Override;
    Destructor Destroy; Override;
    ...
  End;

Constructor TMEFileLogger.Create(AOwner: TComponent);
Begin
  Inherited;
  ...
  FMessageHandler                  := TMEMessagehandler.Create;
  MessageHandler.OnReceiveEnvelope := ReceiveLogEnvelope;
  MessageHandler.Subscribe(TMELogMessage);
End;

Destructor TMEFileLogger.Destroy;
Begin
  MessageHandler.Unsubscribe(TMELogMessage);
  MessageHandler.ProcessEnvelopes;
  FreeAndNil(FMessageHandler);
  ...
  Inherited;
End;

Procedure TMEFileLogger.ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
Begin
  If Envelope.Msg Is TMELogMessage Then
    With Envelope.Msg As TMELogMessage Do
      ... something useful ...
End;

抱歉这么长 post,但我希望这对其他人有用。