Why does Spark application fail with "Exception in thread "main" java.lang.NoClassDefFoundError: ...StringDeserializer"?

Why does Spark application fail with "Exception in thread "main" java.lang.NoClassDefFoundError: ...StringDeserializer"?

我正在开发一个 Spark 应用程序,它使用 Spark 和 Java 来侦听 Kafka 流。

我用kafka_2.10-0.10.2.1.

我已经为 Kafka 属性设置了各种参数:bootstrap.serverskey.deserializervalue.deserializer

我的应用程序编译正常,但是当我提交它时,它失败并出现以下错误:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer

我确实使用 StringDeserializer 作为 key.deserializervalue.deserializer,所以这确实与我编写应用程序的方式有关。

pom.xml中使用的各种maven依赖:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

我已经尝试更新 spark 的版本 streaming/kafka。我在任何地方都找不到太多。

spark-streaming_2.10

这取决于 Scala 2.10

您的其他依赖项正在使用 Scala 2.11

Upgrading the version是当前错误的正确解法。

并确保在 streaming-kafka-0-10 中,这与您 运行

的 Kafka 版本相匹配

Application is compiling fine but when I am trying to submit the spark job, its showing error: Exception in thread "main" java.lang.NoClassDefFoundError:

By default, maven does not include dependency jars when it builds a target

正如您在 中提到的:

Turned out issue was with uber jar not building correctly.

这正是问题所在。它确实与您 assemble 您的 Spark 应用程序的方式有关,我担心您可能选择了 uber jar 方式。在我看来,这是在 assemble 和 spark-submit 时间浪费你的时间。

我个人更喜欢使用 --packages 命令行选项,它负责在需要时删除所有必要的依赖项。

$ ./bin/spark-submit --help
...
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
...

这让您作为 Spark 开发人员的生活变得更轻松,您不再需要等待 maven/sbt 下载依赖项并将它们一起 assemble。它是在 spark-submit 时间完成的(也许这也是其他人的工作!:))

您应该spark-submit如下:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1 ...

这个额外要求的原因是 spark-streaming-kafka-0-10 模块默认不包含在 Spark 的 CLASSPATH 中(因为大多数时候它被认为是不必要的)。通过执行上述 --packages 命令行,您触发加载模块(及其传递依赖项)。

您不应将该模块捆绑在您的 Spark 应用程序的 uber jar 中。

Turned out issue was with uber jar not building correctly. In case you would like to assemble the application and package an uber jar.

src/assembly/assembly.xml

中创建程序集文件
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.0 http://maven.apache.org/xsd/assembly-2.1.0.xsd">
    <id>bin</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>true</unpack>
            <scope>provided</scope>
        </dependencySet>
    </dependencySets>
</assembly>

并将 maven-assembly-plugin 添加到 pom.xml


        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

如果您想向 uber jar 添加依赖项,只需向其添加提供的范围即可。
在你的情况下,它将是这样的:


    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
        <scope>provided</scope>
    </dependency>
$spark-submit --class Main application-bin.jar

如果您愿意,可以使用 shade 插件生成 fat jar。 Jacek 通过 --packages 方法提出的建议。

<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

您还可以使用 maven-dependency 插件获取一些依赖项并将其放入您的程序集的 lib 目录中,然后将其提供给 spark。

<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.2</version>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>copy</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <artifactItems>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-core</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-core.jar</destFileName>
                        </artifactItem>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-api</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-api.jar</destFileName>
                        </artifactItem>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-1.2-api</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-1.2-api.jar</destFileName>
                        </artifactItem>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-slf4j-impl</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-slf4j-impl.jar</destFileName>
                        </artifactItem>
                    </artifactItems>
                    <outputDirectory>${project.build.directory}/wars</outputDirectory>
                    <overWriteReleases>false</overWriteReleases>
                    <overWriteSnapshots>true</overWriteSnapshots>
                </configuration>
            </plugin>

我之所以提出这个建议是因为也许在你的情况下(就像我的工作一样)你的集群在一个非常严格的防火墙后面并且不允许 spark 与 nexus 交谈以在提交时解析包步。在那种情况下,您确实需要在准备人工制品时处理这个问题,其中任何一个都可能对您有所帮助。

在我使用 maven-dependency 的示例中,我获取 log4jv2 并将其传递给 spark 2.3,以便获得 log4j-v2 日志输出(您可以放置​​依赖项)。