使用 Akka 流时,如何按包含不同值的字符串进行分组?

Working with Akka streams, how can I group by strings that contain different values?

让我澄清一下我的问题:

这是我的代码

def averagePerAttributeFlow[T] = Flow[T]
.groupBy(10000, {
  case User(id, location, age) =>
    location match {
      case Location(city,state,country) =>
        println("MATCHING BY CITY")
        city+state+country
      case Location("n/a",state,country) =>
        println("MATCHING BY STATE")
        state+country
    }
  case UserBookRating(userId, bookISBN, rating) => userId
})

我有来自 csv 文件的数据流。有些行的格式是这样的:

"chicago, illinois, usa"

然而有些行是这样的:

" , 伊利诺伊州, 美国"

有些线路甚至有城市垃圾数据:

“###,美国伊利诺伊州”

如何根据包含 illinois、usa 的所有字符串进行分组,而不管是否提供城市?

当用户键入 "chicago, illinois, usa" 时,分组有效,但当用户键入 "n/a, illinois, usa" 时,分组无效。 n/a 表示用户不关心城市

编辑:

好的,所以我发现可能是我的过滤流程没有按预期工作。这是代码:

def filterByAttributeFlow[T](attr: String) = Flow[T].filter( {
  case user: User =>
    val attrSplit = attr.split(",").map(_.trim())
    attrSplit match {
      case Array(city,state,country) =>
        user.location.isCity(city, state, country) // check if location matches
      case Array(_,state,country) =>
        println("Filtering by State")
        user.location.isState(state,country)
    }
  case bookRating: UserBookRating =>
    bookRating.userId.contentEquals(attr) // check if userId matches
    }).withAttributes(ActorAttributes.supervisionStrategy {
      e: Throwable =>
        system.log.error(s"Error filtering ratings by '$attr': {}", e)
        Supervision.Resume // skips the erroneous data and resumes the stream
    })

如何过滤并保留所有包含美国伊利诺伊州的数据条目,即使城市未提供或充满垃圾数据?

我想通了。我不得不在过滤器流程中切换这两种情况并将 _ 更改为 "n/a"...我想我只需要与 "rubber duck" 交谈:]