Flink CsvTableSource 流式处理
Flink CsvTableSource Streaming
我想流式传输 csv 文件并使用 flink 执行 sql 操作。但是我写的代码只读了一次就停止了。它不流。提前致谢,
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
CsvTableSource csvtable = CsvTableSource.builder()
.path("D:/employee.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id", Types.INT())
.field("name", Types.STRING())
.field("designation", Types.STRING())
.field("age", Types.INT())
.field("location", Types.STRING())
.build();
tableEnv.registerTableSource("employee", csvtable);
Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location");
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class);
stream.print();
//stream1.print();
env.execute();
CsvTableSource
基于 FileInputFormat
逐行读取和解析引用文件。结果行被转发到流式查询中。因此,在 CsvTableSource
中,从连续读取和转发行的意义上讲是流式传输。但是,CsvTableSource
在文件末尾终止。因此,它发出有界流。
我假设您期望的行为是 CsvTableSource
读取文件直到文件结束,然后等待向文件追加写入。
但是,这不是 CsvTableSource
的工作方式。您需要为此实现自定义 TableSource
。
我想流式传输 csv 文件并使用 flink 执行 sql 操作。但是我写的代码只读了一次就停止了。它不流。提前致谢,
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
CsvTableSource csvtable = CsvTableSource.builder()
.path("D:/employee.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id", Types.INT())
.field("name", Types.STRING())
.field("designation", Types.STRING())
.field("age", Types.INT())
.field("location", Types.STRING())
.build();
tableEnv.registerTableSource("employee", csvtable);
Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location");
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class);
stream.print();
//stream1.print();
env.execute();
CsvTableSource
基于 FileInputFormat
逐行读取和解析引用文件。结果行被转发到流式查询中。因此,在 CsvTableSource
中,从连续读取和转发行的意义上讲是流式传输。但是,CsvTableSource
在文件末尾终止。因此,它发出有界流。
我假设您期望的行为是 CsvTableSource
读取文件直到文件结束,然后等待向文件追加写入。
但是,这不是 CsvTableSource
的工作方式。您需要为此实现自定义 TableSource
。