Running a custom-built Kafka Streams DSL app returns java.lang.ClassNotFoundException

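I am trying to read data from a Kafka topic that contains JSON data and write it to a new topic based on the value of the "entity" field. I am using the following code to read from and write to Kafka:
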
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;

public class entityDataLoader {
    public static void main(final String[] args) throws Exception {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Set up serializers and deserializers, which we will use for overriding the default serdes
        // specified above.
        final Serde<String> stringSerde = Serdes.String();
        final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

        // In the subsequent lines we define the processing topology of the Streams application.
        final KStreamBuilder builder = new KStreamBuilder();

        // Read the input Kafka topic into a KStream instance.
        final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events");

        String content = textLines.toString();
        String entity = JSONExtractor.returnJSONValue(content, "entity");
        System.out.println(entity);

        textLines.to(entity);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.cleanUp();
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Any ideas on how to run this application successfully?

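Using NetBeans, I built the project with its dependencies, placed the jar file in /home/kafka, and tried to run it by putting the jar on the classpath and naming the class I created (using the command java -cp mavenproject.jar postilionkafka.entityDataLoader). I get the following error: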

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/streams/processor/TopologyBuilder
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.streams.processor.TopologyBuilder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

Thanks to @James, I have been able to solve this problem. However, I am now unable to extract the entity data from the records in the topic. The records in the topic are JSON; an example is:

{"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}

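I want to write to a topic based on the value of the entity field (for the JSON sample above, it should write to a topic named Transaction Manager). If I run my current code, I get the error below: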

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.kafka.streams.kstream.internals.KStreamImpl@568db2f2
No Object found
Unexpected character (o) at position 0.
null
Exception in thread "main" java.lang.NullPointerException: topic can't be null
    at java.util.Objects.requireNonNull(Objects.java:228)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.to(KStreamImpl.java:353)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.to(KStreamImpl.java:337)
    at postilionkafka.dataload.main(dataload.java:35)

The JSONExtractor class is defined as:

import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import org.json.simple.parser.JSONParser;
class JSONExtractor {

/**
 *
 */
public static String returnJSONValue(String args, String value){
    JSONParser parser = new JSONParser();
    String app= null;
    System.out.println(args);
    try{
        Object obj = parser.parse(args);
        JSONObject JObj = (JSONObject)obj;
        app= (String) JObj.get(value);
        return app;
    }
    catch(ParseException pe){
        System.out.println("No Object found");
        System.out.println(pe);
    }
    return app;
}
}

This looks like a simple classpath problem. Try adding all of the jars that are not part of standard Java to the classpath argument, for example:

java -cp kafka-stream.jar:mavenproject.jar postilionkafka.entityDataLoader
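
When there are more than a couple of dependency jars, the classpath argument can be assembled rather than typed by hand. The following is only a minimal sketch using the JDK alone: `ClasspathJoiner`, its method names, and the `lib` directory are hypothetical and not part of Kafka or Maven; it assumes the dependency jars have already been copied into one directory.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Kafka or Maven): builds a value for "java -cp".
final class ClasspathJoiner {

    // Joins entries with the platform separator (':' on Linux/macOS, ';' on Windows).
    static String join(List<String> entries) {
        return String.join(File.pathSeparator, entries);
    }

    // Lists the .jar files directly inside dir, sorted by path.
    // Returns an empty list if dir does not exist or is not a directory.
    static List<String> jarsIn(File dir) {
        List<String> jars = new ArrayList<>();
        File[] files = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (files != null) {
            Arrays.sort(files);
            for (File f : files) {
                jars.add(f.getPath());
            }
        }
        return jars;
    }

    public static void main(String[] args) {
        // Assumption: dependency jars were copied to ./lib beforehand.
        List<String> entries = jarsIn(new File("lib"));
        entries.add("mavenproject.jar");
        System.out.println("java -cp " + join(entries) + " postilionkafka.entityDataLoader");
    }
}
```

The java launcher also accepts a wildcard entry that expands to every jar in a directory, for example -cp "lib/*:mavenproject.jar" (quoted so the shell does not expand it), which may make a helper like this unnecessary.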

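This tends to get overly complicated very quickly, which is one of the reasons we use Maven to manage dependencies. I usually run whatever application I am working on directly from the IDE, which is also an easier way to debug. If I really do have to launch it outside my IDE, I still try to start it from the IDE first: IntelliJ logs the execution command with the required dependencies included, which saves me the time of working out what those dependencies are and how to pull them from my local Maven repository.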

If running in the IDE does not work for you, another approach is to use Maven exec. See this answer on running a project from maven.
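
On the follow-up NullPointerException: the output in the question prints org.apache.kafka.streams.kstream.internals.KStreamImpl@568db2f2, which suggests that textLines.toString() returns a description of the stream object rather than the JSON of any record. Parsing that string fails ("Unexpected character (o) at position 0"), entity stays null, and to(null) throws. The entity therefore most likely has to be derived once per record, inside a stream operation, instead of once from the stream itself. Below is a sketch of just that per-record step, using only the JDK. `EntityTopicResolver`, `entityOf`, `topicFor`, and the fallback topic name are hypothetical; the regex is a simplified stand-in for JSONExtractor.returnJSONValue and is not a general JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derives a target topic name from ONE record's JSON value.
final class EntityTopicResolver {

    // Simplified stand-in for JSONExtractor.returnJSONValue(value, "entity").
    // Matches "entity":"..." where the value contains no escaped quotes.
    private static final Pattern ENTITY =
            Pattern.compile("\"entity\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the entity string of a single record value, or null if none is found.
    static String entityOf(String recordValue) {
        if (recordValue == null) {
            return null;
        }
        Matcher m = ENTITY.matcher(recordValue);
        return m.find() ? m.group(1) : null;
    }

    // Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-',
    // so any other character (such as the space in "Transaction Manager") is replaced.
    // Records without a usable entity go to fallbackTopic instead of producing null.
    static String topicFor(String recordValue, String fallbackTopic) {
        String entity = entityOf(recordValue);
        if (entity == null || entity.isEmpty()) {
            return fallbackTopic;
        }
        return entity.replaceAll("[^A-Za-z0-9._-]", "-");
    }

    public static void main(String[] args) {
        String value = "{\"event_id\":5001,\"entity\":\"Transaction Manager\",\"state\":0}";
        System.out.println(topicFor(value, "unknown-entity"));
    }
}
```

How to wire this into the topology depends on the Kafka Streams version. Newer releases provide a to overload that takes a TopicNameExtractor and is evaluated per record; with the KStreamBuilder-era API used in the question, the routing would have to be expressed differently, for example by branching on a known set of entities.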