是否可以使用 Apache Flink 处理 RabbitMQ 中的中毒消息?

Is it possible to handle poisoned messages in RabbitMQ with Apache Flink?

我正在创建一个简单的 Apache Flink 作业(使用 Scala),它只尝试打印代表 RabbitMQ 队列接收的事件的案例 class (RMQSource)

我创建了自己的反序列化模式(使用 Jackson),当消费的消息实际上是 JSON 代表案例 class 时,它工作正常。但是如果队列接收到格式错误的事件(我猜我们可以称之为“中毒消息”),作业就会失败并不断重新启动。我必须清除队列,然后作业状态更改为 'running'。

问题:

如何防止收到中毒邮件时作业失败?我可以在使用之前验证消息吗?如果我可以在 Rabbit 中设置死信交换,我应该在哪里(如果可能的话)代表 Apache Flink 作为消费者进行否定确认?有更好的方法来处理这个问题并让作业 运行 消耗下一个格式正确的消息吗?


我的自定义 DeserializationSchema 提供给 RMQSource[测试]:

class eventSerializationSchema extends  DeserializationSchema[Test] {

  @throws(classOf[IOException])
  def deserialize(message: Array[Byte]): Test =  objectMapper.readValue(message, classOf[Test])

  def isEndOfStream(nextElement: Test): Boolean = false

  def getProducedType: TypeInformation[Test] = createTypeInformation[Test]
}

object eventSerializationSchema{

  val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

}

中毒消息到达消费队列时出现错误:

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'a': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"a"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
    at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:17)
    at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:14)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchemaWrapper.deserialize(RMQDeserializationSchemaWrapper.java:47)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.processMessage(RMQSource.java:319)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:331)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

在我看来,您有以下几种选择:

  • 捕获在反序列化方法中抛出的异常,并删除有害记录。

  • 捕获在反序列化方法中抛出的异常,并以某种方式将您想知道的有关这些有害记录的信息编码到正在生成的对象中。然后在一个下游的处理函数中,过滤掉这些毒记录,并发送到一个侧输出。

  • 不要在反序列化器中应用ObjectMapper,而是在链式处理函数中进行真正的反序列化,可以直接将有害记录发送到侧输出。