使用 Akka 流时,如何按包含不同值的字符串进行分组?
Working with Akka streams, how can I group by strings that contain different values?
让我澄清一下我的问题:
这是我的代码
def averagePerAttributeFlow[T] = Flow[T]
.groupBy(10000, {
case User(id, location, age) =>
location match {
case Location(city,state,country) =>
println("MATCHING BY CITY")
city+state+country
case Location("n/a",state,country) =>
println("MATCHING BY STATE")
state+country
}
case UserBookRating(userId, bookISBN, rating) => userId
})
我有来自 csv 文件的数据流。有些行的格式是这样的:
"chicago, illinois, usa"
然而有些行是这样的:
" , 伊利诺伊州, 美国"
有些线路甚至有城市垃圾数据:
“###,美国伊利诺伊州”
如何根据包含 illinois、usa 的所有字符串进行分组,而不管是否提供城市?
当用户键入 "chicago, illinois, usa" 时,分组有效,但当用户键入 "n/a, illinois, usa" 时,分组无效。
n/a 表示用户不关心城市
编辑:
好的,所以我发现可能是我的过滤流程没有按预期工作。这是代码:
def filterByAttributeFlow[T](attr: String) = Flow[T].filter( {
case user: User =>
val attrSplit = attr.split(",").map(_.trim())
attrSplit match {
case Array(city,state,country) =>
user.location.isCity(city, state, country) // check if location matches
case Array(_,state,country) =>
println("Filtering by State")
user.location.isState(state,country)
}
case bookRating: UserBookRating =>
bookRating.userId.contentEquals(attr) // check if userId matches
}).withAttributes(ActorAttributes.supervisionStrategy {
e: Throwable =>
system.log.error(s"Error filtering ratings by '$attr': {}", e)
Supervision.Resume // skips the erroneous data and resumes the stream
})
如何过滤并保留所有包含美国伊利诺伊州的数据条目,即使城市未提供或充满垃圾数据?
我想通了。我不得不在过滤器流程中切换这两种情况并将 _ 更改为 "n/a"...我想我只需要与 "rubber duck" 交谈:]
让我澄清一下我的问题:
这是我的代码
def averagePerAttributeFlow[T] = Flow[T]
.groupBy(10000, {
case User(id, location, age) =>
location match {
case Location(city,state,country) =>
println("MATCHING BY CITY")
city+state+country
case Location("n/a",state,country) =>
println("MATCHING BY STATE")
state+country
}
case UserBookRating(userId, bookISBN, rating) => userId
})
我有来自 csv 文件的数据流。有些行的格式是这样的:
"chicago, illinois, usa"
然而有些行是这样的:
" , 伊利诺伊州, 美国"
有些线路甚至有城市垃圾数据:
“###,美国伊利诺伊州”
如何根据包含 illinois、usa 的所有字符串进行分组,而不管是否提供城市?
当用户键入 "chicago, illinois, usa" 时,分组有效,但当用户键入 "n/a, illinois, usa" 时,分组无效。 n/a 表示用户不关心城市
编辑:
好的,所以我发现可能是我的过滤流程没有按预期工作。这是代码:
def filterByAttributeFlow[T](attr: String) = Flow[T].filter( {
case user: User =>
val attrSplit = attr.split(",").map(_.trim())
attrSplit match {
case Array(city,state,country) =>
user.location.isCity(city, state, country) // check if location matches
case Array(_,state,country) =>
println("Filtering by State")
user.location.isState(state,country)
}
case bookRating: UserBookRating =>
bookRating.userId.contentEquals(attr) // check if userId matches
}).withAttributes(ActorAttributes.supervisionStrategy {
e: Throwable =>
system.log.error(s"Error filtering ratings by '$attr': {}", e)
Supervision.Resume // skips the erroneous data and resumes the stream
})
如何过滤并保留所有包含美国伊利诺伊州的数据条目,即使城市未提供或充满垃圾数据?
我想通了。我不得不在过滤器流程中切换这两种情况并将 _ 更改为 "n/a"...我想我只需要与 "rubber duck" 交谈:]