Exception in flatMap function when deploying to cluster
I have a Flink-Ignite application. It receives messages from Kafka, processes them, and then caches the results in Ignite. The program works fine when I run it in the IDE (IntelliJ) and as a standalone JAR, but when I deploy it to a cluster I get the exception below (the table is created earlier in the code). Thanks in advance.

Note that the connection variables are static in my main class.
```
Caused by: java.lang.NullPointerException
    at altosis.flinkcompute.compute.Main.flatMap(Main.java:95)
    at altosis.flinkcompute.compute.Main.flatMap(Main.java:85)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 22 more
```
```java
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
environment.setParallelism(1);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "event-group");

FlinkKafkaConsumer<EventSalesQuantity> consumer =
        new FlinkKafkaConsumer<EventSalesQuantity>("EventTopic", new EventSerializationSchema(), props);
DataStream<EventSalesQuantity> eventDataStream = environment.addSource(consumer);

KeyedStream<EventSalesQuantity, String> keyedEventStream = eventDataStream
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksImpl())
        .keyBy(new KeySelector<EventSalesQuantity, String>() {
            @Override
            public String getKey(EventSalesQuantity eventSalesQuantity) throws Exception {
                return eventSalesQuantity.getDealer();
            }
        });

DataStream<Tuple2<EventSalesQuantity, Integer>> eventSinkStream = keyedEventStream
        .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS), Time.hours(21)))
        .aggregate(new AggregateImpl());

ignite = Ignition.start();
ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
igniteClient = Ignition.startClient(cfg);
System.out.println(">>> Thin client put-get example started.");

igniteClient.query(
        new SqlFieldsQuery(String.format(
                "CREATE TABLE IF NOT EXISTS Eventcache (eventtime VARCHAR PRIMARY KEY, bayi VARCHAR, sales INT) WITH \"VALUE_TYPE=%s\"",
                EventSalesQuantity.class.getName()
        )).setSchema("PUBLIC")
).getAll();

eventSinkStream.addSink(new FlinkKafkaProducer<Tuple2<EventSalesQuantity, Integer>>(
        "localhost:9092", "SinkEventTopic", new EventSinkSerializationSchema()));

Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");

eventSinkStream.flatMap(new FlatMapFunction<Tuple2<EventSalesQuantity, Integer>, Object>() {
    @Override
    public void flatMap(Tuple2<EventSalesQuantity, Integer> eventSalesQuantityIntegerTuple2, Collector<Object> collector) throws Exception {
        Ignsql = conn.prepareStatement(
                "INSERT INTO Eventcache (eventtime, bayi, sales) VALUES (?, ?, ?)");
        Ignsql.setString(1, eventSalesQuantityIntegerTuple2.f0.getTransactionDate());
        Ignsql.setString(2, eventSalesQuantityIntegerTuple2.f0.getDealer());
        Ignsql.setInt(3, eventSalesQuantityIntegerTuple2.f1);
        Ignsql.execute();
        Ignsql.close();
    }
});

// eventSinkStream.print();
environment.execute();
```
I assume that when you say "Note that connection variables are static in my main class" you are referring to `Ignsql`. If so, your code is not going to work, because that variable is not available to your flatMap function, which is serialized and distributed by the JobManager before the workflow actually starts running.

You should create a `RichFlatMapFunction` class, set up the connection variables you need in its `open()` method, and close them in its `close()` method. If you have configuration parameters that are needed to set up the connections, pass them to the constructor of your `RichFlatMapFunction` and save them in (non-transient) fields, then use them in the `open()` method.
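A minimal sketch of what that could look like for your insert, assuming the same Ignite JDBC thin driver, URL, and `Eventcache` table as in your code (the class name `IgniteJdbcSink` and the `jdbcUrl` constructor parameter are illustrative, not from your post):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical replacement for the anonymous FlatMapFunction: the JDBC
// connection is created per parallel instance in open(), not in main().
public class IgniteJdbcSink extends RichFlatMapFunction<Tuple2<EventSalesQuantity, Integer>, Object> {

    // Non-transient field: serialized with the function and shipped to the task managers.
    private final String jdbcUrl;

    // Transient fields: created on the task manager in open(), never serialized.
    private transient Connection conn;
    private transient PreparedStatement stmt;

    public IgniteJdbcSink(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
        conn = DriverManager.getConnection(jdbcUrl);
        stmt = conn.prepareStatement(
                "INSERT INTO Eventcache (eventtime, bayi, sales) VALUES (?, ?, ?)");
    }

    @Override
    public void flatMap(Tuple2<EventSalesQuantity, Integer> value, Collector<Object> out) throws Exception {
        stmt.setString(1, value.f0.getTransactionDate());
        stmt.setString(2, value.f0.getDealer());
        stmt.setInt(3, value.f1);
        stmt.execute();
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
```

The job would then use it instead of the anonymous class, e.g. `eventSinkStream.flatMap(new IgniteJdbcSink("jdbc:ignite:thin://127.0.0.1/"));`, and the static `conn`/`Ignsql` variables in `main` can go away.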