如何将基于 Actor 的逻辑迁移到 Akka Streams?

How to migrate Actor-based logic to Akka Streams?

我使用 Akka 应用程序已有一段时间了。 95%的代码都是用纯演员写的。现在我要将应用程序的某些部分移动到 Akka Streams。 让我了解以下逻辑在 Akka Streams 方面的外观:

+------------+                                             
| CreateUser |                                             
+------------+                                             
      |                                                    
      |                                                    
+------------+     +-------------------+                   
| CheckEmail |-----|EmailIsAlreadyInUse|                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+     +-------------------+                   
|3rdPartyCall|-----|NoUserInInternalDB |                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+     +-------------------+                   
|  SaveUser  |-----|    UserDBError    |                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+                                             
| UserSaved  |                                             
+------------+   

在当前的实现中,所有块都是我发送给适当参与者的消息。如果消息流成功,我会向发件人发回一条 UserSaved 消息。否则,我会向发件人发回其中一条验证消息:EmailIsAlreadyInUseNoUserInInternalDBUserDBError.

这是一组消息:

case class CreateUser(email: String)
case class CheckEmailUniqueness(email: String)
case class ExternalServiceValidation(email: String)
case class SaveUser(email: String)

sealed trait CreateUserResult
sealed trait CreateUserError
case class UserCreated(email: String) extends CreateUserResult
case class EmailIsAlreadyInUse(email: String) extends CreateUserResult with CreateUserError
case class NoUserInExternalDB(email: String) extends CreateUserResult with CreateUserError
case class UserDBError(email: String) extends CreateUserResult with CreateUserError

如何将此逻辑迁移到 Akka Streams?

消息结构

因为 akka-stream 数据是单向传输的,从源到接收器,没有 "send back to the sender" 功能。您唯一的选择是不断将消息转发到下一步。

因此,我认为您只需要在邮件周围添加一些额外的结构。 Either 构造似乎对此很有用。假设您的 CreateUser Actor 有一个 self-contained 函数:

def createUserFunction(createUser : CreateUser) : UserCreated = ???

这之后可以跟一个函数 CheckEmail:

val Set[String] existingEmails = ???

def checkEmailUniqueness(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
  if(existingEmails contains userCreated.email)
    Left(EmailIsAlreadyInUse(userCreated.email))
  else
    Right(createUser)

类似地,3rdPartyCall 也会 return 一个 Either:

 def thirdPartyLibraryFunction(userCreated : UserCreated) : Boolean = ???

 def thirdPartyCall(userCreated : UserCreated) : Either[CreateUserError, UserCreated] = 
   if(!thirdPartyLibraryFunction(userCreated))
     Left(NoUserInExternalDB(userCreated.email))
   else
     Right(userCreated)

Akka 流构建

通过这种结构化的消息传递,您现在可以创建一个只在一个方向上移动的流。我们首先创建一个 Flow 来创建用户:

 val createUserFlow : Flow[CreateUser, UserCreated, _] = 
   Flow[CreateUser] map (createUserFunction)

然后一封电子邮件检查流程:

 val emailFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] = 
   Flow[UserCreated] map (checkEmailUniqueness)

现在制作第三方调用的 Flow:

 val thirdPartyFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] = 
   Flow[UserCreated] map (_ flatMap thirdPartyCall)

这些流现在可以与 SourceSink 一起构成流的基础:

 val userSource : Source[CreateUser, _] = ???

 val userSink : Sink[Either[CreateUserError, UserCreated], _] = 
   Sink[Either[CreateUserError, UserCreated]] foreach {
     case Left(error) =>
       System.err.println("Error with user creation : " error.email)
     case Right(userCreated) =>
       System.out.println("User Created: " userCreated.email)
   }

 //create the full stream
 userSource
   .via(createUserFlow)
   .via(emailFlow)
   .via(thirdPartyFlow)
   .to(userSink)
   .run()