Apache Flink: What does "Class X does not contain a setter for field Y" mean?
I'm using Flink (1.6, 1.7) for the first time, with data from the GitHub archive at https://www.gharchive.org/, used as a streaming data source.
My simple example just counts all events per user per day in a window, and I'm trying to replicate the same example using the TableEnvironment and SQL support instead.
However, I'm running into the following error:
class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime, as shown below:
18-12-04 14:17:02:115 INFO main exploration.StreamingTableApp:32 - Starting Streaming Table Flink App Example...
18-12-04 14:17:02:174 INFO main typeutils.TypeExtractor:1818 - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
18-12-04 14:17:02:176 INFO main typeutils.TypeExtractor:1857 - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
18-12-04 14:17:02:937 INFO main exploration.StreamingTableApp:74 - Finished...
I'm reading the source files as a DataStream, using Gson to parse out the bits of each JSON line I need, and mapping those attributes into a tuple.
Does anyone have any ideas/experience with this?
Main method:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Mapped in docker compose file too.
DataStreamSource<String> input = env.readTextFile("/some/path/github/");
// Setup the stream
DataStream<Tuple4<String, Integer, String, Long>> stream = input.map(new GithubTupleConverter())
.assignTimestampsAndWatermarks(new TupleTimestampExtractor());
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
Table tableFromStream = tEnv.fromDataStream(stream, "user_id, kount, basic_date, event_date");
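Since the goal is the SQL version of the per-user daily count, the Table obtained above can be registered and queried. A minimal sketch continuing the main method (the table name github_events and the aggregation are illustrative assumptions, not from the original code):

```java
// Continues from the main method above: tEnv and tableFromStream are in scope.
// Register the Table under a name so SQL can refer to it.
tEnv.registerTable("github_events", tableFromStream);

// Count events per user per day by summing the constant kount column.
Table result = tEnv.sqlQuery(
    "SELECT user_id, basic_date, SUM(kount) AS events " +
    "FROM github_events " +
    "GROUP BY user_id, basic_date");
```

`registerTable` and `sqlQuery` are the Flink 1.6/1.7 Table API entry points; a windowed variant would instead group on a time attribute derived from event_date.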
TupleTimestampExtractor.java
public class TupleTimestampExtractor
extends BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, Integer, String, Long>> {
private static final long serialVersionUID = 3737675541965835242L;
public TupleTimestampExtractor() {
super(Time.seconds(30L));
}
@Override
public long extractTimestamp(Tuple4<String, Integer, String, Long> element) {
return element.getField(3);
}
}
GithubTupleConverter.java
public class GithubTupleConverter implements MapFunction<String, Tuple4<String, Integer, String, Long>> {
private static final Gson g = new Gson();
@Override
public Tuple4<String, Integer, String, Long> map(String value) throws Exception {
// Take each line as Json.
JsonObject o = g.fromJson(value, JsonObject.class);
// Extract the user id
String userId = o.get("actor").getAsJsonObject().get("login").getAsString();
// Extract the event type (commit, pull request, fork event)
String type = o.get("type").getAsString();
// Get the event date time
String dateTime = o.get("created_at").getAsString();
// Parse date string to Typed type.
LocalDateTime eventTime = LocalDateTime.parse(dateTime, DateTimeFormatter.ISO_DATE_TIME);
// Format the date so it can be used in the output.
DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE;
return Tuple4.of(userId, 1, formatter.format(eventTime), eventTime.toInstant(ZoneOffset.UTC).toEpochMilli());
}
}
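The created_at handling in the converter can be checked on its own, independent of Flink. A small sketch of the same java.time logic (the sample timestamp is made up for illustration):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Standalone check of the created_at parsing done in GithubTupleConverter.
public class TimestampParseDemo {

    // Day bucket, e.g. "2018-12-04" (the tuple's basic_date field).
    public static String basicDate(String createdAt) {
        LocalDateTime eventTime = LocalDateTime.parse(createdAt, DateTimeFormatter.ISO_DATE_TIME);
        return DateTimeFormatter.ISO_DATE.format(eventTime);
    }

    // Epoch millis used as the event-time timestamp (the tuple's event_date field).
    public static long epochMillis(String createdAt) {
        LocalDateTime eventTime = LocalDateTime.parse(createdAt, DateTimeFormatter.ISO_DATE_TIME);
        return eventTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // A made-up created_at value in the ISO format GH Archive events use.
        String createdAt = "2018-12-04T14:17:02Z";
        System.out.println(basicDate(createdAt));   // 2018-12-04
        System.out.println(epochMillis(createdAt)); // 1543933022000
    }
}
```

Note that `ISO_DATE_TIME` accepts the trailing `Z`, while `LocalDateTime` simply drops the offset; the converter then re-applies UTC explicitly via `toInstant(ZoneOffset.UTC)`.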
The log you shared does not show an error. The entries are at INFO
level and no exception is thrown (at least not in the log provided).
The log entry just says that the class TimestampedFileInputSplit
cannot be treated as a POJO. In general this message indicates that performance is not optimal, but in this particular case it is not a problem.
Do you get any other error messages?
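To illustrate what the message is checking: Flink treats a class as a POJO only if it is public, has a public no-arg constructor, and every field is either public or exposed through a matching public getter and setter. A hypothetical class sketching why TimestampedFileInputSplit fails that check (the class and fields here are illustrative, not Flink's actual source):

```java
// Hypothetical sketch of Flink's POJO rules. A field with a getter but no
// setter (like modificationTime below) is not a valid POJO field, so the
// whole class falls back to GenericType (Kryo) serialization.
public class SplitLikeExample {
    private long modificationTime;   // getter only -> not a valid POJO field
    private String path;             // getter + setter -> valid POJO field

    public SplitLikeExample() {}     // Flink also requires a public no-arg constructor

    public long getModificationTime() { return modificationTime; }

    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }

    public static void main(String[] args) {
        SplitLikeExample e = new SplitLikeExample();
        e.setPath("/some/file");
        System.out.println(e.getPath());
    }
}
```

Adding a `setModificationTime` setter (or making the field public) would make every field valid and let Flink use the faster POJO serializer; for Flink's own internal TimestampedFileInputSplit this is nothing you can or need to change.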