Flink 是否弃用了 JSONDeserializationSchema()?

is JSONDeserializationSchema() deprecated in Flink?

我是 Flink 的新手,正在做一些与下面非常相似的事情 link。

我也在尝试添加 JSONDeserializationSchema() 作为我的 Kafka 输入 JSON 没有密钥的消息的反序列化器。

但我发现 JSONDeserializationSchema() 不存在。

如果我做错了什么,请告诉我。

JSONDeserializationSchema 已在 Flink 1.8 中删除,此前已被弃用。

推荐的方法是编写一个实现 DeserializationSchema<T> 的反序列化器。这是我从 Flink Operations Playground:

复制的示例
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
 * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
 *
 */
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {

    private static final long serialVersionUID = 1L;

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, ClickEvent.class);
    }

    @Override
    public boolean isEndOfStream(ClickEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ClickEvent> getProducedType() {
        return TypeInformation.of(ClickEvent.class);
    }
}

对于 Kafka 生产者,您需要实施 KafkaSerializationSchema<T>,您会在同一个项目中找到相关示例。

为了解决从 Kafka 读取非键 JSON 消息的问题,我使用了 case class 和 JSON 解析器。

下面的代码生成一个案例 class 并使用 play API 解析 JSON 字段。

import play.api.libs.json.JsValue

object CustomerModel {

  def readElement(jsonElement: JsValue): Customer = {
    val id = (jsonElement \ "id").get.toString().toInt
    val name = (jsonElement \ "name").get.toString()
    Customer(id,name)
  }
case class Customer(id: Int, name: String)
}

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx.xxx.0.114:9092")
properties.setProperty("group.id", "test-grp")

val consumer = new FlinkKafkaConsumer[String]("customer", new SimpleStringSchema(), properties)
val stream1 = env.addSource(consumer).rebalance

val stream2:DataStream[Customer]= stream1.map( str =>{Try(CustomerModel.readElement(Json.parse(str))).getOrElse(Customer(0,Try(CustomerModel.readElement(Json.parse(str))).toString))
    })

stream2.print("stream2")
env.execute("This is Kafka+Flink")

}

Try方法让你克服解析数据时抛出的异常 并且 return 是其中一个字段中的异常(如果我们想要的话),否则它可以 return 具有任何给定或默认字段的案例 class 对象。

代码的示例输出是:

stream2:1> Customer(1,"Thanh")
stream2:1> Customer(5,"Huy")
stream2:3> Customer(0,Failure(com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
 at [Source: ; line: 1, column: 0]))

我不确定这是否是最好的方法,但它目前对我有用。