我将在 Flink - scala 中获得支持
Avro support in Flink - scala
如何从 scala
中的 Flink
读取 avro
?
batch/stream/table是否相同:StreamExecutionEnvironment
/ExecutionEnvironment
/TableEnvironment
?
会不会像这样:val custTS: TableSource = new AvroInputFormat("/path/to/file", ...)
下面是 java avro 实现参考(connectors),但在任何地方都找不到 scala 参考:
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
您可以使用 Flink 的 InputFormats
,包括 AvroInputFormat
,来自 Java 以及 Scala API:
- 流式处理和批处理:
val avroInputStream = env.createInput(new AvroInputFormat[User](in, classOf[User]))
- Table API:
tableEnv.registerTable("table", avroInputStream.toTable)
如何从 scala
中的 Flink
读取 avro
?
batch/stream/table是否相同:StreamExecutionEnvironment
/ExecutionEnvironment
/TableEnvironment
?
会不会像这样:val custTS: TableSource = new AvroInputFormat("/path/to/file", ...)
下面是 java avro 实现参考(connectors),但在任何地方都找不到 scala 参考:
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
您可以使用 Flink 的 InputFormats
,包括 AvroInputFormat
,来自 Java 以及 Scala API:
- 流式处理和批处理:
val avroInputStream = env.createInput(new AvroInputFormat[User](in, classOf[User]))
- Table API:
tableEnv.registerTable("table", avroInputStream.toTable)