如何将基于 Actor 的逻辑迁移到 Akka Streams?
How to migrate Actor-based logic to Akka Streams?
我使用 Akka 应用程序已有一段时间了。 95%的代码都是用纯演员写的。现在我要将应用程序的某些部分移动到 Akka Streams。
让我了解以下逻辑在 Akka Streams 方面的外观:
+------------+
| CreateUser |
+------------+
|
|
+------------+ +-------------------+
| CheckEmail |-----|EmailIsAlreadyInUse|
+------------+ +-------------------+
|
|
+------------+ +-------------------+
|3rdPartyCall|-----|NoUserInInternalDB |
+------------+ +-------------------+
|
|
+------------+ +-------------------+
| SaveUser |-----| UserDBError |
+------------+ +-------------------+
|
|
+------------+
| UserSaved |
+------------+
在当前的实现中,所有块都是我发送给适当参与者的消息。如果消息流成功,我会向发件人发回一条 UserSaved
消息。否则,我会向发件人发回其中一条验证消息:EmailIsAlreadyInUse
或 NoUserInInternalDB
或 UserDBError
.
这是一组消息:
case class CreateUser(email: String)
case class CheckEmailUniqueness(email: String)
case class ExternalServiceValidation(email: String)
case class SaveUser(email: String)
sealed trait CreateUserResult
sealed trait CreateUserError
case class UserCreated(email: String) extends CreateUserResult
case class EmailIsAlreadyInUse(email: String) extends CreateUserResult with CreateUserError
case class NoUserInExternalDB(email: String) extends CreateUserResult with CreateUserError
case class UserDBError(email: String) extends CreateUserResult with CreateUserError
如何将此逻辑迁移到 Akka Streams?
消息结构
因为 akka-stream 数据是单向传输的,从源到接收器,没有 "send back to the sender" 功能。您唯一的选择是不断将消息转发到下一步。
因此,我认为您只需要在邮件周围添加一些额外的结构。 Either
构造似乎对此很有用。假设您的 CreateUser
Actor 有一个 self-contained 函数:
def createUserFunction(createUser : CreateUser) : UserCreated = ???
这之后可以跟一个函数 CheckEmail
:
val Set[String] existingEmails = ???
def checkEmailUniqueness(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
if(existingEmails contains userCreated.email)
Left(EmailIsAlreadyInUse(userCreated.email))
else
Right(createUser)
类似地,3rdPartyCall
也会 return 一个 Either:
def thirdPartyLibraryFunction(userCreated : UserCreated) : Boolean = ???
def thirdPartyCall(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
if(!thirdPartyLibraryFunction(userCreated))
Left(NoUserInExternalDB(userCreated.email))
else
Right(userCreated)
Akka 流构建
通过这种结构化的消息传递,您现在可以创建一个只在一个方向上移动的流。我们首先创建一个 Flow
来创建用户:
val createUserFlow : Flow[CreateUser, UserCreated, _] =
Flow[CreateUser] map (createUserFunction)
然后一封电子邮件检查流程:
val emailFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] =
Flow[UserCreated] map (checkEmailUniqueness)
现在制作第三方调用的 Flow:
val thirdPartyFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] =
Flow[UserCreated] map (_ flatMap thirdPartyCall)
这些流现在可以与 Source
和 Sink
一起构成流的基础:
val userSource : Source[CreateUser, _] = ???
val userSink : Sink[Either[CreateUserError, UserCreated], _] =
Sink[Either[CreateUserError, UserCreated]] foreach {
case Left(error) =>
System.err.println("Error with user creation : " error.email)
case Right(userCreated) =>
System.out.println("User Created: " userCreated.email)
}
//create the full stream
userSource
.via(createUserFlow)
.via(emailFlow)
.via(thirdPartyFlow)
.to(userSink)
.run()
我使用 Akka 应用程序已有一段时间了。 95%的代码都是用纯演员写的。现在我要将应用程序的某些部分移动到 Akka Streams。 让我了解以下逻辑在 Akka Streams 方面的外观:
+------------+
| CreateUser |
+------------+
|
|
+------------+ +-------------------+
| CheckEmail |-----|EmailIsAlreadyInUse|
+------------+ +-------------------+
|
|
+------------+ +-------------------+
|3rdPartyCall|-----|NoUserInInternalDB |
+------------+ +-------------------+
|
|
+------------+ +-------------------+
| SaveUser |-----| UserDBError |
+------------+ +-------------------+
|
|
+------------+
| UserSaved |
+------------+
在当前的实现中,所有块都是我发送给适当参与者的消息。如果消息流成功,我会向发件人发回一条 UserSaved
消息。否则,我会向发件人发回其中一条验证消息:EmailIsAlreadyInUse
或 NoUserInInternalDB
或 UserDBError
.
这是一组消息:
case class CreateUser(email: String)
case class CheckEmailUniqueness(email: String)
case class ExternalServiceValidation(email: String)
case class SaveUser(email: String)
sealed trait CreateUserResult
sealed trait CreateUserError
case class UserCreated(email: String) extends CreateUserResult
case class EmailIsAlreadyInUse(email: String) extends CreateUserResult with CreateUserError
case class NoUserInExternalDB(email: String) extends CreateUserResult with CreateUserError
case class UserDBError(email: String) extends CreateUserResult with CreateUserError
如何将此逻辑迁移到 Akka Streams?
消息结构
因为 akka-stream 数据是单向传输的,从源到接收器,没有 "send back to the sender" 功能。您唯一的选择是不断将消息转发到下一步。
因此,我认为您只需要在邮件周围添加一些额外的结构。 Either
构造似乎对此很有用。假设您的 CreateUser
Actor 有一个 self-contained 函数:
def createUserFunction(createUser : CreateUser) : UserCreated = ???
这之后可以跟一个函数 CheckEmail
:
val Set[String] existingEmails = ???
def checkEmailUniqueness(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
if(existingEmails contains userCreated.email)
Left(EmailIsAlreadyInUse(userCreated.email))
else
Right(createUser)
类似地,3rdPartyCall
也会 return 一个 Either:
def thirdPartyLibraryFunction(userCreated : UserCreated) : Boolean = ???
def thirdPartyCall(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
if(!thirdPartyLibraryFunction(userCreated))
Left(NoUserInExternalDB(userCreated.email))
else
Right(userCreated)
Akka 流构建
通过这种结构化的消息传递,您现在可以创建一个只在一个方向上移动的流。我们首先创建一个 Flow
来创建用户:
val createUserFlow : Flow[CreateUser, UserCreated, _] =
Flow[CreateUser] map (createUserFunction)
然后一封电子邮件检查流程:
val emailFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] =
Flow[UserCreated] map (checkEmailUniqueness)
现在制作第三方调用的 Flow:
val thirdPartyFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] =
Flow[UserCreated] map (_ flatMap thirdPartyCall)
这些流现在可以与 Source
和 Sink
一起构成流的基础:
val userSource : Source[CreateUser, _] = ???
val userSink : Sink[Either[CreateUserError, UserCreated], _] =
Sink[Either[CreateUserError, UserCreated]] foreach {
case Left(error) =>
System.err.println("Error with user creation : " error.email)
case Right(userCreated) =>
System.out.println("User Created: " userCreated.email)
}
//create the full stream
userSource
.via(createUserFlow)
.via(emailFlow)
.via(thirdPartyFlow)
.to(userSink)
.run()