NoClassDefFoundError when trying to integrate Kafka with Flink
I am trying to integrate Kafka with Flink. The idea is to consume a Kafka queue and transform the data with Flink. I am following the example below:
https://github.com/abhishek-ch/evolveML/blob/master/flink/kafka-flink-example/pom.xml
These are my dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
I also unpack the Kafka and Flink classes into my project, as shown below:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.0.2</version>
    <executions>
        <execution>
            <id>unpack</id>
            <!-- executed just before the package phase -->
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <!-- For Flink connector classes -->
                    <artifactItem>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
                        <version>1.3.2</version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>org/apache/flink/**</includes>
                    </artifactItem>
                    <!-- For Kafka API classes -->
                    <artifactItem>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_2.11</artifactId>
                        <version>1.0.0</version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>kafka/**</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>
My Java code that consumes the Kafka queue is:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Map<String, String> map = new HashMap<>();
map.put("bootstrap.servers", kafka_server);
map.put("zookeeper.connect", "localhost:40862");
map.put("group.id", "test");
map.put("topic", "data");

// parse user parameters
ParameterTool parameterTool = ParameterTool.fromMap(map);

DataStream<String> messageStream = null;
try {
    messageStream = env.addSource(new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082<>(
            parameterTool.getRequired("topic"),
            new SimpleStringSchema(),
            parameterTool.getProperties()));
} catch (Exception e) {
    LOGGER.error("Error", e);
}

// print() will write the contents of the stream to the TaskManager's standard out stream
// the rebalance call causes a repartitioning of the data so that all machines
// see the messages (for example in cases when "num kafka partitions" < "num flink operators")
messageStream.rebalance().map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(String value) throws Exception {
        LOGGER.info("============================" + value);
        return "Kafka and Flink says: " + value;
    }
}).print();

try {
    env.execute();
} catch (Exception e) {
    e.printStackTrace();
}
This code sample is from the GitHub project mentioned above. The code runs inside a WAR file deployed in Tomcat.
When the code runs, I get the following error:
Unrecoverable error java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082
I have included the classes in the WAR, as shown in the excerpt above. I would like to figure out how to fix this. Any help or advice is much appreciated.
A NoClassDefFoundError often hints at a version/dependency problem, and indeed your dependencies are somewhat of a mess.
You are importing Flink dependencies from both 1.3.2 (the current version) and 0.9.1 (a rather ancient one). The Flink Kafka connector is for Kafka 0.8, yet you pull in a Kafka 1.0.0 dependency.
You should change the flink-streaming-core dependency into a dependency on flink-streaming-java_2.11, version 1.3.2. (flink-streaming-core was renamed into flink-streaming-java and flink-streaming-scala a couple of years ago.)
Also, flink-connector-kafka-0.8_2.11 is for Kafka version 0.8.x, while you combine it with Kafka version 1.0.0. I suggest you remove the kafka_2.11 dependency and rely on Maven to transitively include the correct version of the Kafka jar.
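A cleaned-up dependency section might look like this (a sketch, assuming you stay on Flink 1.3.2; the version numbers are illustrative):

<!-- All Flink artifacts aligned on 1.3.2; the explicit kafka_2.11
     dependency is removed so the connector pulls in its own
     compatible Kafka 0.8.x client transitively -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.3.2</version>
</dependency>

Note also that, if I recall the 1.3.x API correctly, the consumer class shipped with this connector is FlinkKafkaConsumer08 rather than FlinkKafkaConsumer082 (the latter is the old 0.9-era class name), so the addSource call in the Java code would need updating as well.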