Changing Scala local variable scope
Below is a Scala snippet from an error handler I built for a streaming application. It uses Akka Streams to consume messages ('errormsg') from a Kafka topic and write them to a table in Kudu.
val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] =
  Consumer.committableSource(
    consumerSettings,
    Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
  val bytes: Array[Byte] = msg.record.value()
  val errormsg = (bytes.map(_.toChar)).mkString
  new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
})
cdrs.to(new ErrorKuduSink(session, table)).run()
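A side note on the decoding line: bytes.map(_.toChar).mkString converts every byte to a Char independently, so it only round-trips ASCII text; any byte of 0x80 or above (for example, part of a multi-byte UTF-8 character) comes out garbled. Assuming the producer writes the error text as UTF-8 (an assumption; check the producer's serializer), new String(bytes, StandardCharsets.UTF_8) decodes it correctly. A small comparison sketch; DecodeDemo and its methods are illustrative names only:

```scala
import java.nio.charset.StandardCharsets

object DecodeDemo {
  // Per-byte conversion used in the snippet: each signed Byte becomes one Char,
  // so bytes >= 0x80 (e.g. multi-byte UTF-8 sequences) are not decoded properly.
  def decodePerByte(bytes: Array[Byte]): String = bytes.map(_.toChar).mkString

  // Assumption: the producer encodes the error text as UTF-8.
  def decodeUtf8(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val ascii = "disk full".getBytes(StandardCharsets.UTF_8)
    val accented = "\u00e9chec".getBytes(StandardCharsets.UTF_8) // "échec"
    println(decodePerByte(ascii) == decodeUtf8(ascii))       // true
    println(decodePerByte(accented) == decodeUtf8(accented)) // false
  }
}
```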
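I'd like to reuse the variable "errormsg" further down, as part of a few lines that email that message to me.
How can I get "errormsg" out of the lambda (or merge the snippet below into it) so that the variable is in scope?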
send a new Mail (
  from = ("errorhandler@domain.com"),
  to = "myemailadres@domain.com",
  subject = "Encountered error",
  message = errormsg
)
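Why the variable is out of reach: a val declared inside the function literal passed to map is local to that function and is created afresh each time the function runs, once per message. Code outside the lambda cannot name it; the value has to leave the lambda as part of what it returns (or be used inside it). A plain-Scala sketch of that idea, where a List stands in for the Kafka source and Errors is a hypothetical, simplified stand-in for the question's class:

```scala
import java.nio.charset.StandardCharsets

object ScopeDemo {
  // Hypothetical, simplified stand-in for the question's Errors class.
  final case class Errors(code: Int, errormsg: String)

  // A List stands in for the stream; each element is one record value.
  def toErrors(records: List[Array[Byte]]): List[Errors] =
    records.map { bytes =>
      // errormsg is local to this function literal and recreated for every element.
      val errormsg = new String(bytes, StandardCharsets.UTF_8)
      Errors(1235, errormsg)
    }
  // Referring to errormsg out here would not compile: it is not in scope.

  def main(args: Array[String]): Unit = {
    val records = List("timeout", "bad cdr").map(_.getBytes(StandardCharsets.UTF_8))
    // The message leaves the lambda inside the returned value.
    toErrors(records).foreach(e => println(e.errormsg))
  }
}
```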
Here, I'd suggest using a MutableList to solve your problem easily:
val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] =
  Consumer.committableSource(
    consumerSettings,
    Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
import scala.collection.mutable._
// Filled from inside the stream as messages arrive. MutableList is not thread-safe,
// so only read it once the stream has processed the messages you need.
val errorMessages: MutableList[String] = new MutableList
val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
  val bytes: Array[Byte] = msg.record.value()
  val errormsg = (bytes.map(_.toChar)).mkString
  errorMessages += errormsg // keep the message outside the lambda's scope
  new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
})
cdrs.to(new ErrorKuduSink(session, table)).run()
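One caveat with this approach: run() materializes the stream and returns immediately, while the map stage processes elements asynchronously, so errorMessages may still be empty (or partially filled) right after run(), and MutableList is not safe to read while another thread appends to it. If you do collect messages this way, a thread-safe collection that you read only after the work has finished is a safer pattern. The sketch below shows that pattern with plain Futures standing in for the stream stage (a swapped-in illustration, not Akka Streams code); CollectDemo is an illustrative name:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CollectDemo {
  // Futures stand in for the asynchronous stream stage; this is not Akka Streams code.
  def collect(messages: Seq[String]): ConcurrentLinkedQueue[String] = {
    // Thread-safe replacement for MutableList when writers run on other threads.
    val errorMessages = new ConcurrentLinkedQueue[String]()
    val work = Future.traverse(messages) { m =>
      Future { errorMessages.add(m); m }
    }
    // Read the collection only after all asynchronous writes have finished.
    Await.result(work, 10.seconds)
    errorMessages
  }

  def main(args: Array[String]): Unit =
    println(collect(Seq("e1", "e2", "e3")).size()) // 3
}
```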
Solution 1: send the email directly inside your map stage (this sends one email per Kafka message):
def sendEmail(errormsg: String): Unit = ??? // your email code, e.g. the Mail snippet from the question
val cdrs: Source[Errors, Consumer.Control] =
  kafkaMessages.map { msg =>
    val bytes: Array[Byte] = msg.record.value()
    val errormsg = (bytes.map(_.toChar)).mkString
    sendEmail(errormsg) // call the function that sends the email
    new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
  }
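Solution 1 relies on the function passed to map running once per element, at the moment that element flows through. A plain-Scala sketch of that behaviour, using a lazy Iterator as a stand-in for the stream, a hypothetical simplified Errors class, and a hypothetical sendEmail that records messages instead of sending them:

```scala
import scala.collection.mutable.ListBuffer

object PerMessageEmailDemo {
  // Hypothetical, simplified stand-in for the question's Errors class.
  final case class Errors(code: Int, errormsg: String)

  // Hypothetical sendEmail: records each message instead of contacting a mail server.
  val sent: ListBuffer[String] = ListBuffer.empty[String]
  def sendEmail(errormsg: String): Unit = sent += errormsg

  def process(records: Iterator[String]): Iterator[Errors] =
    records.map { errormsg =>
      sendEmail(errormsg) // runs once per element, when that element is pulled
      Errors(1235, errormsg)
    }

  def main(args: Array[String]): Unit = {
    val out = process(Iterator("e1", "e2"))
    println(sent.size) // 0: Iterator.map is lazy, nothing has been pulled yet
    out.toList
    println(sent.size) // 2: one "email" per element
  }
}
```

If the real sendEmail blocks on an SMTP call, it holds up that map stage; Akka Streams' mapAsync, which takes a function returning a Future, is a common way to run such calls without blocking the stage.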
Solution 2: if you want to use errormsg in more complex ways in downstream stages, return a tuple from the map stage:
val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] =
  Consumer.committableSource(consumerSettings, Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
val cdrs: Source[Errors, Consumer.Control] =
  kafkaMessages.map { msg =>
    val bytes: Array[Byte] = msg.record.value()
    val errormsg = (bytes.map(_.toChar)).mkString
    // We return a tuple, so the type of downstream elements is (Errors, String)
    (new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg), errormsg)
  }.map { case i @ (errors, errormsg) =>
    sendEmail(errormsg)
    i
  }.map { tuple =>
    // ... any further processing of the (Errors, String) pair goes here ...
    tuple
  }.map(_._1) // we no longer need the tuple, so keep the original element and continue processing it
cdrs.to(new ErrorKuduSink(session, table)).run()
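The tuple trick in Solution 2 is independent of Akka: the raw message rides alongside the element through as many stages as need it, and a final map(_._1) restores the original element type. A plain-Scala sketch of the same chain on a List, with a hypothetical Errors class that deliberately does not carry the message (so the tuple is the only way to reach it downstream):

```scala
import scala.collection.mutable.ListBuffer

object TupleThreadingDemo {
  // Hypothetical stand-in for Errors that does not expose the message.
  final case class Errors(code: Int)

  def run(messages: List[String], sendEmail: String => Unit): List[Errors] =
    messages
      .map(m => (Errors(1235), m)) // element and raw message travel together
      .map { case pair @ (_, errormsg) =>
        sendEmail(errormsg) // the message is still in scope downstream
        pair
      }
      .map(_._1) // drop the message once it is no longer needed

  def main(args: Array[String]): Unit = {
    val sent = ListBuffer.empty[String]
    val out = run(List("e1", "e2"), m => sent += m)
    println(out)  // List(Errors(1235), Errors(1235))
    println(sent) // ListBuffer(e1, e2)
  }
}
```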
Solution 3: if you need more complex processing (for example, batching several error messages into a single email), you may want to build a RunnableGraph:
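import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink}
import scala.concurrent.duration._
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Consumer.committableSource(consumerSettings, Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
    .map { msg =>
      val bytes: Array[Byte] = msg.record.value()
      val errormsg = (bytes.map(_.toChar)).mkString
      (new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg), errormsg)
    }
  val kuduout = new ErrorKuduSink(session, table)
  // One email per batch: join the batched messages into a single body
  val emailout = Sink.foreach[Seq[String]] { errormsgs =>
    sendEmail(errormsgs.mkString("\n"))
  }
  val f1 = Flow[(Errors, String)]
    .map(_._1) // take errors
  val f2 = Flow[(Errors, String)]
    .map(_._2) // take errormsgs
    .groupedWithin(100, 1.hour) // up to 100 messages or 1 hour per batch, whichever comes first
  // Broadcast must carry the tuple type produced by `in`
  val bcast = builder.add(Broadcast[(Errors, String)](2))
  in ~> bcast
  bcast ~> f1 ~> kuduout
  bcast ~> f2 ~> emailout
  ClosedShape
})
g.run() // requires an implicit Materializer in scope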
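The f2 branch batches messages with groupedWithin(100, 1.hour), which emits a group when it reaches 100 elements or when the hour elapses, whichever happens first. Plain collections have no clock, so the following stdlib sketch models only the size limit, using grouped, to show how many emails a burst of errors would produce; batchBodies is a hypothetical helper, not part of Akka:

```scala
object BatchEmailDemo {
  // Count-only analogue of groupedWithin(100, 1.hour): the time bound has no
  // equivalent for a plain collection, so only the size limit is modelled.
  def batchBodies(errormsgs: Seq[String], maxPerEmail: Int): List[String] =
    errormsgs.grouped(maxPerEmail).map(_.mkString("\n")).toList

  def main(args: Array[String]): Unit = {
    val msgs = (1 to 250).map(i => s"error $i")
    val bodies = batchBodies(msgs, 100)
    println(bodies.size)                    // 3 emails: 100 + 100 + 50 messages
    println(bodies.last.split("\n").length) // 50
  }
}
```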