Apache Flink中NodeObject的数据拆分方法
How to split the data of NodeObject in Apache Flink
我正在使用 Flink 处理来自某些数据源(例如 Kafka、Pravega 等)的数据。
在我的例子中,数据源是 Pravega,它为我提供了一个 flink 连接器。
我的数据源正在向我发送一些 JSON 数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
这是我的一段代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如您所见,我使用 FlinkPravegaReader
和适当的反序列化器来获取来自 Pravega 的 JSON 流。
然后我尝试将 JSON 数据转换成字符串,KeyBy
并计算它们。
但是,我得到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
似乎 KeyBy
抛出了这个异常。
嗯,我不是Flink专家,所以我不知道为什么。看过官方例子的源码WordCount
。在该示例中,有一个自定义拆分器,用于将字符串数据拆分为单词。
所以我在想在这种情况下是否也需要使用某种分离器?如果是这样,我应该使用哪种分离器?你能举个例子吗?如果不是,为什么会出现这样的错误,如何解决?
我猜你已经阅读了关于如何指定键的文档
示例代码使用 keyby("word")
因为 word
是 POJO 类型的字段 WC
.
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
在你的例子中,你在 keyBy
之前放置了一个 map
运算符,这个 map
运算符的输出是一个 string
。因此,您的情况显然没有 word
字段。如果你真的想把这个 string
流分组,你需要这样写 .keyBy(String::toString)
或者您甚至可以实施自定义 keySelector
来生成您自己的 key
。
我正在使用 Flink 处理来自某些数据源(例如 Kafka、Pravega 等)的数据。
在我的例子中,数据源是 Pravega,它为我提供了一个 flink 连接器。
我的数据源正在向我发送一些 JSON 数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
这是我的一段代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如您所见,我使用 FlinkPravegaReader
和适当的反序列化器来获取来自 Pravega 的 JSON 流。
然后我尝试将 JSON 数据转换成字符串,KeyBy
并计算它们。
但是,我得到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
似乎 KeyBy
抛出了这个异常。
嗯,我不是Flink专家,所以我不知道为什么。看过官方例子的源码WordCount
。在该示例中,有一个自定义拆分器,用于将字符串数据拆分为单词。
所以我在想在这种情况下是否也需要使用某种分离器?如果是这样,我应该使用哪种分离器?你能举个例子吗?如果不是,为什么会出现这样的错误,如何解决?
我猜你已经阅读了关于如何指定键的文档
示例代码使用 keyby("word")
因为 word
是 POJO 类型的字段 WC
.
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
在你的例子中,你在 keyBy
之前放置了一个 map
运算符,这个 map
运算符的输出是一个 string
。因此,您的情况显然没有 word
字段。如果你真的想把这个 string
流分组,你需要这样写 .keyBy(String::toString)
或者您甚至可以实施自定义 keySelector
来生成您自己的 key
。