如何缓冲和删除带有分隔符的分块字节串?
How to buffer and drop a chunked bytestring with a delimiter?
假设您有一个发布者使用广播与一些快速和一些慢速订阅者,并且希望能够为慢速订阅者丢弃消息集而不必将它们保存在内存中。数据由分块的 ByteString 组成,因此不能删除单个 ByteString。
每组 ByteString 后跟一个终止符 ByteString("\n"),因此我需要删除一组以此结尾的 ByteString。
你可以用自定义图表阶段来做这件事吗?是否可以在不聚合并将整个集合保存在内存中的情况下完成?
避免自定义阶段
只要有可能尽量避免自定义阶段,它们很难正确而且非常冗长。通常标准的 akka-stream 阶段和普通的旧函数的一些组合就可以做到这一点。
掉组
大概您有一些标准可以用来决定丢弃哪组邮件:
type ShouldDropTester : () => Boolean
出于演示目的,我将使用一个简单的开关来删除其他组:
val dropEveryOther : ShouldDropTester =
Iterator.from(1)
.map(_ % 2 == 0)
.next
我们还需要一个函数来接收 ShouldDropTester
并使用它来确定是否应该删除某个 ByteString
:
val endOfFile = ByteString("\n")
val dropGroupPredicate : ShouldDropTester => ByteString => Boolean =
(shouldDropTester) => {
var dropGroup = shouldDropTester()
(byteString) =>
if(byteString equals endOfFile) {
val returnValue = dropGroup
dropGroup = shouldDropTester()
returnValue
}
else {
dropGroup
}
}
组合以上两个函数将删除每隔一组的 ByteString。然后可以将此功能转换为 Flow
:
val filterPredicateFunction : ByteString => Boolean =
dropGroupPredicate(dropEveryOther)
val dropGroups : Flow[ByteString, ByteString, _] =
Flow[ByteString] filter filterPredicateFunction
根据需要:不需要缓冲消息组,谓词将作用于单个 ByteString,因此无论文件大小如何,都会消耗恒定量的内存。
假设您有一个发布者使用广播与一些快速和一些慢速订阅者,并且希望能够为慢速订阅者丢弃消息集而不必将它们保存在内存中。数据由分块的 ByteString 组成,因此不能删除单个 ByteString。
每组 ByteString 后跟一个终止符 ByteString("\n"),因此我需要删除一组以此结尾的 ByteString。
你可以用自定义图表阶段来做这件事吗?是否可以在不聚合并将整个集合保存在内存中的情况下完成?
避免自定义阶段
只要有可能尽量避免自定义阶段,它们很难正确而且非常冗长。通常标准的 akka-stream 阶段和普通的旧函数的一些组合就可以做到这一点。
掉组
大概您有一些标准可以用来决定丢弃哪组邮件:
type ShouldDropTester : () => Boolean
出于演示目的,我将使用一个简单的开关来删除其他组:
val dropEveryOther : ShouldDropTester =
Iterator.from(1)
.map(_ % 2 == 0)
.next
我们还需要一个函数来接收 ShouldDropTester
并使用它来确定是否应该删除某个 ByteString
:
val endOfFile = ByteString("\n")
val dropGroupPredicate : ShouldDropTester => ByteString => Boolean =
(shouldDropTester) => {
var dropGroup = shouldDropTester()
(byteString) =>
if(byteString equals endOfFile) {
val returnValue = dropGroup
dropGroup = shouldDropTester()
returnValue
}
else {
dropGroup
}
}
组合以上两个函数将删除每隔一组的 ByteString。然后可以将此功能转换为 Flow
:
val filterPredicateFunction : ByteString => Boolean =
dropGroupPredicate(dropEveryOther)
val dropGroups : Flow[ByteString, ByteString, _] =
Flow[ByteString] filter filterPredicateFunction
根据需要:不需要缓冲消息组,谓词将作用于单个 ByteString,因此无论文件大小如何,都会消耗恒定量的内存。