怎么弄清楚,如果网络服务器仍然处于活动状态
How figure out, if the webserver is still active
我通过 websocket 客户端将来自 Kafka 的传入消息转发到 web 服务器。
以下代码显示了我是如何做的:
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.{Future, Promise}
final case class WsGraph(logger: Logger, sink: Sink[Message, Future[Done]])(implicit val system: ActorSystem) {
private implicit val materializer = ActorMaterializer()
private implicit val akka = system.settings.config.getConfig("akka.kafka.consumer")
private implicit val executor = system.dispatcher
private val consumerSetup = system.settings.config.getConfig("kafka.consumer.setup")
private val wsSetup = system.settings.config.getConfig("websocket.setup")
private val consumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(akka, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(consumerSetup.getString("bootStrapServers"))
.withGroupId(consumerSetup.getString("groupId"))
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
private val kafkaAsSource: Source[Message, (Consumer.Control, Promise[Option[Message]])] = Consumer
.plainSource(
consumerSettings,
Subscriptions.topics(consumerSetup.getString("topics"))
)
.map(msg => TextMessage(msg.value()))
.concatMat(Source.maybe[Message])(Keep.both)
.mapAsync(Runtime.getRuntime.availableProcessors())(Future(_))
private val socketFlow: Flow[Message, Message, (Consumer.Control, Promise[Option[Message]])] =
Flow.fromSinkAndSourceMat(sink, kafkaAsSource)(Keep.right)
private val (upgradeResponse, (draining, _)) =
Http().singleWebSocketRequest(
WebSocketRequest(wsSetup.getString("server")),
socketFlow)
val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
logger.info("Switching protocols")
Right(Done)
} else {
Left(s"Connection failed: ${upgrade.response.status}")
}
}
sys.addShutdownHook {
draining.shutdown()
logger.info("Draining websocket ressource.")
}
}
这里的问题是,如果无法访问网络服务器,上面的 actor 就会关闭。
问题是,如果网络服务器不再可用,如何确定 actor 应该重新启动并尝试再次连接。
我认为你的代码
private val (upgradeResponse, (draining, _)) =
Http().singleWebSocketRequest(
WebSocketRequest(wsSetup.getString("server")),
socketFlow)
有 return 类型
(Future[WebSocketUpgradeResponse], T)
因为您只使用 upgradeResponse
即 Future[WebSocketUpgradeResponse]
也许您可以尝试使用 Recover with Retries
重写您的代码
所以你必须更换你的
val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
logger.info("Switching protocols")
Right(Done)
} else {
Left(s"Connection failed: ${upgrade.response.status}")
}
}
和
planB = Source.empty
Source.fromFuture(upgradeResponse).recoverWithRetries(3, {
case ex: RuntimeException => logger.error("Error", ex); planB
}).runWith(Sink.ignore).map {upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
logger.info("Switching protocols")
Right(Done)
} else {
Left(s"Connection failed: ${upgrade.response.status}")
}
}
这里可以为RuntimeException
添加异常处理
详情请参考https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-error.html
希望对您有所帮助。如果有任何错误,请告诉我。
谢谢
我通过 websocket 客户端将来自 Kafka 的传入消息转发到 web 服务器。 以下代码显示了我是如何做的:
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.{Future, Promise}
final case class WsGraph(logger: Logger, sink: Sink[Message, Future[Done]])(implicit val system: ActorSystem) {
private implicit val materializer = ActorMaterializer()
private implicit val akka = system.settings.config.getConfig("akka.kafka.consumer")
private implicit val executor = system.dispatcher
private val consumerSetup = system.settings.config.getConfig("kafka.consumer.setup")
private val wsSetup = system.settings.config.getConfig("websocket.setup")
private val consumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(akka, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(consumerSetup.getString("bootStrapServers"))
.withGroupId(consumerSetup.getString("groupId"))
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
private val kafkaAsSource: Source[Message, (Consumer.Control, Promise[Option[Message]])] = Consumer
.plainSource(
consumerSettings,
Subscriptions.topics(consumerSetup.getString("topics"))
)
.map(msg => TextMessage(msg.value()))
.concatMat(Source.maybe[Message])(Keep.both)
.mapAsync(Runtime.getRuntime.availableProcessors())(Future(_))
private val socketFlow: Flow[Message, Message, (Consumer.Control, Promise[Option[Message]])] =
Flow.fromSinkAndSourceMat(sink, kafkaAsSource)(Keep.right)
private val (upgradeResponse, (draining, _)) =
Http().singleWebSocketRequest(
WebSocketRequest(wsSetup.getString("server")),
socketFlow)
val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
logger.info("Switching protocols")
Right(Done)
} else {
Left(s"Connection failed: ${upgrade.response.status}")
}
}
sys.addShutdownHook {
draining.shutdown()
logger.info("Draining websocket ressource.")
}
}
这里的问题是,如果无法访问网络服务器,上面的 actor 就会关闭。 问题是,如果网络服务器不再可用,如何确定 actor 应该重新启动并尝试再次连接。
我认为你的代码
private val (upgradeResponse, (draining, _)) =
Http().singleWebSocketRequest(
WebSocketRequest(wsSetup.getString("server")),
socketFlow)
有 return 类型
(Future[WebSocketUpgradeResponse], T)
因为您只使用 upgradeResponse
即 Future[WebSocketUpgradeResponse]
也许您可以尝试使用 Recover with Retries
重写您的代码所以你必须更换你的
val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
logger.info("Switching protocols")
Right(Done)
} else {
Left(s"Connection failed: ${upgrade.response.status}")
}
}
和
planB = Source.empty
Source.fromFuture(upgradeResponse).recoverWithRetries(3, {
case ex: RuntimeException => logger.error("Error", ex); planB
}).runWith(Sink.ignore).map {upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
logger.info("Switching protocols")
Right(Done)
} else {
Left(s"Connection failed: ${upgrade.response.status}")
}
}
这里可以为RuntimeException
添加异常处理
详情请参考https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-error.html
希望对您有所帮助。如果有任何错误,请告诉我。 谢谢