Flink 是否弃用了 JSONDeserializationSchema()?
is JSONDeserializationSchema() deprecated in Flink?
我是 Flink 的新手,正在做一些与下面非常相似的事情 link。
我也在尝试添加 JSONDeserializationSchema() 作为我的 Kafka 输入 JSON 没有密钥的消息的反序列化器。
但我发现 JSONDeserializationSchema() 不存在。
如果我做错了什么,请告诉我。
JSONDeserializationSchema
已在 Flink 1.8 中删除,此前已被弃用。
推荐的方法是编写一个实现 DeserializationSchema<T>
的反序列化器。这是我从 Flink Operations Playground:
复制的示例
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
* A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
*
*/
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
private static final long serialVersionUID = 1L;
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public ClickEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, ClickEvent.class);
}
@Override
public boolean isEndOfStream(ClickEvent nextElement) {
return false;
}
@Override
public TypeInformation<ClickEvent> getProducedType() {
return TypeInformation.of(ClickEvent.class);
}
}
对于 Kafka 生产者,您需要实施 KafkaSerializationSchema<T>
,您会在同一个项目中找到相关示例。
为了解决从 Kafka 读取非键 JSON 消息的问题,我使用了 case class 和 JSON 解析器。
下面的代码生成一个案例 class 并使用 play API 解析 JSON 字段。
import play.api.libs.json.JsValue
object CustomerModel {
def readElement(jsonElement: JsValue): Customer = {
val id = (jsonElement \ "id").get.toString().toInt
val name = (jsonElement \ "name").get.toString()
Customer(id,name)
}
case class Customer(id: Int, name: String)
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx.xxx.0.114:9092")
properties.setProperty("group.id", "test-grp")
val consumer = new FlinkKafkaConsumer[String]("customer", new SimpleStringSchema(), properties)
val stream1 = env.addSource(consumer).rebalance
val stream2:DataStream[Customer]= stream1.map( str =>{Try(CustomerModel.readElement(Json.parse(str))).getOrElse(Customer(0,Try(CustomerModel.readElement(Json.parse(str))).toString))
})
stream2.print("stream2")
env.execute("This is Kafka+Flink")
}
Try方法让你克服解析数据时抛出的异常
并且 return 是其中一个字段中的异常(如果我们想要的话),否则它可以 return 具有任何给定或默认字段的案例 class 对象。
代码的示例输出是:
stream2:1> Customer(1,"Thanh")
stream2:1> Customer(5,"Huy")
stream2:3> Customer(0,Failure(com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: ; line: 1, column: 0]))
我不确定这是否是最好的方法,但它目前对我有用。
我是 Flink 的新手,正在做一些与下面非常相似的事情 link。
我也在尝试添加 JSONDeserializationSchema() 作为我的 Kafka 输入 JSON 没有密钥的消息的反序列化器。
但我发现 JSONDeserializationSchema() 不存在。
如果我做错了什么,请告诉我。
JSONDeserializationSchema
已在 Flink 1.8 中删除,此前已被弃用。
推荐的方法是编写一个实现 DeserializationSchema<T>
的反序列化器。这是我从 Flink Operations Playground:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
* A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
*
*/
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
private static final long serialVersionUID = 1L;
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public ClickEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, ClickEvent.class);
}
@Override
public boolean isEndOfStream(ClickEvent nextElement) {
return false;
}
@Override
public TypeInformation<ClickEvent> getProducedType() {
return TypeInformation.of(ClickEvent.class);
}
}
对于 Kafka 生产者,您需要实施 KafkaSerializationSchema<T>
,您会在同一个项目中找到相关示例。
为了解决从 Kafka 读取非键 JSON 消息的问题,我使用了 case class 和 JSON 解析器。
下面的代码生成一个案例 class 并使用 play API 解析 JSON 字段。
import play.api.libs.json.JsValue
object CustomerModel {
def readElement(jsonElement: JsValue): Customer = {
val id = (jsonElement \ "id").get.toString().toInt
val name = (jsonElement \ "name").get.toString()
Customer(id,name)
}
case class Customer(id: Int, name: String)
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx.xxx.0.114:9092")
properties.setProperty("group.id", "test-grp")
val consumer = new FlinkKafkaConsumer[String]("customer", new SimpleStringSchema(), properties)
val stream1 = env.addSource(consumer).rebalance
val stream2:DataStream[Customer]= stream1.map( str =>{Try(CustomerModel.readElement(Json.parse(str))).getOrElse(Customer(0,Try(CustomerModel.readElement(Json.parse(str))).toString))
})
stream2.print("stream2")
env.execute("This is Kafka+Flink")
}
Try方法让你克服解析数据时抛出的异常 并且 return 是其中一个字段中的异常(如果我们想要的话),否则它可以 return 具有任何给定或默认字段的案例 class 对象。
代码的示例输出是:
stream2:1> Customer(1,"Thanh")
stream2:1> Customer(5,"Huy")
stream2:3> Customer(0,Failure(com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: ; line: 1, column: 0]))
我不确定这是否是最好的方法,但它目前对我有用。