Kafka Streams does not run with dynamically generated classes

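I want to start a stream that deserializes a dynamically created class. The bean is created with reflection and a URLClassLoader from class source code passed in as a String, but the KafkaStreams API does not recognize my new class.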

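The stream works perfectly with pre-created beans, but shuts down automatically when the dynamic bean is used. The deserializer is built with Jackson and also works on its own.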

Here is the code that resolves the class:

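import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.StandardLocation;
import javax.tools.ToolProvider;

public static Class<?> getClassFromSource(String className, String sourceCode)
        throws IOException, ClassNotFoundException {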

    /*
     * create an empty source file
     */
    File sourceFile = new File(com.google.common.io.Files.createTempDir(), className + ".java");

    sourceFile.deleteOnExit();

    /*
     * write the source code into the source file; the file name must match the
     * name of the public class it declares
     */
    try (FileWriter writer = new FileWriter(sourceFile)) {
        writer.write(sourceCode);
    }

    /*
     * compile the source file
     */
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();

    File parentDirectory = null;

    try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {

        parentDirectory = sourceFile.getParentFile();

        fileManager.setLocation(StandardLocation.CLASS_OUTPUT, Arrays.asList(parentDirectory));

        Iterable<? extends JavaFileObject> compilationUnits = fileManager
                .getJavaFileObjectsFromFiles(Arrays.asList(sourceFile));

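        // call() returns false when compilation fails; fail fast here rather than
        // with a ClassNotFoundException further down
        Boolean compiled = compiler.getTask(null, fileManager, null, null, null, compilationUnits).call();
        if (!Boolean.TRUE.equals(compiled)) {
            throw new IllegalStateException("Compilation failed for " + className);
        }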
    }

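    /*
     * load the compiled class; note that the loader is closed on return, so it
     * cannot load any further classes from this directory afterwards
     */
    try (URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { parentDirectory.toURI().toURL() })) {
        return classLoader.loadClass(className);
    }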
}
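
For comparison, here is a minimal sketch of the same compile-and-load step using only the JDK (no Guava). The names `DynamicClassLoading` and `compileAndLoad` are hypothetical and not part of the original code. It assumes the program runs on a full JDK, and it deliberately leaves the class loader open on the assumption that a library such as Jackson may need to resolve further classes from the same directory later.

```java
import javax.tools.JavaCompiler;
import javax.tools.StandardJavaFileManager;
import javax.tools.StandardLocation;
import javax.tools.ToolProvider;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;

public class DynamicClassLoading {

    /** Compiles sourceCode into a temporary directory and loads className from it. */
    public static Class<?> compileAndLoad(String className, String sourceCode)
            throws IOException, ClassNotFoundException {
        // The source file name must match the public class name.
        Path dir = Files.createTempDirectory("dynamic-classes");
        Path sourceFile = dir.resolve(className + ".java");
        Files.write(sourceFile, sourceCode.getBytes(StandardCharsets.UTF_8));

        // getSystemJavaCompiler() returns null when no compiler is available (e.g. on a JRE).
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler available");
        }

        try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {
            // Write the .class file next to the source file.
            fileManager.setLocation(StandardLocation.CLASS_OUTPUT, Collections.singletonList(dir.toFile()));
            Boolean compiled = compiler.getTask(null, fileManager, null, null, null,
                    fileManager.getJavaFileObjects(sourceFile.toFile())).call();
            if (!Boolean.TRUE.equals(compiled)) {
                throw new IllegalStateException("Compilation failed for " + className);
            }
        }

        // Assumption: the loader is intentionally not closed, so that classes
        // referenced lazily by the loaded class can still be resolved.
        URLClassLoader loader = new URLClassLoader(
                new URL[] { dir.toUri().toURL() }, DynamicClassLoading.class.getClassLoader());
        return loader.loadClass(className);
    }

    public static void main(String[] args) throws Exception {
        String source = "public class DynamicClass { public String name = \"demo\"; }";
        Class<?> clazz = compileAndLoad("DynamicClass", source);
        Object instance = clazz.getDeclaredConstructor().newInstance();
        System.out.println(clazz.getName() + " -> " + clazz.getField("name").get(instance));
    }
}
```

Checking the compiler result makes a broken source string fail immediately instead of surfacing later as a `ClassNotFoundException`.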

First I instantiate my Serde, which takes a Class as a parameter:

// class generated dynamically from its source code
Class<?> clazz = getClassFromSource("DynamicClass", source);

// Serde for the generated class; DynamicDeserializer implements
// org.apache.kafka.common.serialization.Deserializer
DynamicDeserializer deserializer = new DynamicDeserializer(clazz);
DynamicSerializer serializer = new DynamicSerializer();
Serde<?> encryptedSerde = Serdes.serdeFrom(serializer, deserializer);

Then I start the stream topology that uses this Serde:

StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> dynamicStream = builder
            .stream(topicName, Consumed.with(Serdes.String(), encryptedSerde))
            .groupByKey()
            .count();

// KTable has no to() method in Kafka 2.x; convert to a KStream first
dynamicStream.toStream().to(outputTopicName, Produced.with(Serdes.String(), Serdes.Long()));

The stream topology should run normally, but it always produces this error:

2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.Target' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.layout' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.layout.ConversionPattern' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'stream.restart.application' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'aes.key.path' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'path.to.listening' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'admin.retries' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.rootLogger' was supplied but isn't a known config.
2019-09-01 14:54:16 INFO  AppInfoParser:117 - Kafka version: 2.3.0
2019-09-01 14:54:16 INFO  AppInfoParser:118 - Kafka commitId: fc1aaa116b661c8a
2019-09-01 14:54:16 INFO  AppInfoParser:119 - Kafka startTimeMs: 1567360456724
2019-09-01 14:54:16 INFO  KafkaStreams:800 - stream-client [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72] Started Streams client
2019-09-01 14:54:16 INFO  StreamThread:740 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Starting
2019-09-01 14:54:16 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from CREATED to RUNNING
2019-09-01 14:54:16 INFO  KafkaConsumer:1027 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Subscribed to pattern: 'DynamicBean|streamingbean-test-20190901145412544-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition'
2019-09-01 14:54:17 INFO  Metadata:266 - [Producer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-producer] Cluster ID: tp7OBhwVRQqT2NpPlL55_Q
2019-09-01 14:54:17 INFO  Metadata:266 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Cluster ID: tp7OBhwVRQqT2NpPlL55_Q
2019-09-01 14:54:17 INFO  AbstractCoordinator:728 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Discovered group coordinator AcerDerick:9092 (id: 2147483647 rack: null)
2019-09-01 14:54:17 INFO  ConsumerCoordinator:476 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Revoking previously assigned partitions []
2019-09-01 14:54:17 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
2019-09-01 14:54:17 INFO  KafkaStreams:257 - stream-client [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72] State transition from RUNNING to REBALANCING
2019-09-01 14:54:17 INFO  KafkaConsumer:1068 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2019-09-01 14:54:17 INFO  StreamThread:324 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] partition revocation took 0 ms.
    suspended active tasks: []
    suspended standby tasks: []
2019-09-01 14:54:17 INFO  AbstractCoordinator:505 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] (Re-)joining group
2019-09-01 14:54:17 ERROR StreamsPartitionAssignor:354 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer] DynamicClass is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.
2019-09-01 14:54:17 INFO  AbstractCoordinator:469 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Successfully joined group with generation 1
2019-09-01 14:54:17 INFO  ConsumerCoordinator:283 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Setting newly assigned partitions: 
2019-09-01 14:54:17 INFO  StreamThread:1164 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Informed to shut down
2019-09-01 14:54:17 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
2019-09-01 14:54:17 INFO  StreamThread:1178 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Shutting down
2019-09-01 14:54:17 INFO  KafkaConsumer:1068 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2019-09-01 14:54:17 INFO  KafkaProducer:1153 - [Producer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-09-01 14:54:17 INFO  StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2019-09-01 14:54:17 INFO  StreamThread:1198 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Shutdown complete

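After a while I solved the problem with a simple solution, though perhaps not the most elegant one. First I use a JSON string deserializer to read the data from the topic, then I pass it to a second deserializer that converts it into my dynamic object.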
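
The workaround could look roughly like the sketch below. It is not the original code: the names `TwoStepDeserialization`, `parse`, and `buildTopology` are hypothetical, it needs kafka-streams and jackson-databind on the classpath, and it assumes Jackson's `ObjectMapper` can bind the JSON to the dynamic class. Dropping records that fail to parse is also an assumption made for the example.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.io.IOException;

public class TwoStepDeserialization {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Step 2: turn the JSON string into an instance of the dynamic class, or null on failure. */
    public static Object parse(String json, Class<?> clazz) {
        if (json == null) {
            return null;
        }
        try {
            return MAPPER.readValue(json, clazz);
        } catch (IOException e) {
            // Assumption: unparseable records are skipped rather than stopping the stream.
            return null;
        }
    }

    public static Topology buildTopology(Class<?> clazz, String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: read the values as plain strings, so Kafka Streams only needs the built-in String serde.
        KStream<String, Object> dynamicObjects = builder
                .stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                // Step 2: convert each JSON string into the dynamic object inside the topology.
                .mapValues(json -> parse(json, clazz))
                .filter((key, value) -> value != null);

        // Same aggregation as in the question: count records per key.
        dynamicObjects
                .groupByKey()
                .count()
                .toStream()
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

With this layout the dynamic class never appears in a Serde, so the consumer only ever handles strings and the conversion happens in `mapValues`.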