如何缓冲和删除带有分隔符的分块字节串?

How to buffer and drop a chunked bytestring with a delimiter?

假设您有一个发布者使用广播与一些快速和一些慢速订阅者,并且希望能够为慢速订阅者丢弃消息集而不必将它们保存在内存中。数据由分块的 ByteString 组成,因此不能删除单个 ByteString。

每组 ByteString 后跟一个终止符 ByteString("\n"),因此我需要删除一组以此结尾的 ByteString。

你可以用自定义图表阶段来做这件事吗?是否可以在不聚合并将整个集合保存在内存中的情况下完成?

避免自定义阶段

只要有可能尽量避免自定义阶段,它们很难正确而且非常冗长。通常标准的 akka-stream 阶段和普通的旧函数的一些组合就可以做到这一点。

掉组

大概您有一些标准可以用来决定丢弃哪组邮件:

type ShouldDropTester : () => Boolean

出于演示目的,我将使用一个简单的开关来删除其他组:

val dropEveryOther : ShouldDropTester = 
  Iterator.from(1)
          .map(_ % 2 == 0)
          .next

我们还需要一个函数来接收 ShouldDropTester 并使用它来确定是否应该删除某个 ByteString

val endOfFile = ByteString("\n")

val dropGroupPredicate : ShouldDropTester => ByteString => Boolean = 
  (shouldDropTester) => {
    var dropGroup = shouldDropTester()

    (byteString) => 
      if(byteString equals endOfFile) {
        val returnValue = dropGroup
        dropGroup = shouldDropTester()
        returnValue
      }
      else {
        dropGroup
      }      
  }

组合以上两个函数将删除每隔一组的 ByteString。然后可以将此功能转换为 Flow:

val filterPredicateFunction : ByteString => Boolean =
  dropGroupPredicate(dropEveryOther)

val dropGroups : Flow[ByteString, ByteString, _] =
  Flow[ByteString] filter filterPredicateFunction

根据需要:不需要缓冲消息组,谓词将作用于单个 ByteString,因此无论文件大小如何,都会消耗恒定量的内存。